Merging with head
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13
14 from nepi.util import server
15
16 class TunProtoBase(object):
17     def __init__(self, local, peer, home_path, key):
18         # Weak references, since ifaces do have a reference to the
19         # tunneling protocol implementation - we don't want strong
20         # circular references.
21         self.peer = weakref.ref(peer)
22         self.local = weakref.ref(local)
23         
24         self.port = 15000
25         self.mode = 'pl-tun'
26         self.key = key
27         
28         self.home_path = home_path
29         
30         self._launcher = None
31         self._started = False
32         self._starting = False
33         self._pid = None
34         self._ppid = None
35         self._if_name = None
36
37     def _make_home(self):
38         local = self.local()
39         
40         if not local:
41             raise RuntimeError, "Lost reference to peering interfaces before launching"
42         if not local.node:
43             raise RuntimeError, "Unconnected TUN - missing node"
44         
45         # Make sure all the paths are created where 
46         # they have to be created for deployment
47         # Also remove pidfile, if there is one.
48         # Old pidfiles from previous runs can be troublesome.
49         cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
50             'home' : server.shell_escape(self.home_path)
51         }
52         (out,err),proc = server.popen_ssh_command(
53             cmd,
54             host = local.node.hostname,
55             port = None,
56             user = local.node.slicename,
57             agent = None,
58             ident_key = local.node.ident_path,
59             server_key = local.node.server_key
60             )
61         
62         if proc.wait():
63             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
64         
65     
66     def _install_scripts(self):
67         local = self.local()
68         
69         if not local:
70             raise RuntimeError, "Lost reference to peering interfaces before launching"
71         if not local.node:
72             raise RuntimeError, "Unconnected TUN - missing node"
73         
74         # Install the tun_connect script and tunalloc utility
75         from nepi.util import tunchannel
76         sources = [
77             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
78             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
79             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
80         ]
81         dest = "%s@%s:%s" % (
82             local.node.slicename, local.node.hostname, 
83             os.path.join(self.home_path,'.'),)
84         (out,err),proc = server.popen_scp(
85             sources,
86             dest,
87             ident_key = local.node.ident_path,
88             server_key = local.node.server_key
89             )
90     
91         if proc.wait():
92             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
93         
94         # Make sure all dependencies are satisfied
95         local.node.wait_dependencies()
96
97         cmd = ( (
98             "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
99             + ( " && "
100                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
101                 "mkdir -p python-passfd && "
102                 "cd python-passfd && "
103                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
104                 "python setup.py build && "
105                 "python setup.py install --install-lib .. "
106                 
107                 if local.tun_proto == "fd" else ""
108             ) )
109         % {
110             'home' : server.shell_escape(self.home_path),
111             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
112         } )
113         (out,err),proc = server.popen_ssh_command(
114             cmd,
115             host = local.node.hostname,
116             port = None,
117             user = local.node.slicename,
118             agent = None,
119             ident_key = local.node.ident_path,
120             server_key = local.node.server_key
121             )
122         
123         if proc.wait():
124             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
125         
126     def launch(self, check_proto, listen, extra_args=[]):
127         if self._starting:
128             raise AssertionError, "Double start"
129         
130         self._starting = True
131         
132         peer = self.peer()
133         local = self.local()
134         
135         if not peer or not local:
136             raise RuntimeError, "Lost reference to peering interfaces before launching"
137         
138         peer_port = peer.tun_port
139         peer_addr = peer.tun_addr
140         peer_proto= peer.tun_proto
141         
142         local_port = self.port
143         local_cap  = local.capture
144         local_addr = local.address
145         local_mask = local.netprefix
146         local_snat = local.snat
147         local_txq  = local.txqueuelen
148         local_p2p  = local.pointopoint
149         
150         if not local_p2p and hasattr(peer, 'address'):
151             local_p2p = peer.address
152
153         if check_proto != peer_proto:
154             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
155         
156         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
157             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
158         
159         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
160             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
161         
162         args = ["python", "tun_connect.py", 
163             "-m", str(self.mode),
164             "-A", str(local_addr),
165             "-M", str(local_mask)]
166         
167         if check_proto == 'fd':
168             passfd_arg = str(peer_addr)
169             if passfd_arg.startswith('\x00'):
170                 # cannot shell_encode null characters :(
171                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
172             else:
173                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
174             args.extend([
175                 "--pass-fd", passfd_arg
176             ])
177         else:
178             args.extend([
179                 "-p", str(local_port if listen else peer_port),
180                 "-k", str(self.key)
181             ])
182         
183         if local_snat:
184             args.append("-S")
185         if local_p2p:
186             args.extend(("-P",str(local_p2p)))
187         if local_txq:
188             args.extend(("-Q",str(local_txq)))
189         if not local_cap:
190             args.append("-N")
191         if extra_args:
192             args.extend(map(str,extra_args))
193         if not listen and check_proto != 'fd':
194             args.append(str(peer_addr))
195         
196         self._make_home()
197         self._install_scripts()
198         
199         # Start process in a "daemonized" way, using nohup and heavy
200         # stdin/out redirection to avoid connection issues
201         (out,err),proc = rspawn.remote_spawn(
202             " ".join(args),
203             
204             pidfile = './pid',
205             home = self.home_path,
206             stdin = '/dev/null',
207             stdout = 'capture',
208             stderr = rspawn.STDOUT,
209             sudo = True,
210             
211             host = local.node.hostname,
212             port = None,
213             user = local.node.slicename,
214             agent = None,
215             ident_key = local.node.ident_path,
216             server_key = local.node.server_key
217             )
218         
219         if proc.wait():
220             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
221         
222         self._started = True
223     
224     def _launch_and_wait(self, *p, **kw):
225         local = self.local()
226         
227         self.launch(*p, **kw)
228         
229         # Wait for the process to be started
230         while self.status() == rspawn.NOT_STARTED:
231             time.sleep(1.0)
232         
233         # Wait for the connection to be established
234         for spin in xrange(30):
235             if self.status() != rspawn.RUNNING:
236                 break
237             
238             (out,err),proc = server.popen_ssh_command(
239                 "cd %(home)s ; grep -c Connected capture" % dict(
240                     home = server.shell_escape(self.home_path)),
241                 host = local.node.hostname,
242                 port = None,
243                 user = local.node.slicename,
244                 agent = None,
245                 ident_key = local.node.ident_path,
246                 server_key = local.node.server_key
247                 )
248             
249             if proc.wait():
250                 break
251             
252             if out.strip() != '0':
253                 break
254             
255             time.sleep(1.0)
256     
257     @property
258     def if_name(self):
259         if not self._if_name:
260             # Inspect the trace to check the assigned iface
261             local = self.local()
262             if local:
263                 for spin in xrange(30):
264                     (out,err),proc = server.popen_ssh_command(
265                         "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
266                             home = server.shell_escape(self.home_path)),
267                         host = local.node.hostname,
268                         port = None,
269                         user = local.node.slicename,
270                         agent = None,
271                         ident_key = local.node.ident_path,
272                         server_key = local.node.server_key
273                         )
274                     
275                     if proc.wait():
276                         return
277                     
278                     out = out.strip()
279                     
280                     match = re.match(r"Using +tun: +([-a-zA-Z0-9]*) +.*",out)
281                     if match:
282                         self._if_name = match.group(1)
283         return self._if_name
284     
285     def async_launch(self, check_proto, listen, extra_args=[]):
286         if not self._launcher:
287             self._launcher = threading.Thread(
288                 target = self._launch_and_wait,
289                 args = (check_proto, listen, extra_args))
290             self._launcher.start()
291     
292     def async_launch_wait(self):
293         if self._launcher:
294             self._launcher.join()
295             if not self._started:
296                 raise RuntimeError, "Failed to launch TUN forwarder"
297         elif not self._started:
298             self.launch()
299
300     def checkpid(self):            
301         local = self.local()
302         
303         if not local:
304             raise RuntimeError, "Lost reference to local interface"
305         
306         # Get PID/PPID
307         # NOTE: wait a bit for the pidfile to be created
308         if self._started and not self._pid or not self._ppid:
309             pidtuple = rspawn.remote_check_pid(
310                 os.path.join(self.home_path,'pid'),
311                 host = local.node.hostname,
312                 port = None,
313                 user = local.node.slicename,
314                 agent = None,
315                 ident_key = local.node.ident_path,
316                 server_key = local.node.server_key
317                 )
318             
319             if pidtuple:
320                 self._pid, self._ppid = pidtuple
321     
322     def status(self):
323         local = self.local()
324         
325         if not local:
326             raise RuntimeError, "Lost reference to local interface"
327         
328         self.checkpid()
329         if not self._started:
330             return rspawn.NOT_STARTED
331         elif not self._pid or not self._ppid:
332             return rspawn.NOT_STARTED
333         else:
334             status = rspawn.remote_status(
335                 self._pid, self._ppid,
336                 host = local.node.hostname,
337                 port = None,
338                 user = local.node.slicename,
339                 agent = None,
340                 ident_key = local.node.ident_path
341                 )
342             return status
343     
344     def kill(self):
345         local = self.local()
346         
347         if not local:
348             raise RuntimeError, "Lost reference to local interface"
349         
350         status = self.status()
351         if status == rspawn.RUNNING:
352             # kill by ppid+pid - SIGTERM first, then try SIGKILL
353             rspawn.remote_kill(
354                 self._pid, self._ppid,
355                 host = local.node.hostname,
356                 port = None,
357                 user = local.node.slicename,
358                 agent = None,
359                 ident_key = local.node.ident_path,
360                 server_key = local.node.server_key,
361                 sudo = True,
362                 nowait = True
363                 )
364     
365     def waitkill(self):
366         interval = 1.0
367         for i in xrange(30):
368             status = self.status()
369             if status != rspawn.RUNNING:
370                 break
371             time.sleep(interval)
372             interval = min(30.0, interval * 1.1)
373         
374     def sync_trace(self, local_dir, whichtrace):
375         if whichtrace != 'packets':
376             return None
377         
378         local = self.local()
379         
380         if not local:
381             return None
382         
383         local_path = os.path.join(local_dir, 'capture')
384         
385         # create parent local folders
386         proc = subprocess.Popen(
387             ["mkdir", "-p", os.path.dirname(local_path)],
388             stdout = open("/dev/null","w"),
389             stdin = open("/dev/null","r"))
390
391         if proc.wait():
392             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
393         
394         # sync files
395         (out,err),proc = server.popen_scp(
396             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
397                 os.path.join(self.home_path, 'capture')),
398             local_path,
399             port = None,
400             agent = None,
401             ident_key = local.node.ident_path,
402             server_key = local.node.server_key
403             )
404         
405         if proc.wait():
406             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
407         
408         return local_path
409         
410         
411     def prepare(self):
412         """
413         First-phase setup
414         
415         eg: set up listening ports
416         """
417         raise NotImplementedError
418     
419     def setup(self):
420         """
421         Second-phase setup
422         
423         eg: connect to peer
424         """
425         raise NotImplementedError
426     
427     def shutdown(self):
428         """
429         Cleanup
430         """
431         raise NotImplementedError
432     
433     def destroy(self):
434         """
435         Second-phase cleanup
436         """
437         pass
438         
439
440 class TunProtoUDP(TunProtoBase):
441     def __init__(self, local, peer, home_path, key, listening):
442         super(TunProtoUDP, self).__init__(local, peer, home_path, key)
443         self.listening = listening
444     
445     def prepare(self):
446         pass
447     
448     def setup(self):
449         self.async_launch('udp', False, ("-u",str(self.port)))
450     
451     def shutdown(self):
452         self.kill()
453
454     def destroy(self):
455         self.waitkill()
456
457     def launch(self, check_proto='udp', listen=False, extra_args=None):
458         if extra_args is None:
459             extra_args = ("-u",str(self.port))
460         super(TunProtoUDP, self).launch(check_proto, listen, extra_args)
461
462 class TunProtoFD(TunProtoBase):
463     def __init__(self, local, peer, home_path, key, listening):
464         super(TunProtoFD, self).__init__(local, peer, home_path, key)
465         self.listening = listening
466     
467     def prepare(self):
468         pass
469     
470     def setup(self):
471         self.async_launch('fd', False)
472     
473     def shutdown(self):
474         self.kill()
475
476     def destroy(self):
477         self.waitkill()
478
479     def launch(self, check_proto='fd', listen=False, extra_args=[]):
480         super(TunProtoFD, self).launch(check_proto, listen, extra_args)
481
482 class TunProtoTCP(TunProtoBase):
483     def __init__(self, local, peer, home_path, key, listening):
484         super(TunProtoTCP, self).__init__(local, peer, home_path, key)
485         self.listening = listening
486     
487     def prepare(self):
488         if self.listening:
489             self.async_launch('tcp', True)
490     
491     def setup(self):
492         if not self.listening:
493             # make sure our peer is ready
494             peer = self.peer()
495             if peer and peer.peer_proto_impl:
496                 peer.peer_proto_impl.async_launch_wait()
497             
498             if not self._started:
499                 self.async_launch('tcp', False)
500         
501         self.checkpid()
502     
503     def shutdown(self):
504         self.kill()
505
506     def destroy(self):
507         self.waitkill()
508
509     def launch(self, check_proto='tcp', listen=None, extra_args=[]):
510         if listen is None:
511             listen = self.listening
512         super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
513
514 class TapProtoUDP(TunProtoUDP):
515     def __init__(self, local, peer, home_path, key, listening):
516         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
517         self.mode = 'pl-tap'
518
519 class TapProtoTCP(TunProtoTCP):
520     def __init__(self, local, peer, home_path, key, listening):
521         super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
522         self.mode = 'pl-tap'
523
524 class TapProtoFD(TunProtoFD):
525     def __init__(self, local, peer, home_path, key, listening):
526         super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
527         self.mode = 'pl-tap'
528
529
530
531 TUN_PROTO_MAP = {
532     'tcp' : TunProtoTCP,
533     'udp' : TunProtoUDP,
534     'fd'  : TunProtoFD,
535 }
536
537 TAP_PROTO_MAP = {
538     'tcp' : TapProtoTCP,
539     'udp' : TapProtoUDP,
540     'fd'  : TapProtoFD,
541 }
542
543