2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24 from nepi.resources.linux.application import LinuxApplication
25 from nepi.resources.planetlab.node import PlanetlabNode
26 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
27 from nepi.util.timefuncs import tnow, tdiffsec
33 reschedule_delay = "0.5s"
36 class OVSTunnel(LinuxApplication):
38 .. class:: Class Args :
40 :param ec: The Experiment controller
41 :type ec: ExperimentController
42 :param guid: guid of the RM
44 :param creds: Credentials to communicate with the rm
50 _authorized_connections = ["OVSPort", "PlanetlabTap"]
53 def _register_attributes(cls):
54 """ Register the attributes of Connection RM
57 cipher = Attribute("cipher",
58 "Cipher to encript communication. "
59 "One of PLAIN, AES, Blowfish, DES, DES3. ",
61 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
62 type = Types.Enumerate,
63 flags = Flags.ExecReadOnly)
65 cipher_key = Attribute("cipherKey",
66 "Specify a symmetric encryption key with which to protect "
67 "packets across the tunnel. python-crypto must be installed "
69 flags = Flags.ExecReadOnly)
71 txqueuelen = Attribute("txQueueLen",
72 "Specifies the interface's transmission queue length. "
75 flags = Flags.ExecReadOnly)
77 bwlimit = Attribute("bwLimit",
78 "Specifies the interface's emulated bandwidth in bytes "
81 flags = Flags.ExecReadOnly)
83 cls._register_attribute(cipher)
84 cls._register_attribute(cipher_key)
85 cls._register_attribute(txqueuelen)
86 cls._register_attribute(bwlimit)
88 def __init__(self, ec, guid):
90 :param ec: The Experiment controller
91 :type ec: ExperimentController
92 :param guid: guid of the RM
96 super(OVSTunnel, self).__init__(ec, guid)
97 self._home = "tunnel-%s" % self.guid
98 self.port_info_tunl = []
104 def log_message(self, msg):
105 return " guid %d - Tunnel - %s " % (self.guid, msg)
110 return self._nodes[0]
112 def app_home(self, node):
113 return os.path.join(node.exp_home, self._home)
115 def run_home(self, node):
116 return os.path.join(self.app_home(node), self.ec.run_id)
118 def port_endpoints(self):
119 # Switch-Switch connection
121 for guid in self.connections:
122 rm = self.ec.get_resource(guid)
123 if hasattr(rm, "create_port"):
127 def mixed_endpoints(self):
128 # Switch-Host connection
130 for guid in self.connections:
131 rm = self.ec.get_resource(guid)
132 if hasattr(rm, "create_port"):
134 elif hasattr(rm, "udp_connect_command"):
138 def get_node(self, endpoint):
139 # Get connected to the nodes
141 if hasattr(endpoint, "create_port"):
142 rm_list = endpoint.get_connected(OVSWitch.get_rtype())
144 rm = rm_list[0].get_connected(PlanetlabNode.get_rtype())
146 rm = endpoint.get_connected(PlanetlabNode.get_rtype())
154 if self.check_endpoints():
155 port_endpoints = self.port_endpoints()
156 if port_endpoints: return port_endpoints[0]
158 mixed_endpoints = self.mixed_endpoints()
159 if mixed_endpoints: return mixed_endpoints[0]
163 if self.check_endpoints():
164 port_endpoints = self.port_endpoints()
165 if port_endpoints: return port_endpoints[1]
167 mixed_endpoints = self.mixed_endpoints()
168 if mixed_endpoints: return mixed_endpoints[1]
170 def check_endpoints(self):
171 """ Check if the links are between switches
172 or switch-host. Return False for latter.
174 port_endpoints = self.port_endpoints()
175 if len(port_endpoints) == 2:
179 def get_port_info(self, endpoint, rem_endpoint):
180 """ Retrieve the port_info list for each port
182 :param port_info_tunl: [hostname, publ_IP_addr, port_name,
183 virtual_ip, local_port_Numb]
184 :type port_info_tunl: list
186 self.port_info_tunl = []
187 if self.check_endpoints():
188 # Use for the link switch-->switch
189 self.port_info_tunl.append(endpoint.port_info)
190 host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
191 self.port_info_tunl.append(rem_endpoint.port_info)
192 host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
193 return (pname0, ip1, pnumber1)
195 # Use for the link host-->switch
196 self.port_info_tunl.append(endpoint.port_info)
197 host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
200 def udp_connect(self, endpoint, rem_endpoint):
201 # Collect info from rem_endpoint
202 self._nodes = self.get_node(rem_endpoint)
203 remote_ip = socket.gethostbyname(self.node.get("hostname"))
204 # Collect info from endpoint
205 self._nodes = self.get_node(endpoint)
206 local_port_file = os.path.join(self.run_home(self.node),
208 remote_port_file = os.path.join(self.run_home(self.node),
210 ret_file = os.path.join(self.run_home(self.node),
212 cipher = self.get("cipher")
213 cipher_key = self.get("cipherKey")
214 bwlimit = self.get("bwLimit")
215 txqueuelen = self.get("txQueueLen")
217 rem_port = str(self.get_port_info(rem_endpoint, endpoint))
218 # Upload the remote port in a file
219 self.node.upload(rem_port,
224 udp_connect_command = endpoint.udp_connect_command(
225 remote_ip, local_port_file, remote_port_file,
226 ret_file, cipher, cipher_key, bwlimit, txqueuelen)
228 # upload command to host_connect.sh script
229 shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
230 self.node.upload(udp_connect_command,
235 # invoke connect script
236 cmd = "bash %s" % shfile
237 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
239 stdout = "udp_stdout",
240 stderr = "udp_stderr")
242 # check if execution errors
243 msg = "Failed to connect endpoints"
246 self.error(msg, out, err)
247 raise RuntimeError, msg
249 msg = "Connection on host %s configured" \
250 % self.node.get("hostname")
253 # Wait for pid file to be generated
254 self._nodes = self.get_node(endpoint)
255 pid, ppid = self.node.wait_pid(self.run_home(self.node))
257 # If the process is not running, check for error information
258 # on the remote machine
259 if not pid or not ppid:
260 (out, err), proc = self.node.check_errors(self.run_home(self.node))
261 # Out is what was written in the stderr file
263 msg = " Failed to start command '%s' " % command
264 self.error(msg, out, err)
265 raise RuntimeError, msg
269 def switch_connect(self, endpoint, rem_endpoint):
270 """ Get switch connect command
272 # Get and configure switch connection command
273 (local_port_name, remote_ip, remote_port_num) = self.get_port_info(
274 endpoint, rem_endpoint)
275 switch_connect_command = endpoint.switch_connect_command(
276 local_port_name, remote_ip, remote_port_num)
277 self._nodes = self.get_node(endpoint)
279 # Upload command to the file sw_connect.sh
280 shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
281 self.node.upload(switch_connect_command,
286 #invoke connect script
287 cmd = "bash %s" % shfile
288 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
290 stdout = "sw_stdout",
291 stderr = "sw_stderr")
293 # check if execution errors occured
295 msg = "Failed to connect endpoints"
296 self.error(msg, out, err)
297 raise RuntimeError, msg
300 msg = "Connection on port %s configured" % local_port_name
303 def sw_host_connect(self, endpoint, rem_endpoint):
304 """Link switch--> host
306 # Retrieve remote port number from rem_endpoint
307 local_port_name = endpoint.get('port_name')
308 self._nodes = self.get_node(rem_endpoint)
309 time.sleep(2) # Without this, sometimes I get nothing in remote_port_num
311 (out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
312 remote_port_num = int(out)
313 remote_ip = socket.gethostbyname(self.node.get("hostname"))
314 switch_connect_command = endpoint.switch_connect_command(
315 local_port_name, remote_ip, remote_port_num)
317 # Upload command to the file sw_connect.sh
318 self._nodes = self.get_node(endpoint)
319 shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
320 self.node.upload(switch_connect_command,
325 # Invoke connect script
326 cmd = "bash %s" % shfile
327 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
329 stdout = "sw_stdout",
330 stderr = "sw_stderr")
332 # Check if execution errors occured
335 msg = "Failed to connect endpoints"
336 self.error(msg, out, err)
337 raise RuntimeError, msg
340 msg = "Connection on port %s configured" % local_port_name
343 def do_provision(self):
344 """ Provision the tunnel
347 self._nodes = self.get_node(self.endpoint1)
348 self.node.mkdir(self.run_home(self.node))
349 self._nodes = self.get_node(self.endpoint2)
350 self.node.mkdir(self.run_home(self.node))
352 if self.check_endpoints():
353 #Invoke connect script between switches
354 self.switch_connect(self.endpoint1, self.endpoint2)
355 self.switch_connect(self.endpoint2, self.endpoint1)
358 # Invoke connect script between switch & host
359 (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
360 self.sw_host_connect(self.endpoint1, self.endpoint2)
362 super(OVSTunnel, self).do_provision()
365 if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
366 (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
367 self.ec.schedule(reschedule_delay, self.deploy)
373 super(OVSTunnel, self).do_deploy()
375 def do_release(self):
376 """ Release the udp_tunnel on endpoint2.
377 On endpoint1 means nothing special.
379 if not self.check_endpoints():
380 # Kill the TAP devices
381 # TODO: Make more generic Release method of PLTAP
382 if self._pid and self._ppid:
383 self._nodes = self.get_node(self.endpoint2)
384 (out, err), proc = self.node.kill(self._pid,
385 self._ppid, sudo = True)
386 if err or proc.poll():
387 # check if execution errors occurred
388 msg = " Failed to delete TAP device"
389 self.error(msg, err, err)
391 super(OVSTunnel, self).do_release()