2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
22 from nepi.execution.attribute import Attribute, Flags, Types
23 from nepi.execution.resource import ResourceManager, ResourceFactory, clsinit_copy, \
25 from nepi.resources.linux.application import LinuxApplication
26 from nepi.resources.planetlab.node import PlanetlabNode
27 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
28 from nepi.util.timefuncs import tnow, tdiffsec
29 from nepi.resources.planetlab.vroute import PlanetlabVroute
30 from nepi.resources.planetlab.tap import PlanetlabTap
36 reschedule_delay = "0.5s"
39 class OVSTunnel(LinuxApplication):
41 .. class:: Class Args :
43 :param ec: The Experiment controller
44 :type ec: ExperimentController
45 :param guid: guid of the RM
47 :param creds: Credentials to communicate with the rm
53 _authorized_connections = ["OVSPort", "PlanetlabTap"]
56 def _register_attributes(cls):
57 """ Register the attributes of Connection RM
60 network = Attribute("network", "IPv4 Network Address",
61 flags = Flags.ExecReadOnly)
63 cipher = Attribute("cipher",
64 "Cipher to encript communication. "
65 "One of PLAIN, AES, Blowfish, DES, DES3. ",
67 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
68 type = Types.Enumerate,
69 flags = Flags.ExecReadOnly)
71 cipher_key = Attribute("cipherKey",
72 "Specify a symmetric encryption key with which to protect "
73 "packets across the tunnel. python-crypto must be installed "
75 flags = Flags.ExecReadOnly)
77 txqueuelen = Attribute("txQueueLen",
78 "Specifies the interface's transmission queue length. "
81 flags = Flags.ExecReadOnly)
83 bwlimit = Attribute("bwLimit",
84 "Specifies the interface's emulated bandwidth in bytes "
87 flags = Flags.ExecReadOnly)
89 cls._register_attribute(network)
90 cls._register_attribute(cipher)
91 cls._register_attribute(cipher_key)
92 cls._register_attribute(txqueuelen)
93 cls._register_attribute(bwlimit)
95 def __init__(self, ec, guid):
97 :param ec: The Experiment controller
98 :type ec: ExperimentController
99 :param guid: guid of the RM
103 super(OVSTunnel, self).__init__(ec, guid)
104 self._home = "tunnel-%s" % self.guid
105 self.port_info_tunl = []
112 def log_message(self, msg):
113 return " guid %d - Tunnel - %s " % (self.guid, msg)
118 return self._nodes[0]
120 def app_home(self, node):
121 return os.path.join(self.node.exp_home, self._home)
123 def run_home(self, node):
124 return os.path.join(self.app_home(node), self.ec.run_id)
126 def port_endpoints(self):
127 # Switch-Switch connection
129 for guid in self.connections:
130 rm = self.ec.get_resource(guid)
131 if hasattr(rm, "create_port"):
136 def mixed_endpoints(self):
137 # Switch-Host connection
139 for guid in self.connections:
140 rm = self.ec.get_resource(guid)
141 if hasattr(rm, "create_port"):
143 elif hasattr(rm, "udp_connect_command"):
147 def get_node(self, endpoint):
148 # Get connected to the nodes
150 if hasattr(endpoint, "create_port"):
151 rm_list = endpoint.get_connected(OVSWitch.get_rtype())
153 rm = rm_list[0].get_connected(PlanetlabNode.get_rtype())
155 rm = endpoint.get_connected(PlanetlabNode.get_rtype())
163 if self.check_endpoints:
164 port_endpoints = self.port_endpoints()
165 if port_endpoints: return port_endpoints[0]
167 mixed_endpoints = self.mixed_endpoints()
168 if mixed_endpoints: return mixed_endpoints[0]
172 if self.check_endpoints:
173 port_endpoints = self.port_endpoints()
174 if port_endpoints: return port_endpoints[1]
176 mixed_endpoints = self.mixed_endpoints()
177 if mixed_endpoints: return mixed_endpoints[1]
180 def check_endpoints(self):
181 """ Check if the links are between switches
182 or switch-host. Return False for latter.
184 port_endpoints = self.port_endpoints()
185 if len(port_endpoints) == 2:
189 def get_port_info(self, endpoint, rem_endpoint):
190 """ Retrieve the port_info list for each port
192 :param port_info_tunl: [hostname, publ_IP_addr, port_name,
193 virtual_ip, local_port_Numb]
194 :type port_info_tunl: list
196 self.port_info_tunl = []
197 if self.check_endpoints:
198 # Use for the link switch-->switch
199 self.port_info_tunl.append(endpoint.port_info)
200 host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
201 self.port_info_tunl.append(rem_endpoint.port_info)
202 host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
203 return (pname0, ip1, pnumber1)
205 # Use for the link host-->switch
206 self.port_info_tunl.append(endpoint.port_info)
207 host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
210 def udp_connect(self, endpoint, rem_endpoint):
211 # Collect info from rem_endpoint
212 self._nodes = self.get_node(rem_endpoint)
213 remote_ip = socket.gethostbyname(self.node.get("hostname"))
214 # Collect info from endpoint
215 self._nodes = self.get_node(endpoint)
216 local_port_file = os.path.join(self.run_home(self.node),
218 remote_port_file = os.path.join(self.run_home(self.node),
220 ret_file = os.path.join(self.run_home(self.node),
222 cipher = self.get("cipher")
223 cipher_key = self.get("cipherKey")
224 bwlimit = self.get("bwLimit")
225 txqueuelen = self.get("txQueueLen")
227 rem_port = str(self.get_port_info(rem_endpoint, endpoint))
228 # Upload the remote port in a file
229 self.node.upload(rem_port,
234 udp_connect_command = endpoint.udp_connect_command(
235 remote_ip, local_port_file, remote_port_file,
236 ret_file, cipher, cipher_key, bwlimit, txqueuelen)
238 # upload command to host_connect.sh script
239 shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
240 self.node.upload(udp_connect_command,
245 # invoke connect script
246 cmd = "bash %s" % shfile
247 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
249 stdout = "udp_stdout",
250 stderr = "udp_stderr")
252 # check if execution errors
253 msg = "Failed to connect endpoints"
256 self.error(msg, out, err)
257 raise RuntimeError, msg
259 msg = "Connection on host %s configured" \
260 % self.node.get("hostname")
263 # Wait for pid file to be generated
264 self._nodes = self.get_node(endpoint)
265 pid, ppid = self.node.wait_pid(self.run_home(self.node))
267 # If the process is not running, check for error information
268 # on the remote machine
269 if not pid or not ppid:
270 (out, err), proc = self.node.check_errors(self.run_home(self.node))
271 # Out is what was written in the stderr file
273 msg = " Failed to start command '%s' " % command
274 self.error(msg, out, err)
275 raise RuntimeError, msg
279 def switch_connect(self, endpoint, rem_endpoint):
280 """ Get switch connect command
282 # Get and configure switch connection command
283 (local_port_name, remote_ip, remote_port_num) = self.get_port_info(
284 endpoint, rem_endpoint)
285 switch_connect_command = endpoint.switch_connect_command(
286 local_port_name, remote_ip, remote_port_num)
287 self._nodes = self.get_node(endpoint)
289 # Upload command to the file sw_connect.sh
290 shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
291 self.node.upload(switch_connect_command,
296 #invoke connect script
297 cmd = "bash %s" % shfile
298 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
300 stdout = "sw_stdout",
301 stderr = "sw_stderr")
303 # check if execution errors occured
305 msg = "Failed to connect endpoints"
306 self.error(msg, out, err)
307 raise RuntimeError, msg
310 msg = "Connection on port %s configured" % local_port_name
313 def wait_local_port(self):
314 """ Waits until the if_name file for the command is generated,
315 and returns the if_name for the device """
320 (out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
323 local_port = int(out)
329 msg = "Couldn't retrieve local_port"
330 self.error(msg, out, err)
331 raise RuntimeError, msg
335 def sw_host_connect(self, endpoint, rem_endpoint):
336 """Link switch--> host
338 # Retrieve remote port number from rem_endpoint
339 local_port_name = endpoint.get('port_name')
340 self._nodes = self.get_node(rem_endpoint)
342 # time.sleep(4) # Without this, sometimes I get nothing in remote_port_num
344 remote_port_num = self.wait_local_port()
345 remote_ip = socket.gethostbyname(self.node.get("hostname"))
346 switch_connect_command = endpoint.switch_connect_command(
347 local_port_name, remote_ip, remote_port_num)
349 # Upload command to the file sw_connect.sh
350 self._nodes = self.get_node(endpoint)
351 shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
352 self.node.upload(switch_connect_command,
357 # Invoke connect script
358 cmd = "bash %s" % shfile
359 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
361 stdout = "sw_stdout",
362 stderr = "sw_stderr")
364 # Check if execution errors occured
367 msg = "Failed to connect endpoints"
368 self.error(msg, out, err)
369 raise RuntimeError, msg
372 msg = "Connection on port %s configured" % local_port_name
375 def do_provision(self):
376 """ Provision the tunnel
379 self._nodes = self.get_node(self.endpoint1)
380 self.node.mkdir(self.run_home(self.node))
381 self._nodes = self.get_node(self.endpoint2)
382 self.node.mkdir(self.run_home(self.node))
384 if self.check_endpoints:
385 #Invoke connect script between switches
386 self.switch_connect(self.endpoint1, self.endpoint2)
387 self.switch_connect(self.endpoint2, self.endpoint1)
390 # Invoke connect script between switch & host
391 (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
392 self.sw_host_connect(self.endpoint1, self.endpoint2)
394 super(OVSTunnel, self).do_provision()
398 rclass = ResourceFactory.get_resource_type(PlanetlabTap.get_rtype())
399 for guid in self.connections:
400 rm = self.ec.get_resource(guid)
401 if isinstance(rm, rclass):
406 for guid in self.connections:
407 rm_port = self.ec.get_resource(guid)
408 if hasattr(rm_port, "create_port"):
409 rm_list = rm_port.get_connected(OVSWitch.get_rtype())
414 if not self.check_endpoints:
415 self._vroute = self.ec.register_resource("PlanetlabVroute")
416 self.ec.set(self._vroute, "action", "add")
417 self.ec.set(self._vroute, "network", self.get("network"))
419 self.ec.register_connection(self._vroute, self.tap.guid)
421 self.ec.deploy(guids=[self._vroute], group = self.deployment_group)
425 if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
426 (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
427 self.ec.schedule(reschedule_delay, self.deploy)
434 super(OVSTunnel, self).do_deploy()
436 def do_release(self):
437 """ Release the udp_tunnel on endpoint2.
438 On endpoint1 means nothing special.
440 if not self.check_endpoints:
441 # Kill the TAP devices
442 # TODO: Make more generic Release method of PLTAP
443 if self._pid and self._ppid:
444 self._nodes = self.get_node(self.endpoint2)
445 (out, err), proc = self.node.kill(self._pid,
446 self._ppid, sudo = True)
447 if err or proc.poll():
448 # check if execution errors occurred
449 msg = " Failed to delete TAP device"
450 self.error(msg, err, err)
452 super(OVSTunnel, self).do_release()