2 # -*- coding: utf-8 -*-
16 from nepi.util import server
18 class TunProtoBase(object):
19 def __init__(self, local, peer, home_path, key):
20 # Weak references, since ifaces do have a reference to the
21 # tunneling protocol implementation - we don't want strong
22 # circular references.
23 self.peer = weakref.ref(peer)
24 self.local = weakref.ref(local)
30 self.home_path = home_path
34 self._started_listening = False
35 self._starting = False
41 self._logger = logging.getLogger('nepi.testbeds.planetlab')
46 return '<%s for %s>' % (self.__class__.__name__, local)
48 return super(TunProtoBase,self).__str__()
54 raise RuntimeError, "Lost reference to peering interfaces before launching"
56 raise RuntimeError, "Unconnected TUN - missing node"
58 # Make sure all the paths are created where
59 # they have to be created for deployment
60 # Also remove pidfile, if there is one.
61 # Old pidfiles from previous runs can be troublesome.
62 cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
63 'home' : server.shell_escape(self.home_path)
65 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
67 host = local.node.hostname,
69 user = local.node.slicename,
71 ident_key = local.node.ident_path,
72 server_key = local.node.server_key
76 raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
79 def _install_scripts(self):
83 raise RuntimeError, "Lost reference to peering interfaces before launching"
85 raise RuntimeError, "Unconnected TUN - missing node"
87 # Install the tun_connect script and tunalloc utility
88 from nepi.util import tunchannel
90 os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
91 os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
92 re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
94 if local.filter_module:
95 filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
96 filter_module = filter_sources[0]
97 sources.extend(set(filter_sources))
100 filter_sources = None
101 dest = "%s@%s:%s" % (
102 local.node.slicename, local.node.hostname,
103 os.path.join(self.home_path,'.'),)
104 (out,err),proc = server.eintr_retry(server.popen_scp)(
107 ident_key = local.node.ident_path,
108 server_key = local.node.server_key
112 raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
114 # Make sure all dependencies are satisfied
115 local.node.wait_dependencies()
119 "gcc -fPIC -shared tunalloc.c -o tunalloc.so && "
121 "wget -q -c -O python-iovec-src.tar.gz %(iovec_url)s && "
122 "mkdir -p python-iovec && "
123 "cd python-iovec && "
124 "tar xzf ../python-iovec-src.tar.gz --strip-components=1 && "
125 "python setup.py build && "
126 "python setup.py install --install-lib .. && "
130 "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
131 'module' : os.path.basename(filter_module).rsplit('.',1)[0],
132 'sources' : ' '.join(map(os.path.basename,filter_sources))
135 if filter_module is not None and filter_module.endswith('.c')
140 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
141 "mkdir -p python-passfd && "
142 "cd python-passfd && "
143 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
144 "python setup.py build && "
145 "python setup.py install --install-lib .. "
147 if local.tun_proto == "fd"
152 'home' : server.shell_escape(self.home_path),
153 'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
154 'iovec_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-iovec/archive/tip.tar.gz",
156 (out,err),proc = server.popen_ssh_command(
158 host = local.node.hostname,
160 user = local.node.slicename,
162 ident_key = local.node.ident_path,
163 server_key = local.node.server_key
167 raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
169 def launch(self, check_proto, listen, extra_args=[]):
171 raise AssertionError, "Double start"
173 self._starting = True
178 if not peer or not local:
179 raise RuntimeError, "Lost reference to peering interfaces before launching"
181 peer_port = peer.tun_port
182 peer_addr = peer.tun_addr
183 peer_proto= peer.tun_proto
184 peer_cipher=peer.tun_cipher
186 local_port = self.port
187 local_cap = local.capture
188 local_addr = local.address
189 local_mask = local.netprefix
190 local_snat = local.snat
191 local_txq = local.txqueuelen
192 local_p2p = local.pointopoint
193 local_cipher=local.tun_cipher
195 if not local_p2p and hasattr(peer, 'address'):
196 local_p2p = peer.address
198 if check_proto != peer_proto:
199 raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
201 if local_cipher != peer_cipher:
202 raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
204 if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
205 raise RuntimeError, "Misconfigured peer: %s" % (peer,)
207 if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
208 raise RuntimeError, "Misconfigured TUN: %s" % (local,)
210 if check_proto == 'gre' and local_cipher.lower() != 'plain':
211 raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
213 if local.filter_module:
214 if check_proto not in ('udp', 'tcp'):
215 raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
216 filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
217 filter_module = os.path.join('.',os.path.basename(filter_module[0]))
218 if filter_module.endswith('.c'):
219 filter_module = filter_module.rsplit('.',1)[0] + '.so'
223 args = ["python", "tun_connect.py",
224 "-m", str(self.mode),
225 "-A", str(local_addr),
226 "-M", str(local_mask),
227 "-C", str(local_cipher)]
229 if check_proto == 'fd':
230 passfd_arg = str(peer_addr)
231 if passfd_arg.startswith('\x00'):
232 # cannot shell_encode null characters :(
233 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
235 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
237 "--pass-fd", passfd_arg
239 elif check_proto == 'gre':
241 "-K", str(min(local_port, peer_port))
245 "-p", str(local_port if listen else peer_port),
252 args.extend(("-P",str(local_p2p)))
254 args.extend(("-Q",str(local_txq)))
257 elif local_cap == 'pcap':
258 args.extend(('-c','pcap'))
260 args.extend(map(str,extra_args))
261 if not listen and check_proto != 'fd':
262 args.append(str(peer_addr))
264 args.extend(("--filter", filter_module))
266 self._logger.info("Starting %s", self)
269 self._install_scripts()
271 # Start process in a "daemonized" way, using nohup and heavy
272 # stdin/out redirection to avoid connection issues
273 (out,err),proc = rspawn.remote_spawn(
277 home = self.home_path,
280 stderr = rspawn.STDOUT,
283 host = local.node.hostname,
285 user = local.node.slicename,
287 ident_key = local.node.ident_path,
288 server_key = local.node.server_key
292 raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
297 # Tunnel should be still running in its node
298 # Just check its pidfile and we're done
300 self._started_listening = True
303 def _launch_and_wait(self, *p, **kw):
305 self.__launch_and_wait(*p, **kw)
309 self._launcher._exc.append(sys.exc_info())
313 def __launch_and_wait(self, *p, **kw):
316 self.launch(*p, **kw)
318 # Wait for the process to be started
319 while self.status() == rspawn.NOT_STARTED:
322 # Wait for the connection to be established
324 for spin in xrange(30):
325 if self.status() != rspawn.RUNNING:
326 self._logger.warn("FAILED TO CONNECT! %s", self)
330 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
331 "cd %(home)s ; grep -c Connected capture" % dict(
332 home = server.shell_escape(self.home_path)),
333 host = local.node.hostname,
335 user = local.node.slicename,
337 ident_key = local.node.ident_path,
338 server_key = local.node.server_key
342 if out.strip() == '1':
345 # At least listening?
346 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
347 "cd %(home)s ; grep -c Listening capture" % dict(
348 home = server.shell_escape(self.home_path)),
349 host = local.node.hostname,
351 user = local.node.slicename,
353 ident_key = local.node.ident_path,
354 server_key = local.node.server_key
358 if out.strip() == '1':
359 self._started_listening = True
361 time.sleep(min(30.0, retrytime))
364 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
365 "cat %(home)s/capture" % dict(
366 home = server.shell_escape(self.home_path)),
367 host = local.node.hostname,
369 user = local.node.slicename,
371 ident_key = local.node.ident_path,
372 server_key = local.node.server_key
376 raise RuntimeError, "FAILED TO CONNECT %s: %s%s" % (self,out,err)
380 if not self._if_name:
381 # Inspect the trace to check the assigned iface
384 cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
385 home = server.shell_escape(self.home_path))
386 for spin in xrange(30):
387 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
389 host = local.node.hostname,
391 user = local.node.slicename,
393 ident_key = local.node.ident_path,
394 server_key = local.node.server_key
398 self._logger.debug("if_name: failed cmd %s", cmd)
404 match = re.match(r"Using +tun: +([-a-zA-Z0-9]*).*",out)
406 self._if_name = match.group(1)
409 self._logger.debug("if_name: %r does not match expected pattern from cmd %s", out, cmd)
411 self._logger.debug("if_name: empty output from cmd %s", cmd)
414 self._logger.warn("if_name: Could not get interface name")
422 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
423 "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
424 host = local.node.hostname,
426 user = local.node.slicename,
428 ident_key = local.node.ident_path,
429 server_key = local.node.server_key
436 if out.strip() == 'DEAD':
438 elif out.strip() == 'ALIVE':
def async_launch(self, check_proto, listen, extra_args=()):
    """
    Start the tunnel in a background thread, unless already started.

    The worker thread runs
    ``self._launch_and_wait(check_proto, listen, extra_args)``.
    Nothing happens when ``self._started`` is already set or when a
    launcher thread was already created by a previous call.

    check_proto -- protocol name, forwarded to the launcher unchanged
    listen      -- listen flag, forwarded to the launcher unchanged
    extra_args  -- extra arguments, forwarded to the launcher unchanged.
                   The default is an immutable empty tuple, so a single
                   default object can never be mutated and leak state
                   between calls.
    """
    if self._started or self._launcher:
        return

    launcher = threading.Thread(
        target=self._launch_and_wait,
        args=(check_proto, listen, extra_args))
    # The worker appends sys.exc_info() tuples to this list; the waiting
    # methods inspect it and re-raise in the caller's thread.
    launcher._exc = []
    # Publish the thread object before starting it: the worker reaches
    # the exception list through self._launcher.
    self._launcher = launcher
    launcher.start()
450 def async_launch_wait(self):
452 self._launcher.join()
454 if self._launcher._exc:
455 exctyp,exval,exctrace = self._launcher._exc[0]
456 raise exctyp,exval,exctrace
457 elif not self._started:
458 raise RuntimeError, "Failed to launch TUN forwarder"
459 elif not self._started:
462 def async_launch_wait_listening(self):
464 for x in xrange(180):
465 if self._launcher._exc:
466 exctyp,exval,exctrace = self._launcher._exc[0]
467 raise exctyp,exval,exctrace
468 elif self._started and self._started_listening:
471 elif not self._started:
478 raise RuntimeError, "Lost reference to local interface"
481 # NOTE: wait a bit for the pidfile to be created
482 if self._started and not self._pid or not self._ppid:
483 pidtuple = rspawn.remote_check_pid(
484 os.path.join(self.home_path,'pid'),
485 host = local.node.hostname,
487 user = local.node.slicename,
489 ident_key = local.node.ident_path,
490 server_key = local.node.server_key
494 self._pid, self._ppid = pidtuple
500 raise RuntimeError, "Lost reference to local interface"
503 if not self._started:
504 return rspawn.NOT_STARTED
505 elif not self._pid or not self._ppid:
506 return rspawn.NOT_STARTED
508 status = rspawn.remote_status(
509 self._pid, self._ppid,
510 host = local.node.hostname,
512 user = local.node.slicename,
514 ident_key = local.node.ident_path,
515 server_key = local.node.server_key
519 def kill(self, nowait = True):
523 raise RuntimeError, "Lost reference to local interface"
525 status = self.status()
526 if status == rspawn.RUNNING:
527 self._logger.info("Stopping %s", self)
529 # kill by ppid+pid - SIGTERM first, then try SIGKILL
531 self._pid, self._ppid,
532 host = local.node.hostname,
534 user = local.node.slicename,
536 ident_key = local.node.ident_path,
537 server_key = local.node.server_key,
545 status = self.status()
546 if status != rspawn.RUNNING:
547 self._logger.info("Stopped %s", self)
550 interval = min(30.0, interval * 1.1)
552 self.kill(nowait=False)
556 if not self.if_alive():
557 self._logger.info("Device down %s", self)
560 interval = min(30.0, interval * 1.1)
563 # tracename : (remotename, localname)
564 'packets' : ('capture','capture'),
565 'pcap' : ('pcap','capture.pcap'),
568 def remote_trace_path(self, whichtrace):
569 tracemap = self._TRACEMAP
571 if whichtrace not in tracemap:
574 return os.path.join(self.home_path, tracemap[whichtrace][1])
576 def sync_trace(self, local_dir, whichtrace):
577 tracemap = self._TRACEMAP
579 if whichtrace not in tracemap:
587 local_path = os.path.join(local_dir, tracemap[whichtrace][1])
589 # create parent local folders
590 if os.path.dirname(local_path):
591 proc = subprocess.Popen(
592 ["mkdir", "-p", os.path.dirname(local_path)],
593 stdout = open("/dev/null","w"),
594 stdin = open("/dev/null","r"))
597 raise RuntimeError, "Failed to synchronize trace"
600 (out,err),proc = server.popen_scp(
601 '%s@%s:%s' % (local.node.slicename, local.node.hostname,
602 os.path.join(self.home_path, tracemap[whichtrace][0])),
606 ident_key = local.node.ident_path,
607 server_key = local.node.server_key
611 raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
620 eg: set up listening ports
622 raise NotImplementedError
630 raise NotImplementedError
636 raise NotImplementedError
645 class TunProtoUDP(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Initialise the base tunnel state, then store the ``listening``
    flag on the instance.
    """
    super(TunProtoUDP, self).__init__(
        local, peer, home_path, key)
    self.listening = listening
654 self.async_launch('udp', False, ("-u",str(self.port)))
def launch(self, check_proto='udp', listen=False, extra_args=None):
    """
    Launch the tunnel through the base-class launcher.

    When ``extra_args`` is None, ``("-u", str(self.port))`` is passed
    instead; any other value is forwarded as given.
    """
    forwarded = ("-u", str(self.port)) if extra_args is None else extra_args
    super(TunProtoUDP, self).launch(check_proto, listen, forwarded)
667 class TunProtoFD(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Initialise the base tunnel state, then store the ``listening``
    flag on the instance.
    """
    super(TunProtoFD, self).__init__(
        local, peer, home_path, key)
    self.listening = listening
676 self.async_launch('fd', False)
def launch(self, check_proto='fd', listen=False, extra_args=()):
    """
    Launch the tunnel through the base-class launcher, with 'fd' as
    the default protocol.

    check_proto -- protocol name, forwarded unchanged
    listen      -- listen flag, forwarded unchanged
    extra_args  -- extra arguments, forwarded unchanged. The default is
                   an immutable empty tuple rather than a shared list,
                   so it cannot be mutated between calls.
    """
    super(TunProtoFD, self).launch(check_proto, listen, extra_args)
687 class TunProtoGRE(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Initialise the base tunnel state, store the ``listening`` flag
    and select the 'pl-gre-ip' mode.
    """
    super(TunProtoGRE, self).__init__(
        local, peer, home_path, key)
    self.listening = listening
    # Assigned after the base initialiser has run.
    self.mode = 'pl-gre-ip'
697 self.async_launch('gre', False)
def launch(self, check_proto='gre', listen=False, extra_args=()):
    """
    Launch the tunnel through the base-class launcher, with 'gre' as
    the default protocol.

    check_proto -- protocol name, forwarded unchanged
    listen      -- listen flag, forwarded unchanged
    extra_args  -- extra arguments, forwarded unchanged. The default is
                   an immutable empty tuple rather than a shared list,
                   so it cannot be mutated between calls.
    """
    super(TunProtoGRE, self).launch(check_proto, listen, extra_args)
708 class TunProtoTCP(TunProtoBase):
def __init__(self, local, peer, home_path, key, listening):
    """
    Initialise the base tunnel state, then store the ``listening``
    flag on the instance.
    """
    super(TunProtoTCP, self).__init__(
        local, peer, home_path, key)
    self.listening = listening
715 self.async_launch('tcp', True)
718 if not self.listening:
719 # make sure our peer is ready
721 if peer and peer.peer_proto_impl:
722 peer.peer_proto_impl.async_launch_wait_listening()
724 if not self._started:
725 self.async_launch('tcp', False)
def launch(self, check_proto='tcp', listen=None, extra_args=()):
    """
    Launch the tunnel through the base-class launcher, with 'tcp' as
    the default protocol.

    check_proto -- protocol name, forwarded unchanged
    listen      -- listen flag; None (the default) means "use
                   self.listening". An explicit value is honoured.
    extra_args  -- extra arguments, forwarded unchanged. The default is
                   an immutable empty tuple rather than a shared list,
                   so it cannot be mutated between calls.
    """
    # Only fall back to the instance setting when the caller did not
    # choose; otherwise the None sentinel default would be pointless.
    if listen is None:
        listen = self.listening
    super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
740 class TapProtoUDP(TunProtoUDP):
741 def __init__(self, local, peer, home_path, key, listening):
742 super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
745 class TapProtoTCP(TunProtoTCP):
746 def __init__(self, local, peer, home_path, key, listening):
747 super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
750 class TapProtoFD(TunProtoFD):
751 def __init__(self, local, peer, home_path, key, listening):
752 super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
class TapProtoGRE(TunProtoGRE):
    """
    Variant of TunProtoGRE that runs in 'pl-gre-eth' mode instead of
    the 'pl-gre-ip' mode selected by its parent.
    """

    def __init__(self, local, peer, home_path, key, listening):
        # The parent initialiser sets its own mode; replace it once the
        # parent has finished.
        super(TapProtoGRE, self).__init__(
            local, peer, home_path, key, listening)
        self.mode = 'pl-gre-eth'