Fixing RM.DEPLOY being executed after/during RM.RELEASE by adding a release_lock...
[nepi.git] / src / nepi / resources / planetlab / openvswitch / tunnel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #             Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState
24 from nepi.resources.linux.application import LinuxApplication
25 from nepi.resources.planetlab.node import PlanetlabNode            
26 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch   
27 from nepi.util.timefuncs import tnow, tdiffsec     
28
29 import os
30 import time
31 import socket
32
33 reschedule_delay = "0.5s"
34
35 @clsinit_copy                 
36 class OVSTunnel(LinuxApplication):
37     """
38     .. class:: Class Args :
39       
40         :param ec: The Experiment controller
41         :type ec: ExperimentController
42         :param guid: guid of the RM
43         :type guid: int
44         :param creds: Credentials to communicate with the rm 
45         :type creds: dict
46
47     """
48     
49     _rtype = "OVSTunnel"
50     _authorized_connections = ["OVSPort", "PlanetlabTap"]    
51
52     @classmethod
53     def _register_attributes(cls):
54         """ Register the attributes of Connection RM 
55
56         """
57         cipher = Attribute("cipher",
58                "Cipher to encript communication. "
59                 "One of PLAIN, AES, Blowfish, DES, DES3. ",
60                 default = None,
61                 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
62                 type = Types.Enumerate, 
63                 flags = Flags.ExecReadOnly)
64
65         cipher_key = Attribute("cipherKey",
66                 "Specify a symmetric encryption key with which to protect "
67                 "packets across the tunnel. python-crypto must be installed "
68                 "on the system." ,
69                 flags = Flags.ExecReadOnly)
70
71         txqueuelen = Attribute("txQueueLen",
72                 "Specifies the interface's transmission queue length. "
73                 "Defaults to 1000. ", 
74                 type = Types.Integer, 
75                 flags = Flags.ExecReadOnly)
76
77         bwlimit = Attribute("bwLimit",
78                 "Specifies the interface's emulated bandwidth in bytes "
79                 "per second.",
80                 type = Types.Integer, 
81                 flags = Flags.ExecReadOnly)
82
83         cls._register_attribute(cipher)
84         cls._register_attribute(cipher_key)
85         cls._register_attribute(txqueuelen)
86         cls._register_attribute(bwlimit)
87
88     def __init__(self, ec, guid):
89         """
90         :param ec: The Experiment controller
91         :type ec: ExperimentController
92         :param guid: guid of the RM
93         :type guid: int
94     
95         """
96         super(OVSTunnel, self).__init__(ec, guid)
97         self._home = "tunnel-%s" % self.guid
98         self.port_info_tunl = []
99         self._nodes = []
100         self._pid = None
101         self._ppid = None
102
103     @property
104     def node(self):
105         return self._nodes[0]
106
107     def app_home(self, node):
108         return os.path.join(node.exp_home, self._home)
109
110     def run_home(self, node):
111         return os.path.join(self.app_home(node), self.ec.run_id)
112
113     def port_endpoints(self):
114         # Switch-Switch connection
115         connected = []
116         for guid in self.connections:
117             rm = self.ec.get_resource(guid)
118             if hasattr(rm, "create_port"):
119                 connected.append(rm)
120         return connected
121
122     def mixed_endpoints(self):
123         # Switch-Host connection
124         connected = [1, 2]
125         for guid in self.connections:
126             rm = self.ec.get_resource(guid)
127             if hasattr(rm, "create_port"):
128                 connected[0] = rm
129             elif hasattr(rm, "udp_connect_command"):
130                 connected[1] = rm
131         return connected
132
133     def get_node(self, endpoint):
134         # Get connected to the nodes
135         if hasattr(endpoint, "create_port"):
136             res = []
137             rm_list = endpoint.get_connected(OVSWitch.rtype())
138             if rm_list:
139                 rm = rm_list[0].get_connected(PlanetlabNode.rtype())
140                 if rm: 
141                     res.append(rm[0])
142             return res
143         else:
144             res = []
145             rm = endpoint.get_connected(PlanetlabNode.rtype())
146             if rm :
147                 res.append(rm[0])
148             return res
149
150     @property
151     def endpoint1(self):
152         if self.check_endpoints():
153             port_endpoints = self.port_endpoints()
154             if port_endpoints: return port_endpoints[0]
155         else:
156             mixed_endpoints = self.mixed_endpoints()
157             if mixed_endpoints: return mixed_endpoints[0]
158
159     @property
160     def endpoint2(self):
161         if self.check_endpoints():
162             port_endpoints = self.port_endpoints()
163             if port_endpoints: return port_endpoints[1]
164         else:
165             mixed_endpoints = self.mixed_endpoints()
166             if mixed_endpoints: return mixed_endpoints[1]
167                 
168     def check_endpoints(self):
169         """ Check if the links are between switches
170             or switch-host. Return False for latter.
171         """
172         port_endpoints = self.port_endpoints()
173         if len(port_endpoints) == 2:
174             return True
175         else: 
176             return False
177
178     def get_port_info(self, endpoint, rem_endpoint):
179         """ Retrieve the port_info list for each port
180         
181             :param port_info_tunl: [hostname, publ_IP_addr, port_name,
182                 virtual_ip, local_port_Numb]
183             :type port_info_tunl: list
184         """
185         self.port_info_tunl = []
186         if self.check_endpoints():
187             # Use for the link switch-->switch
188             self.port_info_tunl.append(endpoint.port_info)
189             host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
190             self.port_info_tunl.append(rem_endpoint.port_info)
191             host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
192             return (pname0, ip1, pnumber1)      
193          
194         else:
195             # Use for the link host-->switch
196             self.port_info_tunl.append(endpoint.port_info)
197             host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
198             return pnumber0
199     
200     def udp_connect(self, endpoint, rem_endpoint):     
201         # Collect info from rem_endpoint
202         self._nodes = self.get_node(rem_endpoint)
203         remote_ip = socket.gethostbyname(self.node.get("hostname"))
204         # Collect info from endpoint
205         self._nodes = self.get_node(endpoint) 
206         local_port_file = os.path.join(self.run_home(self.node), 
207                 "local_port")
208         remote_port_file = os.path.join(self.run_home(self.node), 
209                 "remote_port")
210         ret_file = os.path.join(self.run_home(self.node), 
211                 "ret_file")
212         cipher = self.get("cipher")
213         cipher_key = self.get("cipherKey")
214         bwlimit = self.get("bwLimit")
215         txqueuelen = self.get("txQueueLen")
216
217         rem_port = str(self.get_port_info(rem_endpoint, endpoint))           
218         # Upload the remote port in a file
219         self.node.upload(rem_port,
220                 remote_port_file,
221                 text = True,
222                 overwrite = False)
223        
224         udp_connect_command = endpoint.udp_connect_command(
225                 remote_ip, local_port_file, remote_port_file,
226                 ret_file, cipher, cipher_key, bwlimit, txqueuelen) 
227
228         # upload command to host_connect.sh script
229         shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
230         self.node.upload(udp_connect_command,
231                 shfile,
232                 text = True,
233                 overwrite = False)
234
235         # invoke connect script
236         cmd = "bash %s" % shfile
237         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
238                 sudo  = True,
239                 stdout = "udp_stdout",
240                 stderr = "udp_stderr")
241
242         # check if execution errors
243         msg = "Failed to connect endpoints"
244
245         if proc.poll():
246             self.error(msg, out, err)
247             raise RuntimeError, msg
248
249         msg = "Connection on host %s configured" \
250             % self.node.get("hostname")
251         self.info(msg)
252          
253         # Wait for pid file to be generated
254         self._nodes = self.get_node(endpoint) 
255         pid, ppid = self.node.wait_pid(self.run_home(self.node))
256         
257         # If the process is not running, check for error information
258         # on the remote machine
259         if not pid or not ppid:
260             (out, err), proc = self.node.check_errors(self.run_home(self.node))
261             # Out is what was written in the stderr file
262             if err:
263                 msg = " Failed to start command '%s' " % command
264                 self.error(msg, out, err)
265                 raise RuntimeError, msg
266                 
267         return (pid, ppid)
268
269     def switch_connect(self, endpoint, rem_endpoint):
270         """ Get switch connect command
271         """
272         # Get and configure switch connection command
273         (local_port_name, remote_ip, remote_port_num) = self.get_port_info(
274                 endpoint, rem_endpoint)
275         switch_connect_command = endpoint.switch_connect_command(
276                 local_port_name, remote_ip, remote_port_num)
277         self._nodes = self.get_node(endpoint) 
278
279         # Upload command to the file sw_connect.sh
280         shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
281         self.node.upload(switch_connect_command,
282                 shfile,
283                 text = True,
284                 overwrite = False)
285
286         #invoke connect script
287         cmd = "bash %s" % shfile
288         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
289                 sudo  = True,
290                 stdout = "sw_stdout",
291                 stderr = "sw_stderr")
292         
293         # check if execution errors occured
294         msg = "Failed to connect endpoints"
295
296         if proc.poll():
297             self.error(msg, out, err)
298             raise RuntimeError, msg
299         else:
300             msg = "Connection on port %s configured" % local_port_name
301             self.info(msg)
302             return 
303
304     def sw_host_connect(self, endpoint, rem_endpoint):
305         """Link switch--> host
306         """
307         # Retrieve remote port number from rem_endpoint
308         local_port_name = endpoint.get('port_name')
309         self._nodes = self.get_node(rem_endpoint)
310         time.sleep(2) # Without this, sometimes I get nothing in remote_port_num
311         remote_port_num = ''
312         (out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
313         remote_port_num = int(out)
314         remote_ip = socket.gethostbyname(self.node.get("hostname"))
315         switch_connect_command = endpoint.switch_connect_command(
316                 local_port_name, remote_ip, remote_port_num)
317
318         # Upload command to the file sw_connect.sh
319         self._nodes = self.get_node(endpoint) 
320         shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
321         self.node.upload(switch_connect_command,
322                 shfile,
323                 text = True,
324                 overwrite = False)
325
326         #invoke connect script
327         cmd = "bash %s" % shfile
328         (out, err), proc = self.node.run(cmd, self.run_home(self.node),
329                 sudo  = True,
330                 stdout = "sw_stdout",
331                 stderr = "sw_stderr")
332         
333         # check if execution errors occured
334         msg = "Failed to connect endpoints"
335
336         if proc.poll():
337             self.error(msg, out, err)
338             raise RuntimeError, msg
339         else:
340             msg = "Connection on port %s configured" % local_port_name
341             self.info(msg)
342             return                                                      
343
344     def do_provision(self):
345         """ Provision the tunnel
346         """
347         # Create folders
348         self._nodes = self.get_node(self.endpoint1)
349         self.node.mkdir(self.run_home(self.node))
350         self._nodes = self.get_node(self.endpoint2)
351         self.node.mkdir(self.run_home(self.node))
352
353         if self.check_endpoints():
354             #Invoke connect script between switches
355             switch_connect1 = self.switch_connect(self.endpoint1, self.endpoint2)
356             switch_connect2 = self.switch_connect(self.endpoint2, self.endpoint1)
357
358         else: 
359             # Invoke connect script between switch & host
360             (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
361             switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
362
363         super(OVSTunnel, self).do_provision()
364
365     def do_deploy(self):
366         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
367             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
368             self.ec.schedule(reschedule_delay, self.deploy)
369         else:
370             self.do_discover()
371             self.do_provision()
372
373             super(OVSTunnel, self).do_deploy()
374  
375     def do_release(self):
376         """ Release the udp_tunnel on endpoint2.
377             On endpoint1 means nothing special.        
378         """
379         if not self.check_endpoints():
380             # Kill the TAP devices
381             # TODO: Make more generic Release method of PLTAP
382             if self._pid and self._ppid:
383                 self._nodes = self.get_node(self.endpoint2) 
384                 (out, err), proc = self.node.kill(self._pid,
385                         self._ppid, sudo = True)
386             if err or proc.poll():
387                     # check if execution errors occurred
388                     msg = " Failed to delete TAP device"
389                     self.error(msg, err, err)
390
391         super(OVSTunnel, self).do_release()
392