update the openflow part and the ping experiment to work
[nepi.git] / src / nepi / resources / planetlab / openvswitch / tunnel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #             Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
20
21
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.resource import ResourceManager, ResourceFactory, clsinit_copy, \
24         ResourceState
25 from nepi.resources.linux.application import LinuxApplication
26 from nepi.resources.planetlab.node import PlanetlabNode            
27 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch   
28 from nepi.util.timefuncs import tnow, tdiffsec    
29 from nepi.resources.planetlab.vroute import PlanetlabVroute
30 from nepi.resources.planetlab.tap import PlanetlabTap
31
32 import os
33 import time
34 import socket
35
36 reschedule_delay = "0.5s"
37
38 @clsinit_copy                 
39 class OVSTunnel(LinuxApplication):
40     """
41     .. class:: Class Args :
42       
43         :param ec: The Experiment controller
44         :type ec: ExperimentController
45         :param guid: guid of the RM
46         :type guid: int
47         :param creds: Credentials to communicate with the rm 
48         :type creds: dict
49
50     """
51     
52     _rtype = "OVSTunnel"
53     _authorized_connections = ["OVSPort", "PlanetlabTap"]    
54
55     @classmethod
56     def _register_attributes(cls):
57         """ Register the attributes of Connection RM 
58
59         """
60         network = Attribute("network", "IPv4 Network Address",
61                flags = Flags.ExecReadOnly)
62
63         cipher = Attribute("cipher",
64                "Cipher to encript communication. "
65                 "One of PLAIN, AES, Blowfish, DES, DES3. ",
66                 default = None,
67                 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
68                 type = Types.Enumerate, 
69                 flags = Flags.ExecReadOnly)
70
71         cipher_key = Attribute("cipherKey",
72                 "Specify a symmetric encryption key with which to protect "
73                 "packets across the tunnel. python-crypto must be installed "
74                 "on the system." ,
75                 flags = Flags.ExecReadOnly)
76
77         txqueuelen = Attribute("txQueueLen",
78                 "Specifies the interface's transmission queue length. "
79                 "Defaults to 1000. ", 
80                 type = Types.Integer, 
81                 flags = Flags.ExecReadOnly)
82
83         bwlimit = Attribute("bwLimit",
84                 "Specifies the interface's emulated bandwidth in bytes "
85                 "per second.",
86                 type = Types.Integer, 
87                 flags = Flags.ExecReadOnly)
88
89         cls._register_attribute(network)
90         cls._register_attribute(cipher)
91         cls._register_attribute(cipher_key)
92         cls._register_attribute(txqueuelen)
93         cls._register_attribute(bwlimit)
94
95     def __init__(self, ec, guid):
96         """
97         :param ec: The Experiment controller
98         :type ec: ExperimentController
99         :param guid: guid of the RM
100         :type guid: int
101     
102         """
103         super(OVSTunnel, self).__init__(ec, guid)
104         self._home = "tunnel-%s" % self.guid
105         self.port_info_tunl = []
106         self._nodes = []
107         self._pid = None
108         self._ppid = None
109         self._vroute = None
110
111
112     def log_message(self, msg):
113         return " guid %d - Tunnel - %s " % (self.guid, msg)
114
115     @property
116     def node(self):
117         if self._nodes:
118             return self._nodes[0]
119
120     def app_home(self, node):
121         return os.path.join(self.node.exp_home, self._home)
122
123     def run_home(self, node):
124         return os.path.join(self.app_home(node), self.ec.run_id)
125
126     def port_endpoints(self):
127         # Switch-Switch connection
128         connected = []
129         for guid in self.connections:
130             rm = self.ec.get_resource(guid)
131             if hasattr(rm, "create_port"):
132                 connected.append(rm)
133         return connected
134
135     
136     def mixed_endpoints(self):
137         # Switch-Host connection
138         connected = [1, 2]
139         for guid in self.connections:
140             rm = self.ec.get_resource(guid)
141             if hasattr(rm, "create_port"):
142                 connected[0] = rm
143             elif hasattr(rm, "udp_connect_command"):
144                 connected[1] = rm
145         return connected
146
147     def get_node(self, endpoint):
148         # Get connected to the nodes
149         res = []
150         if hasattr(endpoint, "create_port"):
151             rm_list = endpoint.get_connected(OVSWitch.get_rtype())
152             if rm_list:
153                 rm = rm_list[0].get_connected(PlanetlabNode.get_rtype())
154         else:
155             rm = endpoint.get_connected(PlanetlabNode.get_rtype())
156
157         if rm :
158             res.append(rm[0])
159         return res
160
161     @property
162     def endpoint1(self):
163         if self.check_endpoints:
164             port_endpoints = self.port_endpoints()
165             if port_endpoints: return port_endpoints[0]
166         else:
167             mixed_endpoints = self.mixed_endpoints()
168             if mixed_endpoints: return mixed_endpoints[0]
169
170     @property
171     def endpoint2(self):
172         if self.check_endpoints:
173             port_endpoints = self.port_endpoints()
174             if port_endpoints: return port_endpoints[1]
175         else:
176             mixed_endpoints = self.mixed_endpoints()
177             if mixed_endpoints: return mixed_endpoints[1]
178
179     @property          
180     def check_endpoints(self):
181         """ Check if the links are between switches
182             or switch-host. Return False for latter.
183         """
184         port_endpoints = self.port_endpoints()
185         if len(port_endpoints) == 2:
186             return True
187         return False
188
189     def get_port_info(self, endpoint, rem_endpoint):
190         """ Retrieve the port_info list for each port
191         
192             :param port_info_tunl: [hostname, publ_IP_addr, port_name,
193                 virtual_ip, local_port_Numb]
194             :type port_info_tunl: list
195         """
196         self.port_info_tunl = []
197         if self.check_endpoints:
198             # Use for the link switch-->switch
199             self.port_info_tunl.append(endpoint.port_info)
200             host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
201             self.port_info_tunl.append(rem_endpoint.port_info)
202             host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
203             return (pname0, ip1, pnumber1)      
204          
205         # Use for the link host-->switch
206         self.port_info_tunl.append(endpoint.port_info)
207         host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
208         return pnumber0
209     
210     def udp_connect(self, endpoint, rem_endpoint):     
211         # Collect info from rem_endpoint
212         self._nodes = self.get_node(rem_endpoint)
213         remote_ip = socket.gethostbyname(self.node.get("hostname"))
214         # Collect info from endpoint
215         self._nodes = self.get_node(endpoint) 
216         local_port_file = os.path.join(self.run_home(self.node), 
217                 "local_port")
218         remote_port_file = os.path.join(self.run_home(self.node), 
219                 "remote_port")
220         ret_file = os.path.join(self.run_home(self.node), 
221                 "ret_file")
222         cipher = self.get("cipher")
223         cipher_key = self.get("cipherKey")
224         bwlimit = self.get("bwLimit")
225         txqueuelen = self.get("txQueueLen")
226
227         rem_port = str(self.get_port_info(rem_endpoint, endpoint))           
228         # Upload the remote port in a file
229         self.node.upload(rem_port,
230                 remote_port_file,
231                 text = True,
232                 overwrite = False)
233        
234         udp_connect_command = endpoint.udp_connect_command(
235                 remote_ip, local_port_file, remote_port_file,
236                 ret_file, cipher, cipher_key, bwlimit, txqueuelen) 
237
238         # upload command to host_connect.sh script
239         shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
240         self.node.upload(udp_connect_command,
241                 shfile,
242                 text = True,
243                 overwrite = False)
244
245         # invoke connect script
246         cmd = "bash %s" % shfile
247         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
248                 sudo  = True,
249                 stdout = "udp_stdout",
250                 stderr = "udp_stderr")
251
252         # check if execution errors
253         msg = "Failed to connect endpoints"
254
255         if proc.poll():
256             self.error(msg, out, err)
257             raise RuntimeError, msg
258
259         msg = "Connection on host %s configured" \
260             % self.node.get("hostname")
261         self.debug(msg)
262          
263         # Wait for pid file to be generated
264         self._nodes = self.get_node(endpoint) 
265         pid, ppid = self.node.wait_pid(self.run_home(self.node))
266         
267         # If the process is not running, check for error information
268         # on the remote machine
269         if not pid or not ppid:
270             (out, err), proc = self.node.check_errors(self.run_home(self.node))
271             # Out is what was written in the stderr file
272             if err:
273                 msg = " Failed to start command '%s' " % command
274                 self.error(msg, out, err)
275                 raise RuntimeError, msg
276                 
277         return (pid, ppid)
278
279     def switch_connect(self, endpoint, rem_endpoint):
280         """ Get switch connect command
281         """
282         # Get and configure switch connection command
283         (local_port_name, remote_ip, remote_port_num) = self.get_port_info(
284                 endpoint, rem_endpoint)
285         switch_connect_command = endpoint.switch_connect_command(
286                 local_port_name, remote_ip, remote_port_num)
287         self._nodes = self.get_node(endpoint) 
288
289         # Upload command to the file sw_connect.sh
290         shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
291         self.node.upload(switch_connect_command,
292                 shfile,
293                 text = True,
294                 overwrite = False)
295
296         #invoke connect script
297         cmd = "bash %s" % shfile
298         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
299                 sudo  = True,
300                 stdout = "sw_stdout",
301                 stderr = "sw_stderr")
302         
303         # check if execution errors occured
304         if proc.poll():
305             msg = "Failed to connect endpoints"
306             self.error(msg, out, err)
307             raise RuntimeError, msg
308
309         # For debugging
310         msg = "Connection on port %s configured" % local_port_name
311         self.info(msg)
312
313     def wait_local_port(self):
314         """ Waits until the if_name file for the command is generated, 
315             and returns the if_name for the device """
316         local_port = None
317         delay = 1.0
318
319         for i in xrange(10):
320             (out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
321
322             if out:
323                 local_port = int(out)
324                 break
325             else:
326                 time.sleep(delay)
327                 delay = delay * 1.5
328         else:
329             msg = "Couldn't retrieve local_port"
330             self.error(msg, out, err)
331             raise RuntimeError, msg
332
333         return local_port
334
335     def sw_host_connect(self, endpoint, rem_endpoint):
336         """Link switch--> host
337         """
338         # Retrieve remote port number from rem_endpoint
339         local_port_name = endpoint.get('port_name')
340         self._nodes = self.get_node(rem_endpoint)
341
342      #   time.sleep(4) # Without this, sometimes I get nothing in remote_port_num
343         out = err= ''
344         remote_port_num = self.wait_local_port()
345         remote_ip = socket.gethostbyname(self.node.get("hostname"))
346         switch_connect_command = endpoint.switch_connect_command(
347                 local_port_name, remote_ip, remote_port_num)
348
349         # Upload command to the file sw_connect.sh
350         self._nodes = self.get_node(endpoint) 
351         shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
352         self.node.upload(switch_connect_command,
353                 shfile,
354                 text = True,
355                 overwrite = False)
356
357         # Invoke connect script
358         cmd = "bash %s" % shfile
359         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
360                 sudo  = True,
361                 stdout = "sw_stdout",
362                 stderr = "sw_stderr")
363         
364         # Check if execution errors occured
365
366         if proc.poll():
367             msg = "Failed to connect endpoints"
368             self.error(msg, out, err)
369             raise RuntimeError, msg
370
371         # For debugging
372         msg = "Connection on port %s configured" % local_port_name
373         self.debug(msg)                                                   
374
375     def do_provision(self):
376         """ Provision the tunnel
377         """
378         # Create folders
379         self._nodes = self.get_node(self.endpoint1)
380         self.node.mkdir(self.run_home(self.node))
381         self._nodes = self.get_node(self.endpoint2)
382         self.node.mkdir(self.run_home(self.node))
383
384         if self.check_endpoints:
385             #Invoke connect script between switches
386             self.switch_connect(self.endpoint1, self.endpoint2)
387             self.switch_connect(self.endpoint2, self.endpoint1)
388
389         else: 
390             # Invoke connect script between switch & host
391             (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
392             self.sw_host_connect(self.endpoint1, self.endpoint2)
393
394         super(OVSTunnel, self).do_provision()
395
396     @property
397     def tap(self):
398         rclass = ResourceFactory.get_resource_type(PlanetlabTap.get_rtype())
399         for guid in self.connections:
400             rm = self.ec.get_resource(guid)
401             if isinstance(rm, rclass):
402                 return rm
403
404     @property
405     def ovswitch(self):
406         for guid in self.connections:
407             rm_port = self.ec.get_resource(guid)
408             if hasattr(rm_port, "create_port"):
409                 rm_list = rm_port.get_connected(OVSWitch.get_rtype())
410                 if rm_list:
411                     return rm_list[0]
412
413     def configure(self):
414         if not self.check_endpoints:
415             self._vroute = self.ec.register_resource("PlanetlabVroute")
416             self.ec.set(self._vroute, "action", "add")
417             self.ec.set(self._vroute, "network", self.get("network"))
418
419             self.ec.register_connection(self._vroute, self.tap.guid)
420             # schedule deploy
421             self.ec.deploy(guids=[self._vroute], group = self.deployment_group)
422
423
424     def do_deploy(self):
425         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
426             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
427             self.ec.schedule(reschedule_delay, self.deploy)
428             return
429
430         self.do_discover()
431         self.do_provision()
432         self.configure()
433
434         super(OVSTunnel, self).do_deploy()
435  
436     def do_release(self):
437         """ Release the udp_tunnel on endpoint2.
438             On endpoint1 means nothing special.        
439         """
440         if not self.check_endpoints:
441             # Kill the TAP devices
442             # TODO: Make more generic Release method of PLTAP
443             if self._pid and self._ppid:
444                 self._nodes = self.get_node(self.endpoint2) 
445                 (out, err), proc = self.node.kill(self._pid,
446                         self._ppid, sudo = True)
447                 if err or proc.poll():
448                     # check if execution errors occurred
449                     msg = " Failed to delete TAP device"
450                     self.error(msg, err, err)
451
452         super(OVSTunnel, self).do_release()
453