Ticket #30: Routing in PlanetLab, with a (currently broken) test
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13
14 from nepi.util import server
15
16 class TunProtoBase(object):
17     def __init__(self, local, peer, home_path, key):
18         # Weak references, since ifaces do have a reference to the
19         # tunneling protocol implementation - we don't want strong
20         # circular references.
21         self.peer = weakref.ref(peer)
22         self.local = weakref.ref(local)
23         
24         self.port = 15000
25         self.mode = 'pl-tun'
26         self.key = key
27         
28         self.home_path = home_path
29         
30         self._launcher = None
31         self._started = False
32         self._pid = None
33         self._ppid = None
34         self._if_name = None
35
36     def _make_home(self):
37         local = self.local()
38         
39         if not local:
40             raise RuntimeError, "Lost reference to peering interfaces before launching"
41         if not local.node:
42             raise RuntimeError, "Unconnected TUN - missing node"
43         
44         # Make sure all the paths are created where 
45         # they have to be created for deployment
46         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
47         (out,err),proc = server.popen_ssh_command(
48             cmd,
49             host = local.node.hostname,
50             port = None,
51             user = local.node.slicename,
52             agent = None,
53             ident_key = local.node.ident_path,
54             server_key = local.node.server_key
55             )
56         
57         if proc.wait():
58             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
59         
60     
61     def _install_scripts(self):
62         local = self.local()
63         
64         if not local:
65             raise RuntimeError, "Lost reference to peering interfaces before launching"
66         if not local.node:
67             raise RuntimeError, "Unconnected TUN - missing node"
68         
69         # Install the tun_connect script and tunalloc utility
70         from nepi.util import tunchannel
71         sources = [
72             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
73             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
74             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
75         ]
76         dest = "%s@%s:%s" % (
77             local.node.slicename, local.node.hostname, 
78             os.path.join(self.home_path,'.'),)
79         (out,err),proc = server.popen_scp(
80             sources,
81             dest,
82             ident_key = local.node.ident_path,
83             server_key = local.node.server_key
84             )
85     
86         if proc.wait():
87             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
88         
89         # Make sure all dependencies are satisfied
90         local.node.wait_dependencies()
91
92         cmd = ( (
93             "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
94             + ( " && "
95                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
96                 "mkdir -p python-passfd && "
97                 "cd python-passfd && "
98                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
99                 "python setup.py build && "
100                 "python setup.py install --install-lib .. "
101                 
102                 if local.tun_proto == "fd" else ""
103             ) )
104         % {
105             'home' : server.shell_escape(self.home_path),
106             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
107         } )
108         (out,err),proc = server.popen_ssh_command(
109             cmd,
110             host = local.node.hostname,
111             port = None,
112             user = local.node.slicename,
113             agent = None,
114             ident_key = local.node.ident_path,
115             server_key = local.node.server_key
116             )
117         
118         if proc.wait():
119             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
120         
121     def launch(self, check_proto, listen, extra_args=[]):
122         peer = self.peer()
123         local = self.local()
124         
125         if not peer or not local:
126             raise RuntimeError, "Lost reference to peering interfaces before launching"
127         
128         peer_port = peer.tun_port
129         peer_addr = peer.tun_addr
130         peer_proto= peer.tun_proto
131         
132         local_port = self.port
133         local_cap  = local.capture
134         local_addr = local.address
135         local_mask = local.netprefix
136         local_snat = local.snat
137         local_txq  = local.txqueuelen
138         local_p2p  = local.pointopoint
139         
140         if not local_p2p and hasattr(peer, 'address'):
141             local_p2p = peer.address
142
143         if check_proto != peer_proto:
144             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
145         
146         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
147             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
148         
149         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
150             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
151         
152         args = ["python", "tun_connect.py", 
153             "-m", str(self.mode),
154             "-A", str(local_addr),
155             "-M", str(local_mask)]
156         
157         if check_proto == 'fd':
158             passfd_arg = str(peer_addr)
159             if passfd_arg.startswith('\x00'):
160                 # cannot shell_encode null characters :(
161                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
162             else:
163                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
164             args.extend([
165                 "--pass-fd", passfd_arg
166             ])
167         else:
168             args.extend([
169                 "-p", str(local_port if listen else peer_port),
170                 "-k", str(self.key)
171             ])
172         
173         if local_snat:
174             args.append("-S")
175         if local_p2p:
176             args.extend(("-P",str(local_p2p)))
177         if local_txq:
178             args.extend(("-Q",str(local_txq)))
179         if extra_args:
180             args.extend(map(str,extra_args))
181         if not listen and check_proto != 'fd':
182             args.append(str(peer_addr))
183         
184         self._make_home()
185         self._install_scripts()
186         
187         # Start process in a "daemonized" way, using nohup and heavy
188         # stdin/out redirection to avoid connection issues
189         (out,err),proc = rspawn.remote_spawn(
190             " ".join(args),
191             
192             pidfile = './pid',
193             home = self.home_path,
194             stdin = '/dev/null',
195             stdout = 'capture' if local_cap else '/dev/null',
196             stderr = rspawn.STDOUT,
197             sudo = True,
198             
199             host = local.node.hostname,
200             port = None,
201             user = local.node.slicename,
202             agent = None,
203             ident_key = local.node.ident_path,
204             server_key = local.node.server_key
205             )
206         
207         if proc.wait():
208             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
209         
210         self._started = True
211     
212     def _launch_and_wait(self, *p, **kw):
213         local = self.local()
214         
215         self.launch(*p, **kw)
216         
217         # Wait for the process to be started
218         while self.status() == rspawn.NOT_STARTED:
219             time.sleep(1.0)
220         
221         # Wait for the connection to be established
222         if local.capture:
223             for spin in xrange(30):
224                 if self.status() != rspawn.RUNNING:
225                     break
226                 
227                 (out,err),proc = server.popen_ssh_command(
228                     "cd %(home)s ; grep -c Connected capture" % dict(
229                         home = server.shell_escape(self.home_path)),
230                     host = local.node.hostname,
231                     port = None,
232                     user = local.node.slicename,
233                     agent = None,
234                     ident_key = local.node.ident_path,
235                     server_key = local.node.server_key
236                     )
237                 
238                 if proc.wait():
239                     break
240                 
241                 if out.strip() != '0':
242                     break
243                 
244                 time.sleep(1.0)
245     
246     @property
247     def if_name(self):
248         if not self._if_name:
249             # Inspect the trace to check the assigned iface
250             local = self.local()
251             if local and local.capture:
252                 for spin in xrange(30):
253                     (out,err),proc = server.popen_ssh_command(
254                         "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
255                             home = server.shell_escape(self.home_path)),
256                         host = local.node.hostname,
257                         port = None,
258                         user = local.node.slicename,
259                         agent = None,
260                         ident_key = local.node.ident_path,
261                         server_key = local.node.server_key
262                         )
263                     
264                     if proc.wait():
265                         return
266                     
267                     out = out.strip()
268                     
269                     match = re.match(r"Using +tun: +([-a-zA-Z0-9]*) +.*",out)
270                     if match:
271                         self._if_name = match.group(1)
272         return self._if_name
273     
274     def async_launch(self, check_proto, listen, extra_args=[]):
275         if not self._launcher:
276             self._launcher = threading.Thread(
277                 target = self._launch_and_wait,
278                 args = (check_proto, listen, extra_args))
279             self._launcher.start()
280     
281     def async_launch_wait(self):
282         if self._launcher:
283             self._launcher.join()
284             if not self._started:
285                 raise RuntimeError, "Failed to launch TUN forwarder"
286         elif not self._started:
287             self.launch()
288
289     def checkpid(self):            
290         local = self.local()
291         
292         if not local:
293             raise RuntimeError, "Lost reference to local interface"
294         
295         # Get PID/PPID
296         # NOTE: wait a bit for the pidfile to be created
297         if self._started and not self._pid or not self._ppid:
298             pidtuple = rspawn.remote_check_pid(
299                 os.path.join(self.home_path,'pid'),
300                 host = local.node.hostname,
301                 port = None,
302                 user = local.node.slicename,
303                 agent = None,
304                 ident_key = local.node.ident_path,
305                 server_key = local.node.server_key
306                 )
307             
308             if pidtuple:
309                 self._pid, self._ppid = pidtuple
310     
311     def status(self):
312         local = self.local()
313         
314         if not local:
315             raise RuntimeError, "Lost reference to local interface"
316         
317         self.checkpid()
318         if not self._started:
319             return rspawn.NOT_STARTED
320         elif not self._pid or not self._ppid:
321             return rspawn.NOT_STARTED
322         else:
323             status = rspawn.remote_status(
324                 self._pid, self._ppid,
325                 host = local.node.hostname,
326                 port = None,
327                 user = local.node.slicename,
328                 agent = None,
329                 ident_key = local.node.ident_path
330                 )
331             return status
332     
333     def kill(self):
334         local = self.local()
335         
336         if not local:
337             raise RuntimeError, "Lost reference to local interface"
338         
339         status = self.status()
340         if status == rspawn.RUNNING:
341             # kill by ppid+pid - SIGTERM first, then try SIGKILL
342             rspawn.remote_kill(
343                 self._pid, self._ppid,
344                 host = local.node.hostname,
345                 port = None,
346                 user = local.node.slicename,
347                 agent = None,
348                 ident_key = local.node.ident_path,
349                 server_key = local.node.server_key,
350                 sudo = True
351                 )
352         
353     def sync_trace(self, local_dir, whichtrace):
354         if whichtrace != 'packets':
355             return None
356         
357         local = self.local()
358         
359         if not local:
360             return None
361         
362         local_path = os.path.join(local_dir, 'capture')
363         
364         # create parent local folders
365         proc = subprocess.Popen(
366             ["mkdir", "-p", os.path.dirname(local_path)],
367             stdout = open("/dev/null","w"),
368             stdin = open("/dev/null","r"))
369
370         if proc.wait():
371             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
372         
373         # sync files
374         (out,err),proc = server.popen_scp(
375             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
376                 os.path.join(self.home_path, 'capture')),
377             local_path,
378             port = None,
379             agent = None,
380             ident_key = local.node.ident_path,
381             server_key = local.node.server_key
382             )
383         
384         if proc.wait():
385             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
386         
387         return local_path
388         
389         
390     def prepare(self):
391         """
392         First-phase setup
393         
394         eg: set up listening ports
395         """
396         raise NotImplementedError
397     
398     def setup(self):
399         """
400         Second-phase setup
401         
402         eg: connect to peer
403         """
404         raise NotImplementedError
405     
406     def shutdown(self):
407         """
408         Cleanup
409         """
410         raise NotImplementedError
411         
412
class TunProtoUDP(TunProtoBase):
    """Tunnel endpoint launched with the 'udp' protocol."""

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
        # Stored for callers; setup() below does not consult it
        self.listening = listening

    def prepare(self):
        """Nothing to do in the first phase."""

    def setup(self):
        """Launch in the background, passing the local port with -u."""
        udp_args = ("-u", str(self.port))
        self.async_launch('udp', False, udp_args)

    def shutdown(self):
        """Kill the remote forwarder."""
        self.kill()
426
class TunProtoFD(TunProtoBase):
    """Tunnel endpoint launched with the 'fd' protocol."""

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoFD, self).__init__(local, peer, home_path, key)
        # Stored for callers; setup() below does not consult it
        self.listening = listening

    def prepare(self):
        """Nothing to do in the first phase."""

    def setup(self):
        """Launch in the background, never in listening mode."""
        self.async_launch('fd', False)

    def shutdown(self):
        """Kill the remote forwarder."""
        self.kill()
440
class TunProtoTCP(TunProtoBase):
    """Tunnel endpoint launched with the 'tcp' protocol."""

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
        self.listening = listening

    def prepare(self):
        """Start the listening side in the background; no-op otherwise."""
        if not self.listening:
            return
        self.async_launch('tcp', True)

    def setup(self):
        """
        Finish bringing the endpoint up, then refresh its pid/ppid.

        The listening side waits for its own background launch. The
        connecting side first waits for the peer's launch, when the
        peer exposes a protocol implementation, and then launches
        itself unless already started.
        """
        if self.listening:
            # make sure WE are ready
            self.async_launch_wait()
        else:
            # make sure our peer is ready
            peer = self.peer()
            if peer and peer.peer_proto_impl:
                peer.peer_proto_impl.async_launch_wait()

            if not self._started:
                self.launch('tcp', False)

        self.checkpid()

    def shutdown(self):
        """Kill the remote forwarder."""
        self.kill()
467
class TapProtoUDP(TunProtoUDP):
    """Same as TunProtoUDP, but with the mode set to 'pl-tap'."""

    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoUDP, self).__init__(
            local, peer, home_path, key, listening)
        # Replace the mode assigned by the base initialiser
        self.mode = 'pl-tap'
472
class TapProtoTCP(TunProtoTCP):
    """Same as TunProtoTCP, but with the mode set to 'pl-tap'."""

    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoTCP, self).__init__(
            local, peer, home_path, key, listening)
        # Replace the mode assigned by the base initialiser
        self.mode = 'pl-tap'
477
class TapProtoFD(TunProtoFD):
    """Same as TunProtoFD, but with the mode set to 'pl-tap'."""

    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoFD, self).__init__(
            local, peer, home_path, key, listening)
        # Replace the mode assigned by the base initialiser
        self.mode = 'pl-tap'
482
483
484
# Protocol name -> implementation class, for 'pl-tun' mode endpoints
TUN_PROTO_MAP = dict(
    tcp = TunProtoTCP,
    udp = TunProtoUDP,
    fd = TunProtoFD,
)

# Protocol name -> implementation class, for 'pl-tap' mode endpoints
TAP_PROTO_MAP = dict(
    tcp = TapProtoTCP,
    udp = TapProtoUDP,
    fd = TapProtoFD,
)
496
497