Push openflow RMs
[nepi.git] / src / nepi / resources / planetlab / openvswitch / tunnel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #             Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState 
23 from nepi.resources.linux.application import LinuxApplication
24 from nepi.resources.planetlab.node import PlanetlabNode            
25 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch   
26 from nepi.util.timefuncs import tnow, tdiffsec     
27
28 import os
29 import time
30 import socket
31
32
# Delay handed to ec.schedule() when Tunnel.deploy has to be retried
# because one of the endpoints is not READY yet.
reschedule_delay = "0.5s"
34
@clsinit_copy                 
class Tunnel(LinuxApplication):
    """
    Resource manager (RM) that connects two endpoints with a tunnel.

    The link is either switch-switch (both connected RMs expose
    ``create_port``) or switch-host (one connected RM exposes
    ``create_port`` and the other ``udp_connect_command``).

    .. class:: Class Args :
      
        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM
        :type guid: int

    """
    
    # Resource type name of this RM.
    _rtype = "Tunnel"
    # NOTE(review): presumably the rtypes this RM accepts connections
    # from -- confirm against the ResourceManager base class.
    _authorized_connections = ["OVSPort", "PlanetlabTap"]    
51
52     @classmethod
53     def _register_attributes(cls):
54         """ Register the attributes of Connection RM 
55
56         """
57         cipher = Attribute("cipher",
58                "Cipher to encript communication. "
59                 "One of PLAIN, AES, Blowfish, DES, DES3. ",
60                 default = None,
61                 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
62                 type = Types.Enumerate, 
63                 flags = Flags.ExecReadOnly)
64
65         cipher_key = Attribute("cipherKey",
66                 "Specify a symmetric encryption key with which to protect "
67                 "packets across the tunnel. python-crypto must be installed "
68                 "on the system." ,
69                 flags = Flags.ExecReadOnly)
70
71         txqueuelen = Attribute("txQueueLen",
72                 "Specifies the interface's transmission queue length. "
73                 "Defaults to 1000. ", 
74                 type = Types.Integer, 
75                 flags = Flags.ExecReadOnly)
76
77         bwlimit = Attribute("bwLimit",
78                 "Specifies the interface's emulated bandwidth in bytes "
79                 "per second.",
80                 type = Types.Integer, 
81                 flags = Flags.ExecReadOnly)
82
83         cls._register_attribute(cipher)
84         cls._register_attribute(cipher_key)
85         cls._register_attribute(txqueuelen)
86         cls._register_attribute(bwlimit)
87
88     def __init__(self, ec, guid):
89         """
90         :param ec: The Experiment controller
91         :type ec: ExperimentController
92         :param guid: guid of the RM
93         :type guid: int
94     
95         """
96         super(Tunnel, self).__init__(ec, guid)
97         self._home = "tunnel-%s" % self.guid
98         self.port_info_tunl = []
99         self._nodes = []
100         self._pid = None
101         self._ppid = None
102
103     @property
104     def node(self):
105         return self._nodes[0]
106
107     def app_home(self, node):
108         return os.path.join(node.exp_home, self._home)
109
110     def run_home(self, node):
111         return os.path.join(self.app_home(node), self.ec.run_id)
112
113     def port_endpoints(self):
114         # Switch-Switch connection
115         connected = []
116         for guid in self.connections:
117             rm = self.ec.get_resource(guid)
118             if hasattr(rm, "create_port"):
119                 connected.append(rm)
120         return connected
121
122     def mixed_endpoints(self):
123         # Switch-Host connection
124         connected = [1, 2]
125         for guid in self.connections:
126             rm = self.ec.get_resource(guid)
127             if hasattr(rm, "create_port"):
128                 connected[0] = rm
129             elif hasattr(rm, "udp_connect_command"):
130                 connected[1] = rm
131         return connected
132
133     def get_node(self, endpoint):
134         # Get connected to the nodes
135         if hasattr(endpoint, "create_port"):
136             res = []
137             rm_list = endpoint.get_connected(OVSWitch.rtype())
138             if rm_list:
139                 rm = rm_list[0].get_connected(PlanetlabNode.rtype())
140                 if rm: 
141                     res.append(rm[0])
142             return res
143         else:
144             res = []
145             rm = endpoint.get_connected(PlanetlabNode.rtype())
146             if rm :
147                 res.append(rm[0])
148             return res
149
150     @property
151     def endpoint1(self):
152         if self.check_endpoints():
153             port_endpoints = self.port_endpoints()
154             if port_endpoints: return port_endpoints[0]
155         else:
156             mixed_endpoints = self.mixed_endpoints()
157             if mixed_endpoints: return mixed_endpoints[0]
158
159     @property
160     def endpoint2(self):
161         if self.check_endpoints():
162             port_endpoints = self.port_endpoints()
163             if port_endpoints: return port_endpoints[1]
164         else:
165             mixed_endpoints = self.mixed_endpoints()
166             if mixed_endpoints: return mixed_endpoints[1]
167                 
168     def check_endpoints(self):
169         """ Check if the links are between switches
170             or switch-host. Return False for latter.
171         """
172         port_endpoints = self.port_endpoints()
173         if len(port_endpoints) == 2:
174             return True
175         else: 
176             return False
177
178     def get_port_info(self, endpoint, rem_endpoint):
179         """ Retrieve the port_info list for each port
180         
181             :param port_info_tunl: [hostname, publ_IP_addr, port_name,
182                 virtual_ip, local_port_Numb]
183             :type port_info_tunl: list
184         """
185         self.port_info_tunl = []
186         if self.check_endpoints():
187             # Use for the link switch-->switch
188             self.port_info_tunl.append(endpoint.port_info)
189             host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
190             self.port_info_tunl.append(rem_endpoint.port_info)
191             host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
192             return (pname0, ip1, pnumber1)      
193          
194         else:
195             # Use for the link host-->switch
196             self.port_info_tunl.append(endpoint.port_info)
197             host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
198             return pnumber0
199     
200     def udp_connect(self, endpoint, rem_endpoint):     
201         # Collect info from rem_endpoint
202         self._nodes = self.get_node(rem_endpoint)
203         remote_ip = socket.gethostbyname(self.node.get("hostname"))
204         # Collect info from endpoint
205         self._nodes = self.get_node(endpoint) 
206         local_port_file = os.path.join(self.run_home(self.node), 
207                 "local_port")
208         remote_port_file = os.path.join(self.run_home(self.node), 
209                 "remote_port")
210         ret_file = os.path.join(self.run_home(self.node), 
211                 "ret_file")
212         cipher = self.get("cipher")
213         cipher_key = self.get("cipherKey")
214         bwlimit = self.get("bwLimit")
215         txqueuelen = self.get("txQueueLen")
216
217         rem_port = str(self.get_port_info(rem_endpoint, endpoint))           
218         # Upload the remote port in a file
219         self.node.upload(rem_port,
220                 remote_port_file,
221                 text = True,
222                 overwrite = False)
223        
224         udp_connect_command = endpoint.udp_connect_command(
225                 remote_ip, local_port_file, remote_port_file,
226                 ret_file, cipher, cipher_key, bwlimit, txqueuelen) 
227
228         # upload command to host_connect.sh script
229         shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
230         self.node.upload(udp_connect_command,
231                 shfile,
232                 text = True,
233                 overwrite = False)
234
235         # invoke connect script
236         cmd = "bash %s" % shfile
237         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
238                 sudo  = True,
239                 stdout = "udp_stdout",
240                 stderr = "udp_stderr")
241
242         # check if execution errors
243         msg = "Failed to connect endpoints"
244
245         if proc.poll():
246             self.fail()
247             self.error(msg, out, err)
248             raise RuntimeError, msg
249         msg = "Connection on host %s configured" \
250             % self.node.get("hostname")
251         self.info(msg)
252          
253         # Wait for pid file to be generated
254         self._nodes = self.get_node(endpoint) 
255         pid, ppid = self.node.wait_pid(self.run_home(self.node))
256         
257         # If the process is not running, check for error information
258         # on the remote machine
259         if not pid or not ppid:
260             (out, err), proc = self.node.check_errors(self.run_home(self.node))
261             # Out is what was written in the stderr file
262             if err:
263                 self.fail()
264                 msg = " Failed to start command '%s' " % command
265                 self.error(msg, out, err)
266                 raise RuntimeError, msg
267                 
268         return (pid, ppid)
269
270     def switch_connect(self, endpoint, rem_endpoint):
271         """ Get switch connect command
272         """
273         # Get and configure switch connection command
274         (local_port_name, remote_ip, remote_port_num) = self.get_port_info(
275                 endpoint, rem_endpoint)
276         switch_connect_command = endpoint.switch_connect_command(
277                 local_port_name, remote_ip, remote_port_num)
278         self._nodes = self.get_node(endpoint) 
279
280         # Upload command to the file sw_connect.sh
281         shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
282         self.node.upload(switch_connect_command,
283                 shfile,
284                 text = True,
285                 overwrite = False)
286
287         #invoke connect script
288         cmd = "bash %s" % shfile
289         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
290                 sudo  = True,
291                 stdout = "sw_stdout",
292                 stderr = "sw_stderr")
293         
294         # check if execution errors occured
295         msg = "Failed to connect endpoints"
296
297         if proc.poll():
298             self.fail()
299             self.error(msg, out, err)
300             raise RuntimeError, msg
301         else:
302             msg = "Connection on port %s configured" % local_port_name
303             self.info(msg)
304             return 
305
306     def sw_host_connect(self, endpoint, rem_endpoint):
307         """Link switch--> host
308         """
309         # Retrieve remote port number from rem_endpoint
310         local_port_name = endpoint.get('port_name')
311         self._nodes = self.get_node(rem_endpoint)
312         time.sleep(2) # Without this, sometimes I get nothing in remote_port_num
313         remote_port_num = ''
314         (out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
315         remote_port_num = int(out)
316         remote_ip = socket.gethostbyname(self.node.get("hostname"))
317         switch_connect_command = endpoint.switch_connect_command(
318                 local_port_name, remote_ip, remote_port_num)
319
320         # Upload command to the file sw_connect.sh
321         self._nodes = self.get_node(endpoint) 
322         shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
323         self.node.upload(switch_connect_command,
324                 shfile,
325                 text = True,
326                 overwrite = False)
327
328         #invoke connect script
329         cmd = "bash %s" % shfile
330         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
331                 sudo  = True,
332                 stdout = "sw_stdout",
333                 stderr = "sw_stderr")
334         
335         # check if execution errors occured
336         msg = "Failed to connect endpoints"
337
338         if proc.poll():
339             self.fail()
340             self.error(msg, out, err)
341             raise RuntimeError, msg
342         else:
343             msg = "Connection on port %s configured" % local_port_name
344             self.info(msg)
345             return                                                      
346
347     def provision(self):
348         """ Provision the tunnel
349         """
350         # Create folders
351         self._nodes = self.get_node(self.endpoint1)
352         self.node.mkdir(self.run_home(self.node))
353         self._nodes = self.get_node(self.endpoint2)
354         self.node.mkdir(self.run_home(self.node))
355
356         if self.check_endpoints():
357             #Invoke connect script between switches
358             switch_connect1 = self.switch_connect(self.endpoint1, self.endpoint2)
359             switch_connect2 = self.switch_connect(self.endpoint2, self.endpoint1)
360
361         else: 
362             # Invoke connect script between switch & host
363             (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
364             switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
365
366         self.debug("------- READY -------")
367         self._provision_time = tnow()
368         self._state = ResourceState.PROVISIONED
369
370     def discover(self):
371         """ Discover the tunnel
372
373         """     
374         pass
375
376     def deploy(self):
377         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
378             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
379             self.ec.schedule(reschedule_delay, self.deploy)
380         else:
381             try:
382                 self.discover()
383                 self.provision()
384             except:
385                 self.fail()
386                 raise
387  
388             self.debug("----- READY ---- ")
389             self._ready_time = tnow()
390             self._state = ResourceState.READY
391
392     def start(self):
393         """ Start the RM. It means nothing special for 
394             ovsport for now.
395         """
396         pass
397         
398         
399     def stop(self):
400         """ Stop the RM. It means nothing special for 
401             ovsport for now.        
402         """
403         pass
404
405     def release(self):
406         """ Release the udp_tunnel on endpoint2.
407             On endpoint1 means nothing special.        
408         """
409         if not self.check_endpoints():
410             # Kill the TAP devices
411             # TODO: Make more generic Release method of PLTAP
412             if self._pid and self._ppid:
413                 self._nodes = self.get_node(self.endpoint2) 
414                 (out, err), proc = self.node.kill(self._pid,
415                         self._ppid, sudo = True)
416             if err or proc.poll():
417                     # check if execution errors occurred
418                     msg = " Failed to delete TAP device"
419                     self.error(msg, err, err)
420                     self.fail()
421             
422         self._state = ResourceState.RELEASED
423
424
425
426
427
428
429
430