2 # -*- coding: utf-8 -*-
16 from nepi.util import server
18 class TunProtoBase(object):
19 def __init__(self, local, peer, home_path, key):
20 # Weak references, since ifaces do have a reference to the
21 # tunneling protocol implementation - we don't want strong
22 # circular references.
23 self.peer = weakref.ref(peer)
24 self.local = weakref.ref(local)
30 self.home_path = home_path
34 self._started_listening = False
35 self._starting = False
41 self._logger = logging.getLogger('nepi.testbeds.planetlab')
46 return '<%s for %s>' % (self.__class__.__name__, local)
48 return super(TunProtoBase,self).__str__()
54 raise RuntimeError, "Lost reference to peering interfaces before launching"
56 raise RuntimeError, "Unconnected TUN - missing node"
58 # Make sure all the paths are created where
59 # they have to be created for deployment
60 # Also remove pidfile, if there is one.
61 # Old pidfiles from previous runs can be troublesome.
62 cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
63 'home' : server.shell_escape(self.home_path)
65 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
67 host = local.node.hostname,
69 user = local.node.slicename,
71 ident_key = local.node.ident_path,
72 server_key = local.node.server_key,
78 raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
81 def _install_scripts(self):
85 raise RuntimeError, "Lost reference to peering interfaces before launching"
87 raise RuntimeError, "Unconnected TUN - missing node"
89 # Install the tun_connect script and tunalloc utility
90 from nepi.util import tunchannel
91 from nepi.util import ipaddr2
93 os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
94 os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
95 re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
96 re.sub(r"([.]py)[co]$", r'\1', ipaddr2.__file__, 1), # pyc/o files are version-specific
98 if local.filter_module:
99 filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
100 filter_module = filter_sources[0]
102 # Translate paths to builtin sources
103 for i,source in enumerate(filter_sources):
104 if not os.path.exists(source):
105 # Um... try the builtin folder
106 source = os.path.join(os.path.dirname(__file__), "scripts", source)
107 if os.path.exists(source):
109 filter_sources[i] = source
111 sources.extend(set(filter_sources))
115 filter_sources = None
116 dest = "%s@%s:%s" % (
117 local.node.slicename, local.node.hostname,
118 os.path.join(self.home_path,'.'),)
119 (out,err),proc = server.eintr_retry(server.popen_scp)(
122 ident_key = local.node.ident_path,
123 server_key = local.node.server_key
127 raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
129 # Make sure all dependencies are satisfied
130 local.node.wait_dependencies()
134 "gcc -fPIC -shared tunalloc.c -o tunalloc.so && "
136 "wget -q -c -O python-iovec-src.tar.gz %(iovec_url)s && "
137 "mkdir -p python-iovec && "
138 "cd python-iovec && "
139 "tar xzf ../python-iovec-src.tar.gz --strip-components=1 && "
140 "python setup.py build && "
141 "python setup.py install --install-lib .. && "
145 "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
146 'module' : os.path.basename(filter_module).rsplit('.',1)[0],
147 'sources' : ' '.join(map(os.path.basename,filter_sources))
150 if filter_module is not None and filter_module.endswith('.c')
155 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
156 "mkdir -p python-passfd && "
157 "cd python-passfd && "
158 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
159 "python setup.py build && "
160 "python setup.py install --install-lib .. "
162 if local.tun_proto == "fd"
167 'home' : server.shell_escape(self.home_path),
168 'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
169 'iovec_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-iovec/archive/tip.tar.gz",
171 (out,err),proc = server.popen_ssh_command(
173 host = local.node.hostname,
175 user = local.node.slicename,
177 ident_key = local.node.ident_path,
178 server_key = local.node.server_key,
183 raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
185 def launch(self, check_proto, listen, extra_args=[]):
187 raise AssertionError, "Double start"
189 self._starting = True
194 if not peer or not local:
195 raise RuntimeError, "Lost reference to peering interfaces before launching"
197 peer_port = peer.tun_port
198 peer_addr = peer.tun_addr
199 peer_proto= peer.tun_proto
200 peer_cipher=peer.tun_cipher
202 local_port = self.port
203 local_cap = local.capture
204 local_addr = local.address
205 local_mask = local.netprefix
206 local_snat = local.snat
207 local_txq = local.txqueuelen
208 local_p2p = local.pointopoint
209 local_cipher=local.tun_cipher
210 local_mcast= local.multicast
211 local_bwlim= local.bwlimit
213 if not local_p2p and hasattr(peer, 'address'):
214 local_p2p = peer.address
216 if check_proto != peer_proto:
217 raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
219 if local_cipher != peer_cipher:
220 raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
222 if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
223 raise RuntimeError, "Misconfigured peer: %s" % (peer,)
225 if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
226 raise RuntimeError, "Misconfigured TUN: %s" % (local,)
228 if check_proto == 'gre' and local_cipher.lower() != 'plain':
229 raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
231 if local.filter_module:
232 if check_proto not in ('udp', 'tcp'):
233 raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
234 filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
235 filter_module = os.path.join('.',os.path.basename(filter_module[0]))
236 if filter_module.endswith('.c'):
237 filter_module = filter_module.rsplit('.',1)[0] + '.so'
238 filter_args = local.filter_module.args
243 args = ["python", "tun_connect.py",
244 "-m", str(self.mode),
245 "-A", str(local_addr),
246 "-M", str(local_mask),
247 "-C", str(local_cipher)]
249 if check_proto == 'fd':
250 passfd_arg = str(peer_addr)
251 if passfd_arg.startswith('\x00'):
252 # cannot shell_encode null characters :(
253 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
255 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
257 "--pass-fd", passfd_arg
259 elif check_proto == 'gre':
261 "-K", str(min(local_port, peer_port))
265 "-p", str(local_port if listen else peer_port),
272 args.extend(("-P",str(local_p2p)))
274 args.extend(("-Q",str(local_txq)))
277 elif local_cap == 'pcap':
278 args.extend(('-c','pcap'))
280 args.append("--multicast")
282 args.extend(("-b",str(local_bwlim*1024)))
284 args.extend(map(str,extra_args))
285 if not listen and check_proto != 'fd':
286 args.append(str(peer_addr))
288 args.extend(("--filter", filter_module))
290 args.extend(("--filter-args", filter_args))
292 self._logger.info("Starting %s", self)
295 self._install_scripts()
297 # Start process in a "daemonized" way, using nohup and heavy
298 # stdin/out redirection to avoid connection issues
299 (out,err),proc = rspawn.remote_spawn(
303 home = self.home_path,
306 stderr = rspawn.STDOUT,
309 host = local.node.hostname,
311 user = local.node.slicename,
313 ident_key = local.node.ident_path,
314 server_key = local.node.server_key
318 raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
323 # Tunnel should be still running in its node
324 # Just check its pidfile and we're done
326 self._started_listening = True
329 def _launch_and_wait(self, *p, **kw):
331 self.__launch_and_wait(*p, **kw)
335 self._launcher._exc.append(sys.exc_info())
339 def __launch_and_wait(self, *p, **kw):
342 self.launch(*p, **kw)
344 # Wait for the process to be started
345 while self.status() == rspawn.NOT_STARTED:
348 # Wait for the connection to be established
350 for spin in xrange(30):
351 if self.status() != rspawn.RUNNING:
352 self._logger.warn("FAILED TO CONNECT! %s", self)
356 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
357 "cd %(home)s ; grep -c Connected capture" % dict(
358 home = server.shell_escape(self.home_path)),
359 host = local.node.hostname,
361 user = local.node.slicename,
363 ident_key = local.node.ident_path,
364 server_key = local.node.server_key,
366 err_on_timeout = False
370 if out.strip() == '1':
373 # At least listening?
374 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
375 "cd %(home)s ; grep -c Listening capture" % dict(
376 home = server.shell_escape(self.home_path)),
377 host = local.node.hostname,
379 user = local.node.slicename,
381 ident_key = local.node.ident_path,
382 server_key = local.node.server_key,
384 err_on_timeout = False
388 if out.strip() == '1':
389 self._started_listening = True
391 time.sleep(min(30.0, retrytime))
394 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
395 "cat %(home)s/capture" % dict(
396 home = server.shell_escape(self.home_path)),
397 host = local.node.hostname,
399 user = local.node.slicename,
401 ident_key = local.node.ident_path,
402 server_key = local.node.server_key,
405 err_on_timeout = False
409 raise RuntimeError, "FAILED TO CONNECT %s: %s%s" % (self,out,err)
413 if not self._if_name:
414 # Inspect the trace to check the assigned iface
417 cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
418 home = server.shell_escape(self.home_path))
419 for spin in xrange(30):
420 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
422 host = local.node.hostname,
424 user = local.node.slicename,
426 ident_key = local.node.ident_path,
427 server_key = local.node.server_key,
429 err_on_timeout = False
433 self._logger.debug("if_name: failed cmd %s", cmd)
439 match = re.match(r"Using +tun: +([-a-zA-Z0-9]*).*",out)
441 self._if_name = match.group(1)
444 self._logger.debug("if_name: %r does not match expected pattern from cmd %s", out, cmd)
446 self._logger.debug("if_name: empty output from cmd %s", cmd)
449 self._logger.warn("if_name: Could not get interface name")
457 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
458 "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
459 host = local.node.hostname,
461 user = local.node.slicename,
463 ident_key = local.node.ident_path,
464 server_key = local.node.server_key,
466 err_on_timeout = False
473 if out.strip() == 'DEAD':
475 elif out.strip() == 'ALIVE':
def async_launch(self, check_proto, listen, extra_args=[]):
    """
    Kick off the launch sequence in a background thread.

    Nothing happens when the protocol is already started or when a
    launcher thread has already been created.

    The thread runs self._launch_and_wait with the three arguments
    given here, forwarded untouched. An empty list is attached to the
    thread object as `_exc`; _launch_and_wait appends exception info
    to it.
    """
    # Already running, or a launcher was created before: leave as is
    if self._started or self._launcher:
        return

    worker = threading.Thread(
        target = self._launch_and_wait,
        args = (check_proto, listen, extra_args))

    # Exception info collected by the worker ends up in this list
    worker._exc = []

    self._launcher = worker
    worker.start()
487 def async_launch_wait(self):
489 self._launcher.join()
491 if self._launcher._exc:
492 exctyp,exval,exctrace = self._launcher._exc[0]
493 raise exctyp,exval,exctrace
494 elif not self._started:
495 raise RuntimeError, "Failed to launch TUN forwarder"
496 elif not self._started:
499 def async_launch_wait_listening(self):
501 for x in xrange(180):
502 if self._launcher._exc:
503 exctyp,exval,exctrace = self._launcher._exc[0]
504 raise exctyp,exval,exctrace
505 elif self._started and self._started_listening:
508 elif not self._started:
515 raise RuntimeError, "Lost reference to local interface"
518 # NOTE: wait a bit for the pidfile to be created
519 if self._started and not self._pid or not self._ppid:
520 pidtuple = rspawn.remote_check_pid(
521 os.path.join(self.home_path,'pid'),
522 host = local.node.hostname,
524 user = local.node.slicename,
526 ident_key = local.node.ident_path,
527 server_key = local.node.server_key
531 self._pid, self._ppid = pidtuple
537 raise RuntimeError, "Lost reference to local interface"
540 if not self._started:
541 return rspawn.NOT_STARTED
542 elif not self._pid or not self._ppid:
543 return rspawn.NOT_STARTED
545 status = rspawn.remote_status(
546 self._pid, self._ppid,
547 host = local.node.hostname,
549 user = local.node.slicename,
551 ident_key = local.node.ident_path,
552 server_key = local.node.server_key
556 def kill(self, nowait = True):
560 raise RuntimeError, "Lost reference to local interface"
562 status = self.status()
563 if status == rspawn.RUNNING:
564 self._logger.info("Stopping %s", self)
566 # kill by ppid+pid - SIGTERM first, then try SIGKILL
568 self._pid, self._ppid,
569 host = local.node.hostname,
571 user = local.node.slicename,
573 ident_key = local.node.ident_path,
574 server_key = local.node.server_key,
582 status = self.status()
583 if status != rspawn.RUNNING:
584 self._logger.info("Stopped %s", self)
587 interval = min(30.0, interval * 1.1)
589 self.kill(nowait=False)
593 if not self.if_alive():
594 self._logger.info("Device down %s", self)
597 interval = min(30.0, interval * 1.1)
600 # tracename : (remotename, localname)
601 'packets' : ('capture','capture'),
602 'pcap' : ('pcap','capture.pcap'),
605 def remote_trace_path(self, whichtrace, tracemap = None):
606 tracemap = self._TRACEMAP if not tracemap else tracemap
609 if whichtrace not in tracemap:
612 return os.path.join(self.home_path, tracemap[whichtrace][1])
614 def sync_trace(self, local_dir, whichtrace, tracemap = None):
615 tracemap = self._TRACEMAP if not tracemap else tracemap
617 if whichtrace not in tracemap:
625 local_path = os.path.join(local_dir, tracemap[whichtrace][1])
627 # create parent local folders
628 if os.path.dirname(local_path):
629 proc = subprocess.Popen(
630 ["mkdir", "-p", os.path.dirname(local_path)],
631 stdout = open("/dev/null","w"),
632 stdin = open("/dev/null","r"))
635 raise RuntimeError, "Failed to synchronize trace"
638 (out,err),proc = server.popen_scp(
639 '%s@%s:%s' % (local.node.slicename, local.node.hostname,
640 os.path.join(self.home_path, tracemap[whichtrace][0])),
644 ident_key = local.node.ident_path,
645 server_key = local.node.server_key
649 raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
658 eg: set up listening ports
660 raise NotImplementedError
668 raise NotImplementedError
674 raise NotImplementedError
683 class TunProtoUDP(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Set up the base protocol state from local, peer, home_path and
    key, then record the `listening` flag on the instance.
    """
    base_init = super(TunProtoUDP, self).__init__
    base_init(local, peer, home_path, key)

    # Kept as given; the base initializer does not receive this value
    self.listening = listening
692 self.async_launch('udp', False, ("-u",str(self.port)))
def launch(self, check_proto='udp', listen=False, extra_args=None):
    """
    Run the base class launch with UDP defaults.

    When extra_args is None, ("-u", str(self.port)) is used in its
    place; any other value is forwarded unchanged.
    """
    udp_args = ("-u",str(self.port)) if extra_args is None else extra_args
    base_launch = super(TunProtoUDP, self).launch
    base_launch(check_proto, listen, udp_args)
705 class TunProtoFD(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Set up the base protocol state from local, peer, home_path and
    key, then record the `listening` flag on the instance.
    """
    base_init = super(TunProtoFD, self).__init__
    base_init(local, peer, home_path, key)

    # Kept as given; the base initializer does not receive this value
    self.listening = listening
714 self.async_launch('fd', False)
def launch(self, check_proto='fd', listen=False, extra_args=[]):
    """
    Run the base class launch, with 'fd' as the default protocol
    to check against. All arguments are forwarded unchanged.
    """
    base_launch = super(TunProtoFD, self).launch
    base_launch(check_proto, listen, extra_args)
725 class TunProtoGRE(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Set up the base protocol state from local, peer, home_path and
    key, record the `listening` flag, and set the mode to
    'pl-gre-ip'.
    """
    base_init = super(TunProtoGRE, self).__init__
    base_init(local, peer, home_path, key)

    # Both assigned after the base initializer has run
    self.listening = listening
    self.mode = 'pl-gre-ip'
735 self.async_launch('gre', False)
def launch(self, check_proto='gre', listen=False, extra_args=[]):
    """
    Run the base class launch, with 'gre' as the default protocol
    to check against. All arguments are forwarded unchanged.
    """
    base_launch = super(TunProtoGRE, self).launch
    base_launch(check_proto, listen, extra_args)
746 class TunProtoTCP(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Set up the base protocol state from local, peer, home_path and
    key, then record the `listening` flag on the instance.
    """
    base_init = super(TunProtoTCP, self).__init__
    base_init(local, peer, home_path, key)

    # Kept as given; the base initializer does not receive this value
    self.listening = listening
753 self.async_launch('tcp', True)
756 if not self.listening:
757 # make sure our peer is ready
759 if peer and peer.peer_proto_impl:
760 peer.peer_proto_impl.async_launch_wait_listening()
762 if not self._started:
763 self.async_launch('tcp', False)
773 def launch(self, check_proto='tcp', listen=None, extra_args=[]):
775 listen = self.listening
776 super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
778 class TapProtoUDP(TunProtoUDP):
779 def __init__(self, local, peer, home_path, key, listening):
780 super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
783 class TapProtoTCP(TunProtoTCP):
784 def __init__(self, local, peer, home_path, key, listening):
785 super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
788 class TapProtoFD(TunProtoFD):
789 def __init__(self, local, peer, home_path, key, listening):
790 super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
class TapProtoGRE(TunProtoGRE):
    """
    Variant of TunProtoGRE that differs only in its mode, which is
    'pl-gre-eth'.
    """

    def __init__(self, local, peer, home_path, key, listening):
        """
        Initialize exactly like TunProtoGRE, then override the mode.
        """
        parent_init = super(TapProtoGRE, self).__init__
        parent_init(local, peer, home_path, key, listening)

        # Must come after the parent initializer, which sets its own mode
        self.mode = 'pl-gre-eth'