Modified FailureManager to abort only when critical resources fail
[nepi.git] / src / nepi / resources / planetlab / openvswitch / tunnel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #             Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, failtrap
24 from nepi.resources.linux.application import LinuxApplication
25 from nepi.resources.planetlab.node import PlanetlabNode            
26 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch   
27 from nepi.util.timefuncs import tnow, tdiffsec     
28
29 import os
30 import time
31 import socket
32
# Delay before deploy() retries while an endpoint is not READY yet.
reschedule_delay = "0.5s"

@clsinit_copy
class OVSTunnel(LinuxApplication):
    """
    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM
        :type guid: int

    Tunnel RM that links either two OVSPort endpoints (switch-to-switch)
    or one OVSPort with a host-side endpoint that provides
    ``udp_connect_command`` (switch-to-host; presumably the PlanetlabTap
    listed in ``_authorized_connections``).
    """

    _rtype = "OVSTunnel"
    _authorized_connections = ["OVSPort", "PlanetlabTap"]

    @classmethod
    def _register_attributes(cls):
        """ Register the tunnel attributes (cipher, cipherKey,
        txQueueLen, bwLimit). All of them are ExecReadOnly.
        """
        cipher = Attribute("cipher",
                "Cipher to encript communication. "
                "One of PLAIN, AES, Blowfish, DES, DES3. ",
                default = None,
                allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
                type = Types.Enumerate,
                flags = Flags.ExecReadOnly)

        cipher_key = Attribute("cipherKey",
                "Specify a symmetric encryption key with which to protect "
                "packets across the tunnel. python-crypto must be installed "
                "on the system." ,
                flags = Flags.ExecReadOnly)

        txqueuelen = Attribute("txQueueLen",
                "Specifies the interface's transmission queue length. "
                "Defaults to 1000. ",
                type = Types.Integer,
                flags = Flags.ExecReadOnly)

        bwlimit = Attribute("bwLimit",
                "Specifies the interface's emulated bandwidth in bytes "
                "per second.",
                type = Types.Integer,
                flags = Flags.ExecReadOnly)

        cls._register_attribute(cipher)
        cls._register_attribute(cipher_key)
        cls._register_attribute(txqueuelen)
        cls._register_attribute(bwlimit)

    def __init__(self, ec, guid):
        """
        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM
        :type guid: int

        """
        super(OVSTunnel, self).__init__(ec, guid)
        self._home = "tunnel-%s" % self.guid
        # Last port_info records collected by get_port_info().
        self.port_info_tunl = []
        # Nodes resolved by the last get_node() call; `node` reads [0].
        self._nodes = []
        # pid/ppid of the host-side process started by udp_connect().
        self._pid = None
        self._ppid = None

    @property
    def node(self):
        """ Node most recently resolved into ``self._nodes``. """
        return self._nodes[0]

    def app_home(self, node):
        """ Tunnel directory under the experiment home of ``node``. """
        return os.path.join(node.exp_home, self._home)

    def run_home(self, node):
        """ Per-run directory inside app_home(node). """
        return os.path.join(self.app_home(node), self.ec.run_id)

    def port_endpoints(self):
        """ Connected RMs that expose ``create_port`` (switch ports),
        used for switch-to-switch links.
        """
        connected = (self.ec.get_resource(guid) for guid in self.connections)
        return [rm for rm in connected if hasattr(rm, "create_port")]

    def mixed_endpoints(self):
        """ Return ``[switch_port, host_endpoint]`` for a switch-to-host
        link. An entry is None while that side is not connected.
        """
        # None placeholders (not ints) so callers can test truthiness and
        # never dereference a fake endpoint.
        connected = [None, None]
        for guid in self.connections:
            rm = self.ec.get_resource(guid)
            if hasattr(rm, "create_port"):
                connected[0] = rm
            elif hasattr(rm, "udp_connect_command"):
                connected[1] = rm
        return connected

    def get_node(self, endpoint):
        """ Return a list with the PlanetlabNode behind ``endpoint``,
        or an empty list if it cannot be resolved.

        A switch port reaches its node through the connected OVSWitch;
        any other endpoint is connected to the node directly.
        """
        if hasattr(endpoint, "create_port"):
            switches = endpoint.get_connected(OVSWitch.rtype())
            if not switches:
                return []
            holder = switches[0]
        else:
            holder = endpoint

        nodes = holder.get_connected(PlanetlabNode.rtype())
        return [nodes[0]] if nodes else []

    def _tunnel_endpoints(self):
        """ Return the (endpoint1, endpoint2) pair for the link type. """
        ports = self.port_endpoints()
        if len(ports) == 2:
            return ports[0], ports[1]
        switch_port, host = self.mixed_endpoints()
        return switch_port, host

    @property
    def endpoint1(self):
        """ First switch port, or None if not connected yet. """
        return self._tunnel_endpoints()[0]

    @property
    def endpoint2(self):
        """ Second switch port (switch-switch) or host endpoint
        (switch-host), or None if not connected yet.
        """
        return self._tunnel_endpoints()[1]

    def check_endpoints(self):
        """ Check if the links are between switches
            or switch-host. Return False for latter.
        """
        return len(self.port_endpoints()) == 2

    def get_port_info(self, endpoint, rem_endpoint):
        """ Collect port_info records and return what the connect
        command needs.

            port_info layout: [hostname, publ_IP_addr, port_name,
                virtual_ip, local_port_Numb]

        :returns: for switch-switch links
            ``(local_port_name, remote_ip, remote_port_number)``;
            for switch-host links, the port number of ``endpoint``.
        """
        self.port_info_tunl = []
        if self.check_endpoints():
            # Use for the link switch-->switch
            self.port_info_tunl.append(endpoint.port_info)
            _host0, _ip0, pname0, _virt_ip0, _pnumber0 = self.port_info_tunl[0]
            self.port_info_tunl.append(rem_endpoint.port_info)
            _host1, ip1, _pname1, _virt_ip1, pnumber1 = self.port_info_tunl[1]
            return (pname0, ip1, pnumber1)

        # Use for the link host-->switch
        self.port_info_tunl.append(endpoint.port_info)
        _host0, _ip0, _pname0, _virt_ip0, pnumber0 = self.port_info_tunl[0]
        return pnumber0

    def udp_connect(self, endpoint, rem_endpoint):
        """ Start the host side of a switch-to-host tunnel.

        Uploads the remote port number and a ``host_connect.sh`` script
        (built by ``endpoint.udp_connect_command``) to the node behind
        ``endpoint``, then runs the script with sudo.

        :returns: ``(pid, ppid)`` reported by ``node.wait_pid``; either may
            be falsy if the process did not start and no error was found.
        :raises RuntimeError: if the script fails or errors are reported.
        """
        # The remote IP is the public address of rem_endpoint's node.
        self._nodes = self.get_node(rem_endpoint)
        remote_ip = socket.gethostbyname(self.node.get("hostname"))

        # Everything else happens on the node behind the local endpoint.
        self._nodes = self.get_node(endpoint)
        node = self.node
        run_home = self.run_home(node)
        local_port_file = os.path.join(run_home, "local_port")
        remote_port_file = os.path.join(run_home, "remote_port")
        ret_file = os.path.join(run_home, "ret_file")
        cipher = self.get("cipher")
        cipher_key = self.get("cipherKey")
        bwlimit = self.get("bwLimit")
        txqueuelen = self.get("txQueueLen")

        rem_port = str(self.get_port_info(rem_endpoint, endpoint))
        # Upload the remote port in a file
        node.upload(rem_port,
                remote_port_file,
                text = True,
                overwrite = False)

        udp_connect_command = endpoint.udp_connect_command(
                remote_ip, local_port_file, remote_port_file,
                ret_file, cipher, cipher_key, bwlimit, txqueuelen)

        # upload command to host_connect.sh script
        shfile = os.path.join(self.app_home(node), "host_connect.sh")
        node.upload(udp_connect_command,
                shfile,
                text = True,
                overwrite = False)

        # invoke connect script
        cmd = "bash %s" % shfile
        (out, err), proc = node.run(cmd, run_home,
                sudo  = True,
                stdout = "udp_stdout",
                stderr = "udp_stderr")

        # check if execution errors
        if proc.poll():
            msg = "Failed to connect endpoints"
            self.fail()
            self.error(msg, out, err)
            raise RuntimeError(msg)

        self.info("Connection on host %s configured" % node.get("hostname"))

        # Wait for pid file to be generated
        pid, ppid = node.wait_pid(run_home)

        # If the process is not running, check for error information
        # on the remote machine
        if not pid or not ppid:
            (out, err), proc = node.check_errors(run_home)
            # Out is what was written in the stderr file
            if err:
                msg = " Failed to start command '%s' " % cmd
                self.fail()
                self.error(msg, out, err)
                raise RuntimeError(msg)

        return (pid, ppid)

    def _run_sw_connect_script(self, switch_connect_command, local_port_name):
        """ Upload ``switch_connect_command`` as sw_connect.sh on
        ``self.node`` and run it with sudo; raise RuntimeError on failure.
        """
        node = self.node
        shfile = os.path.join(self.app_home(node), "sw_connect.sh")
        node.upload(switch_connect_command,
                shfile,
                text = True,
                overwrite = False)

        cmd = "bash %s" % shfile
        (out, err), proc = node.run(cmd, self.run_home(node),
                sudo  = True,
                stdout = "sw_stdout",
                stderr = "sw_stderr")

        # check if execution errors occured
        if proc.poll():
            msg = "Failed to connect endpoints"
            self.fail()
            self.error(msg, out, err)
            raise RuntimeError(msg)

        self.info("Connection on port %s configured" % local_port_name)

    def switch_connect(self, endpoint, rem_endpoint):
        """ Configure the switch-to-switch tunnel on ``endpoint``'s node. """
        (local_port_name, remote_ip, remote_port_num) = self.get_port_info(
                endpoint, rem_endpoint)
        switch_connect_command = endpoint.switch_connect_command(
                local_port_name, remote_ip, remote_port_num)
        self._nodes = self.get_node(endpoint)
        self._run_sw_connect_script(switch_connect_command, local_port_name)

    def _wait_tap_local_port(self, attempts = 5, delay = 2):
        """ Read the ``local_port`` file from the run dir of ``self.node``.

        Reading it too early may return nothing, so poll up to
        ``attempts`` times, sleeping ``delay`` seconds before each read.
        """
        run_home = self.run_home(self.node)
        out = err = ""
        for _ in range(max(1, attempts)):
            time.sleep(delay)
            (out, err), proc = self.node.check_output(run_home, 'local_port')
            value = (out or "").strip()
            if value:
                return int(value)

        msg = "Failed to read local port of the host endpoint"
        self.fail()
        self.error(msg, out, err)
        raise RuntimeError(msg)

    def sw_host_connect(self, endpoint, rem_endpoint):
        """Link switch--> host

        Reads the UDP port opened on the host (``rem_endpoint``) and
        configures the switch port ``endpoint`` to reach it.
        """
        local_port_name = endpoint.get('port_name')

        self._nodes = self.get_node(rem_endpoint)
        remote_port_num = self._wait_tap_local_port()
        remote_ip = socket.gethostbyname(self.node.get("hostname"))
        switch_connect_command = endpoint.switch_connect_command(
                local_port_name, remote_ip, remote_port_num)

        self._nodes = self.get_node(endpoint)
        self._run_sw_connect_script(switch_connect_command, local_port_name)

    @failtrap
    def provision(self):
        """ Provision the tunnel: create run dirs on both nodes, then
        run the connect scripts for the detected link type.
        """
        endpoint1 = self.endpoint1
        endpoint2 = self.endpoint2

        # Create folders
        for endpoint in (endpoint1, endpoint2):
            self._nodes = self.get_node(endpoint)
            self.node.mkdir(self.run_home(self.node))

        if self.check_endpoints():
            # Invoke connect script between switches
            self.switch_connect(endpoint1, endpoint2)
            self.switch_connect(endpoint2, endpoint1)
        else:
            # Invoke connect script between switch & host
            (self._pid, self._ppid) = self.udp_connect(endpoint2, endpoint1)
            self.sw_host_connect(endpoint1, endpoint2)

        super(OVSTunnel, self).provision()

    @failtrap
    def deploy(self):
        """ Deploy once both endpoints are READY; otherwise reschedule. """
        pending = [ep for ep in (self.endpoint1, self.endpoint2)
                if not ep or ep.state < ResourceState.READY]
        if pending:
            self.ec.schedule(reschedule_delay, self.deploy)
            return

        self.discover()
        self.provision()

        super(OVSTunnel, self).deploy()

    def release(self):
        """ Release the udp_tunnel on endpoint2.

        For switch-to-host links, kill the process recorded by
        udp_connect() on endpoint2's node. Nothing special is needed for
        endpoint1. Errors are logged and never block the parent release.
        """
        try:
            # Kill the TAP devices
            # TODO: Make more generic Release method of PLTAP
            if not self.check_endpoints() and self._pid and self._ppid:
                self._nodes = self.get_node(self.endpoint2)
                (out, err), proc = self.node.kill(self._pid,
                        self._ppid, sudo = True)

                # check if execution errors occurred
                if err or proc.poll():
                    msg = " Failed to delete TAP device"
                    self.error(msg, out, err)
                    self.fail()
        except Exception:
            import traceback
            err = traceback.format_exc()
            self.error(err)

        super(OVSTunnel, self).release()
404
405