2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
23 from nepi.resources.linux.application import LinuxApplication
24 from nepi.resources.planetlab.node import PlanetlabNode
25 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch
26 from nepi.util.timefuncs import tnow, tdiffsec
33 reschedule_delay = "0.5s"
36 class Tunnel(LinuxApplication):
38 .. class:: Class Args :
40 :param ec: The Experiment controller
41 :type ec: ExperimentController
42 :param guid: guid of the RM
44 :param creds: Credentials to communicate with the rm
50 _authorized_connections = ["OVSPort", "PlanetlabTap"]
53 def _register_attributes(cls):
54 """ Register the attributes of Connection RM
57 cipher = Attribute("cipher",
58 "Cipher to encript communication. "
59 "One of PLAIN, AES, Blowfish, DES, DES3. ",
61 allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
62 type = Types.Enumerate,
63 flags = Flags.ExecReadOnly)
65 cipher_key = Attribute("cipherKey",
66 "Specify a symmetric encryption key with which to protect "
67 "packets across the tunnel. python-crypto must be installed "
69 flags = Flags.ExecReadOnly)
71 txqueuelen = Attribute("txQueueLen",
72 "Specifies the interface's transmission queue length. "
75 flags = Flags.ExecReadOnly)
77 bwlimit = Attribute("bwLimit",
78 "Specifies the interface's emulated bandwidth in bytes "
81 flags = Flags.ExecReadOnly)
83 cls._register_attribute(cipher)
84 cls._register_attribute(cipher_key)
85 cls._register_attribute(txqueuelen)
86 cls._register_attribute(bwlimit)
88 def __init__(self, ec, guid):
90 :param ec: The Experiment controller
91 :type ec: ExperimentController
92 :param guid: guid of the RM
96 super(Tunnel, self).__init__(ec, guid)
97 self._home = "tunnel-%s" % self.guid
98 self.port_info_tunl = []
105 return self._nodes[0]
107 def app_home(self, node):
108 return os.path.join(node.exp_home, self._home)
110 def run_home(self, node):
111 return os.path.join(self.app_home(node), self.ec.run_id)
113 def port_endpoints(self):
114 # Switch-Switch connection
116 for guid in self.connections:
117 rm = self.ec.get_resource(guid)
118 if hasattr(rm, "create_port"):
122 def mixed_endpoints(self):
123 # Switch-Host connection
125 for guid in self.connections:
126 rm = self.ec.get_resource(guid)
127 if hasattr(rm, "create_port"):
129 elif hasattr(rm, "udp_connect_command"):
133 def get_node(self, endpoint):
134 # Get connected to the nodes
135 if hasattr(endpoint, "create_port"):
137 rm_list = endpoint.get_connected(OVSWitch.rtype())
139 rm = rm_list[0].get_connected(PlanetlabNode.rtype())
145 rm = endpoint.get_connected(PlanetlabNode.rtype())
152 if self.check_endpoints():
153 port_endpoints = self.port_endpoints()
154 if port_endpoints: return port_endpoints[0]
156 mixed_endpoints = self.mixed_endpoints()
157 if mixed_endpoints: return mixed_endpoints[0]
161 if self.check_endpoints():
162 port_endpoints = self.port_endpoints()
163 if port_endpoints: return port_endpoints[1]
165 mixed_endpoints = self.mixed_endpoints()
166 if mixed_endpoints: return mixed_endpoints[1]
168 def check_endpoints(self):
169 """ Check if the links are between switches
170 or switch-host. Return False for latter.
172 port_endpoints = self.port_endpoints()
173 if len(port_endpoints) == 2:
178 def get_port_info(self, endpoint, rem_endpoint):
179 """ Retrieve the port_info list for each port
181 :param port_info_tunl: [hostname, publ_IP_addr, port_name,
182 virtual_ip, local_port_Numb]
183 :type port_info_tunl: list
185 self.port_info_tunl = []
186 if self.check_endpoints():
187 # Use for the link switch-->switch
188 self.port_info_tunl.append(endpoint.port_info)
189 host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
190 self.port_info_tunl.append(rem_endpoint.port_info)
191 host1, ip1, pname1, virt_ip1, pnumber1 = self.port_info_tunl[1]
192 return (pname0, ip1, pnumber1)
195 # Use for the link host-->switch
196 self.port_info_tunl.append(endpoint.port_info)
197 host0, ip0, pname0, virt_ip0, pnumber0 = self.port_info_tunl[0]
200 def udp_connect(self, endpoint, rem_endpoint):
201 # Collect info from rem_endpoint
202 self._nodes = self.get_node(rem_endpoint)
203 remote_ip = socket.gethostbyname(self.node.get("hostname"))
204 # Collect info from endpoint
205 self._nodes = self.get_node(endpoint)
206 local_port_file = os.path.join(self.run_home(self.node),
208 remote_port_file = os.path.join(self.run_home(self.node),
210 ret_file = os.path.join(self.run_home(self.node),
212 cipher = self.get("cipher")
213 cipher_key = self.get("cipherKey")
214 bwlimit = self.get("bwLimit")
215 txqueuelen = self.get("txQueueLen")
217 rem_port = str(self.get_port_info(rem_endpoint, endpoint))
218 # Upload the remote port in a file
219 self.node.upload(rem_port,
224 udp_connect_command = endpoint.udp_connect_command(
225 remote_ip, local_port_file, remote_port_file,
226 ret_file, cipher, cipher_key, bwlimit, txqueuelen)
228 # upload command to host_connect.sh script
229 shfile = os.path.join(self.app_home(self.node), "host_connect.sh")
230 self.node.upload(udp_connect_command,
235 # invoke connect script
236 cmd = "bash %s" % shfile
237 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
239 stdout = "udp_stdout",
240 stderr = "udp_stderr")
242 # check if execution errors
243 msg = "Failed to connect endpoints"
247 self.error(msg, out, err)
248 raise RuntimeError, msg
249 msg = "Connection on host %s configured" \
250 % self.node.get("hostname")
253 # Wait for pid file to be generated
254 self._nodes = self.get_node(endpoint)
255 pid, ppid = self.node.wait_pid(self.run_home(self.node))
257 # If the process is not running, check for error information
258 # on the remote machine
259 if not pid or not ppid:
260 (out, err), proc = self.node.check_errors(self.run_home(self.node))
261 # Out is what was written in the stderr file
264 msg = " Failed to start command '%s' " % command
265 self.error(msg, out, err)
266 raise RuntimeError, msg
270 def switch_connect(self, endpoint, rem_endpoint):
271 """ Get switch connect command
273 # Get and configure switch connection command
274 (local_port_name, remote_ip, remote_port_num) = self.get_port_info(
275 endpoint, rem_endpoint)
276 switch_connect_command = endpoint.switch_connect_command(
277 local_port_name, remote_ip, remote_port_num)
278 self._nodes = self.get_node(endpoint)
280 # Upload command to the file sw_connect.sh
281 shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
282 self.node.upload(switch_connect_command,
287 #invoke connect script
288 cmd = "bash %s" % shfile
289 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
291 stdout = "sw_stdout",
292 stderr = "sw_stderr")
294 # check if execution errors occured
295 msg = "Failed to connect endpoints"
299 self.error(msg, out, err)
300 raise RuntimeError, msg
302 msg = "Connection on port %s configured" % local_port_name
306 def sw_host_connect(self, endpoint, rem_endpoint):
307 """Link switch--> host
309 # Retrieve remote port number from rem_endpoint
310 local_port_name = endpoint.get('port_name')
311 self._nodes = self.get_node(rem_endpoint)
312 time.sleep(2) # Without this, sometimes I get nothing in remote_port_num
314 (out, err), proc = self.node.check_output(self.run_home(self.node), 'local_port')
315 remote_port_num = int(out)
316 remote_ip = socket.gethostbyname(self.node.get("hostname"))
317 switch_connect_command = endpoint.switch_connect_command(
318 local_port_name, remote_ip, remote_port_num)
320 # Upload command to the file sw_connect.sh
321 self._nodes = self.get_node(endpoint)
322 shfile = os.path.join(self.app_home(self.node), "sw_connect.sh")
323 self.node.upload(switch_connect_command,
328 #invoke connect script
329 cmd = "bash %s" % shfile
330 (out, err), proc = self.node.run(cmd, self.run_home(self.node),
332 stdout = "sw_stdout",
333 stderr = "sw_stderr")
335 # check if execution errors occured
336 msg = "Failed to connect endpoints"
340 self.error(msg, out, err)
341 raise RuntimeError, msg
343 msg = "Connection on port %s configured" % local_port_name
348 """ Provision the tunnel
351 self._nodes = self.get_node(self.endpoint1)
352 self.node.mkdir(self.run_home(self.node))
353 self._nodes = self.get_node(self.endpoint2)
354 self.node.mkdir(self.run_home(self.node))
356 if self.check_endpoints():
357 #Invoke connect script between switches
358 switch_connect1 = self.switch_connect(self.endpoint1, self.endpoint2)
359 switch_connect2 = self.switch_connect(self.endpoint2, self.endpoint1)
362 # Invoke connect script between switch & host
363 (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
364 switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
366 self.debug("------- READY -------")
367 self._provision_time = tnow()
368 self._state = ResourceState.PROVISIONED
371 """ Discover the tunnel
377 if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
378 (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
379 self.ec.schedule(reschedule_delay, self.deploy)
388 self.debug("----- READY ---- ")
389 self._ready_time = tnow()
390 self._state = ResourceState.READY
393 """ Start the RM. It means nothing special for
400 """ Stop the RM. It means nothing special for
406 """ Release the udp_tunnel on endpoint2.
407 On endpoint1 means nothing special.
409 if not self.check_endpoints():
410 # Kill the TAP devices
411 # TODO: Make more generic Release method of PLTAP
412 if self._pid and self._ppid:
413 self._nodes = self.get_node(self.endpoint2)
414 (out, err), proc = self.node.kill(self._pid,
415 self._ppid, sudo = True)
416 if err or proc.poll():
417 # check if execution errors occurred
418 msg = " Failed to delete TAP device"
419 self.error(msg, err, err)
422 self._state = ResourceState.RELEASED