2 # -*- coding: utf-8 -*-
16 from nepi.util import server
18 class TunProtoBase(object):
19 def __init__(self, local, peer, home_path, key):
20 # Weak references, since ifaces do have a reference to the
21 # tunneling protocol implementation - we don't want strong
22 # circular references.
23 self.peer = weakref.ref(peer)
24 self.local = weakref.ref(local)
29 self.cross_slice = False
31 self.home_path = home_path
35 self._started_listening = False
36 self._starting = False
42 self._logger = logging.getLogger('nepi.testbeds.planetlab')
47 return '<%s for %s>' % (self.__class__.__name__, local)
49 return super(TunProtoBase,self).__str__()
55 raise RuntimeError, "Lost reference to peering interfaces before launching"
57 raise RuntimeError, "Unconnected TUN - missing node"
59 # Make sure all the paths are created where
60 # they have to be created for deployment
61 # Also remove pidfile, if there is one.
62 # Old pidfiles from previous runs can be troublesome.
63 cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
64 'home' : server.shell_escape(self.home_path)
66 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
68 host = local.node.hostname,
70 user = local.node.slicename,
72 ident_key = local.node.ident_path,
73 server_key = local.node.server_key,
79 raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
82 def _install_scripts(self):
86 raise RuntimeError, "Lost reference to peering interfaces before launching"
88 raise RuntimeError, "Unconnected TUN - missing node"
90 # Install the tun_connect script and tunalloc utility
91 from nepi.util import tunchannel
92 from nepi.util import ipaddr2
94 os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
95 os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
96 re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
97 re.sub(r"([.]py)[co]$", r'\1', ipaddr2.__file__, 1), # pyc/o files are version-specific
99 if local.filter_module:
100 filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
101 filter_module = filter_sources[0]
103 # Translate paths to builtin sources
104 for i,source in enumerate(filter_sources):
105 if not os.path.exists(source):
106 # Um... try the builtin folder
107 source = os.path.join(os.path.dirname(__file__), "scripts", source)
108 if os.path.exists(source):
110 filter_sources[i] = source
112 sources.extend(set(filter_sources))
116 filter_sources = None
117 dest = "%s@%s:%s" % (
118 local.node.slicename, local.node.hostname,
119 os.path.join(self.home_path,'.'),)
120 (out,err),proc = server.eintr_retry(server.popen_scp)(
123 ident_key = local.node.ident_path,
124 server_key = local.node.server_key
128 raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
130 # Make sure all dependencies are satisfied
131 local.node.wait_dependencies()
135 "gcc -fPIC -shared tunalloc.c -o tunalloc.so && "
137 "wget -q -c -O python-iovec-src.tar.gz %(iovec_url)s && "
138 "mkdir -p python-iovec && "
139 "cd python-iovec && "
140 "tar xzf ../python-iovec-src.tar.gz --strip-components=1 && "
141 "python setup.py build && "
142 "python setup.py install --install-lib .. && "
146 "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
147 'module' : os.path.basename(filter_module).rsplit('.',1)[0],
148 'sources' : ' '.join(map(os.path.basename,filter_sources))
151 if filter_module is not None and filter_module.endswith('.c')
156 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
157 "mkdir -p python-passfd && "
158 "cd python-passfd && "
159 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
160 "python setup.py build && "
161 "python setup.py install --install-lib .. "
163 if local.tun_proto == "fd"
168 'home' : server.shell_escape(self.home_path),
169 'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
170 'iovec_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-iovec/archive/tip.tar.gz",
172 (out,err),proc = server.popen_ssh_command(
174 host = local.node.hostname,
176 user = local.node.slicename,
178 ident_key = local.node.ident_path,
179 server_key = local.node.server_key,
184 raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
186 def launch(self, check_proto, listen, extra_args=[]):
188 raise AssertionError, "Double start"
190 self._starting = True
195 if not peer or not local:
196 raise RuntimeError, "Lost reference to peering interfaces before launching"
198 peer_port = peer.tun_port
199 peer_addr = peer.tun_addr
200 peer_proto= peer.tun_proto
201 peer_cipher=peer.tun_cipher
203 local_port = self.port
204 local_cap = local.capture
205 local_addr = local.address
206 local_mask = local.netprefix
207 local_snat = local.snat
208 local_txq = local.txqueuelen
209 local_p2p = local.pointopoint
210 local_cipher=local.tun_cipher
211 local_mcast= local.multicast
212 local_bwlim= local.bwlimit
213 local_mcastfwd = local.multicast_forwarder
215 if not local_p2p and hasattr(peer, 'address'):
216 local_p2p = peer.address
218 if check_proto != peer_proto:
219 raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
221 if local_cipher != peer_cipher:
222 raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
224 if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
225 raise RuntimeError, "Misconfigured peer: %s" % (peer,)
227 if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
228 raise RuntimeError, "Misconfigured TUN: %s" % (local,)
230 if check_proto == 'gre' and local_cipher.lower() != 'plain':
231 raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
233 if local.filter_module:
234 if check_proto not in ('udp', 'tcp'):
235 raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
236 filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
237 filter_module = os.path.join('.',os.path.basename(filter_module[0]))
238 if filter_module.endswith('.c'):
239 filter_module = filter_module.rsplit('.',1)[0] + '.so'
240 filter_args = local.filter_module.args
245 args = ["python", "tun_connect.py",
246 "-m", str(self.mode),
247 "-A", str(local_addr),
248 "-M", str(local_mask),
249 "-C", str(local_cipher),
252 if check_proto == 'fd':
253 passfd_arg = str(peer_addr)
254 if passfd_arg.startswith('\x00'):
255 # cannot shell_encode null characters :(
256 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
258 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
260 "--pass-fd", passfd_arg
262 elif check_proto == 'gre':
265 "-K", str(self.key.strip('='))
269 "-p", str(local_port if listen else peer_port),
276 args.extend(("-P",str(local_p2p)))
278 args.extend(("-Q",str(local_txq)))
281 elif local_cap == 'pcap':
282 args.extend(('-c','pcap'))
284 args.extend(("-b",str(local_bwlim*1024)))
286 args.extend(map(str,extra_args))
287 if not listen and check_proto != 'fd':
288 args.append(str(peer_addr))
290 args.extend(("--filter", filter_module))
292 args.extend(("--filter-args", filter_args))
293 if local_mcast and local_mcastfwd:
294 args.extend(("--multicast-forwarder", local_mcastfwd))
296 self._logger.info("Starting %s", self)
299 self._install_scripts()
301 # Start process in a "daemonized" way, using nohup and heavy
302 # stdin/out redirection to avoid connection issues
303 (out,err),proc = rspawn.remote_spawn(
307 home = self.home_path,
310 stderr = rspawn.STDOUT,
313 host = local.node.hostname,
315 user = local.node.slicename,
317 ident_key = local.node.ident_path,
318 server_key = local.node.server_key
322 raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
327 # Tunnel should be still running in its node
328 # Just check its pidfile and we're done
330 self._started_listening = True
333 def _launch_and_wait(self, *p, **kw):
335 self.__launch_and_wait(*p, **kw)
339 self._launcher._exc.append(sys.exc_info())
343 def __launch_and_wait(self, *p, **kw):
346 self.launch(*p, **kw)
348 # Wait for the process to be started
349 while self.status() == rspawn.NOT_STARTED:
352 # Wait for the connection to be established
354 for spin in xrange(30):
355 if self.status() != rspawn.RUNNING:
356 self._logger.warn("FAILED TO CONNECT! %s", self)
360 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
361 "cd %(home)s ; grep -a -c Connected capture" % dict(
362 home = server.shell_escape(self.home_path)),
363 host = local.node.hostname,
365 user = local.node.slicename,
367 ident_key = local.node.ident_path,
368 server_key = local.node.server_key,
370 err_on_timeout = False
374 if out.strip() == '1':
377 # At least listening?
378 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
379 "cd %(home)s ; grep -a -c Listening capture" % dict(
380 home = server.shell_escape(self.home_path)),
381 host = local.node.hostname,
383 user = local.node.slicename,
385 ident_key = local.node.ident_path,
386 server_key = local.node.server_key,
388 err_on_timeout = False
392 if out.strip() == '1':
393 self._started_listening = True
395 time.sleep(min(30.0, retrytime))
398 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
399 "cat %(home)s/capture" % dict(
400 home = server.shell_escape(self.home_path)),
401 host = local.node.hostname,
403 user = local.node.slicename,
405 ident_key = local.node.ident_path,
406 server_key = local.node.server_key,
409 err_on_timeout = False
413 raise RuntimeError, "FAILED TO CONNECT %s: %s%s" % (self,out,err)
417 if not self._if_name:
418 # Inspect the trace to check the assigned iface
421 cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict(
422 home = server.shell_escape(self.home_path))
423 for spin in xrange(30):
424 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
426 host = local.node.hostname,
428 user = local.node.slicename,
430 ident_key = local.node.ident_path,
431 server_key = local.node.server_key,
433 err_on_timeout = False
437 self._logger.debug("if_name: failed cmd %s", cmd)
443 match = re.match(r"Using +tun: +([-a-zA-Z0-9]*).*",out)
445 self._if_name = match.group(1)
448 self._logger.debug("if_name: %r does not match expected pattern from cmd %s", out, cmd)
450 self._logger.debug("if_name: empty output from cmd %s", cmd)
453 self._logger.warn("if_name: Could not get interface name")
461 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
462 "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
463 host = local.node.hostname,
465 user = local.node.slicename,
467 ident_key = local.node.ident_path,
468 server_key = local.node.server_key,
470 err_on_timeout = False
477 if out.strip() == 'DEAD':
479 elif out.strip() == 'ALIVE':
def async_launch(self, check_proto, listen, extra_args=()):
    """Run the launch sequence on a background thread.

    The call is a no-op when the tunnel is already marked as started or
    when a launcher thread has already been created, so it is safe to
    invoke more than once.

    Parameters:
        check_proto: protocol name, forwarded untouched to
            ``_launch_and_wait``.
        listen: listen flag, forwarded untouched to ``_launch_and_wait``.
        extra_args: extra arguments, forwarded untouched to
            ``_launch_and_wait``. The default is an immutable empty tuple
            so no mutable object is shared between calls.

    Returns:
        None. The thread object is stored in ``self._launcher``.
    """
    if self._started or self._launcher:
        return

    launcher = threading.Thread(
        target=self._launch_and_wait,
        args=(check_proto, listen, extra_args))

    # Failures raised in the worker are collected in this list (as
    # sys.exc_info() tuples) and re-raised later by the async_launch_wait*
    # methods. Attach it before the thread becomes reachable through self,
    # so nobody can observe a launcher without its error list.
    launcher._exc = []
    self._launcher = launcher
    launcher.start()
491 def async_launch_wait(self):
493 self._launcher.join()
495 if self._launcher._exc:
496 exctyp,exval,exctrace = self._launcher._exc[0]
497 raise exctyp,exval,exctrace
498 elif not self._started:
499 raise RuntimeError, "Failed to launch TUN forwarder"
500 elif not self._started:
503 def async_launch_wait_listening(self):
505 for x in xrange(180):
506 if self._launcher._exc:
507 exctyp,exval,exctrace = self._launcher._exc[0]
508 raise exctyp,exval,exctrace
509 elif self._started and self._started_listening:
512 elif not self._started:
519 raise RuntimeError, "Lost reference to local interface"
522 # NOTE: wait a bit for the pidfile to be created
523 if self._started and not self._pid or not self._ppid:
524 pidtuple = rspawn.remote_check_pid(
525 os.path.join(self.home_path,'pid'),
526 host = local.node.hostname,
528 user = local.node.slicename,
530 ident_key = local.node.ident_path,
531 server_key = local.node.server_key
535 self._pid, self._ppid = pidtuple
541 raise RuntimeError, "Lost reference to local interface"
544 if not self._started:
545 return rspawn.NOT_STARTED
546 elif not self._pid or not self._ppid:
547 return rspawn.NOT_STARTED
549 status = rspawn.remote_status(
550 self._pid, self._ppid,
551 host = local.node.hostname,
553 user = local.node.slicename,
555 ident_key = local.node.ident_path,
556 server_key = local.node.server_key
560 def kill(self, nowait = True):
564 raise RuntimeError, "Lost reference to local interface"
566 status = self.status()
567 if status == rspawn.RUNNING:
568 self._logger.info("Stopping %s", self)
570 # kill by ppid+pid - SIGTERM first, then try SIGKILL
572 self._pid, self._ppid,
573 host = local.node.hostname,
575 user = local.node.slicename,
577 ident_key = local.node.ident_path,
578 server_key = local.node.server_key,
586 status = self.status()
587 if status != rspawn.RUNNING:
588 self._logger.info("Stopped %s", self)
591 interval = min(30.0, interval * 1.1)
593 self.kill(nowait=False)
597 if not self.if_alive():
598 self._logger.info("Device down %s", self)
601 interval = min(30.0, interval * 1.1)
604 # tracename : (remotename, localname)
605 'packets' : ('capture','capture'),
606 'pcap' : ('pcap','capture.pcap'),
609 def remote_trace_path(self, whichtrace, tracemap = None):
610 tracemap = self._TRACEMAP if not tracemap else tracemap
613 if whichtrace not in tracemap:
616 return os.path.join(self.home_path, tracemap[whichtrace][1])
618 def sync_trace(self, local_dir, whichtrace, tracemap = None):
619 tracemap = self._TRACEMAP if not tracemap else tracemap
621 if whichtrace not in tracemap:
629 local_path = os.path.join(local_dir, tracemap[whichtrace][1])
631 # create parent local folders
632 if os.path.dirname(local_path):
633 proc = subprocess.Popen(
634 ["mkdir", "-p", os.path.dirname(local_path)],
635 stdout = open("/dev/null","w"),
636 stdin = open("/dev/null","r"))
639 raise RuntimeError, "Failed to synchronize trace"
642 (out,err),proc = server.popen_scp(
643 '%s@%s:%s' % (local.node.slicename, local.node.hostname,
644 os.path.join(self.home_path, tracemap[whichtrace][0])),
648 ident_key = local.node.ident_path,
649 server_key = local.node.server_key
653 raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
662 eg: set up listening ports
664 raise NotImplementedError
672 raise NotImplementedError
678 raise NotImplementedError
687 class TunProtoUDP(TunProtoBase):
688 def __init__(self, local, peer, home_path, key, listening):
689 super(TunProtoUDP, self).__init__(local, peer, home_path, key)
690 self.listening = listening
696 self.async_launch('udp', False, ("-u",str(self.port)))
def launch(self, check_proto='udp', listen=False, extra_args=None):
    """Launch the tunnel through the parent class with UDP defaults.

    Parameters:
        check_proto: protocol name handed to the parent ``launch``;
            defaults to 'udp'.
        listen: listen flag handed to the parent ``launch``.
        extra_args: extra arguments for the parent ``launch``. When left
            as None, ``("-u", str(self.port))`` is used instead.
    """
    # The port is only read when the caller did not supply its own args.
    forwarded = ("-u", str(self.port)) if extra_args is None else extra_args
    parent = super(TunProtoUDP, self)
    parent.launch(check_proto, listen, forwarded)
709 class TunProtoFD(TunProtoBase):
710 def __init__(self, local, peer, home_path, key, listening):
711 super(TunProtoFD, self).__init__(local, peer, home_path, key)
712 self.listening = listening
718 self.async_launch('fd', False)
def launch(self, check_proto='fd', listen=False, extra_args=()):
    """Launch the tunnel through the parent class with 'fd' defaults.

    Parameters:
        check_proto: protocol name handed to the parent ``launch``;
            defaults to 'fd'.
        listen: listen flag handed to the parent ``launch``.
        extra_args: extra arguments handed to the parent ``launch``. The
            default is an immutable empty tuple rather than a shared
            mutable list.
    """
    super(TunProtoFD, self).launch(check_proto, listen, extra_args)
729 class TunProtoGRE(TunProtoBase):
730 def __init__(self, local, peer, home_path, key, listening):
731 super(TunProtoGRE, self).__init__(local, peer, home_path, key)
732 self.listening = listening
733 self.mode = 'pl-gre-ip'
739 self.async_launch('gre', False)
def launch(self, check_proto='gre', listen=False, extra_args=()):
    """Launch the tunnel through the parent class with 'gre' defaults.

    Parameters:
        check_proto: protocol name handed to the parent ``launch``;
            defaults to 'gre'.
        listen: listen flag handed to the parent ``launch``.
        extra_args: extra arguments handed to the parent ``launch``. The
            default is an immutable empty tuple rather than a shared
            mutable list.
    """
    super(TunProtoGRE, self).launch(check_proto, listen, extra_args)
750 class TunProtoTCP(TunProtoBase):
751 def __init__(self, local, peer, home_path, key, listening):
752 super(TunProtoTCP, self).__init__(local, peer, home_path, key)
753 self.listening = listening
757 self.async_launch('tcp', True)
760 if not self.listening:
761 # make sure our peer is ready
763 if peer and peer.peer_proto_impl:
764 peer.peer_proto_impl.async_launch_wait_listening()
766 if not self._started:
767 self.async_launch('tcp', False)
777 def launch(self, check_proto='tcp', listen=None, extra_args=[]):
779 listen = self.listening
780 super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
782 class TapProtoUDP(TunProtoUDP):
783 def __init__(self, local, peer, home_path, key, listening):
784 super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
787 class TapProtoTCP(TunProtoTCP):
788 def __init__(self, local, peer, home_path, key, listening):
789 super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
792 class TapProtoFD(TunProtoFD):
793 def __init__(self, local, peer, home_path, key, listening):
794 super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
class TapProtoGRE(TunProtoGRE):
    """TunProtoGRE variant that runs in 'pl-gre-eth' mode.

    NOTE(review): presumably the Ethernet-level (TAP) counterpart of the
    GRE tunnel, judging by the class and mode names - confirm.
    """

    def __init__(self, local, peer, home_path, key, listening):
        """Initialize through TunProtoGRE, then override the mode."""
        parent = super(TapProtoGRE, self)
        parent.__init__(local, peer, home_path, key, listening)
        # The parent constructor sets mode to 'pl-gre-ip'; replace it here.
        self.mode = 'pl-gre-eth'