Ticket #45: spanning tree deployment
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13
14 from nepi.util import server
15
16 class TunProtoBase(object):
17     def __init__(self, local, peer, home_path, key):
18         # Weak references, since ifaces do have a reference to the
19         # tunneling protocol implementation - we don't want strong
20         # circular references.
21         self.peer = weakref.ref(peer)
22         self.local = weakref.ref(local)
23         
24         self.port = 15000
25         self.mode = 'pl-tun'
26         self.key = key
27         
28         self.home_path = home_path
29         
30         self._launcher = None
31         self._started = False
32         self._starting = False
33         self._pid = None
34         self._ppid = None
35         self._if_name = None
36
37     def _make_home(self):
38         local = self.local()
39         
40         if not local:
41             raise RuntimeError, "Lost reference to peering interfaces before launching"
42         if not local.node:
43             raise RuntimeError, "Unconnected TUN - missing node"
44         
45         # Make sure all the paths are created where 
46         # they have to be created for deployment
47         # Also remove pidfile, if there is one.
48         # Old pidfiles from previous runs can be troublesome.
49         cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
50             'home' : server.shell_escape(self.home_path)
51         }
52         (out,err),proc = server.popen_ssh_command(
53             cmd,
54             host = local.node.hostname,
55             port = None,
56             user = local.node.slicename,
57             agent = None,
58             ident_key = local.node.ident_path,
59             server_key = local.node.server_key
60             )
61         
62         if proc.wait():
63             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
64         
65     
66     def _install_scripts(self):
67         local = self.local()
68         
69         if not local:
70             raise RuntimeError, "Lost reference to peering interfaces before launching"
71         if not local.node:
72             raise RuntimeError, "Unconnected TUN - missing node"
73         
74         # Install the tun_connect script and tunalloc utility
75         from nepi.util import tunchannel
76         sources = [
77             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
78             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
79             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
80         ]
81         dest = "%s@%s:%s" % (
82             local.node.slicename, local.node.hostname, 
83             os.path.join(self.home_path,'.'),)
84         (out,err),proc = server.popen_scp(
85             sources,
86             dest,
87             ident_key = local.node.ident_path,
88             server_key = local.node.server_key
89             )
90     
91         if proc.wait():
92             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
93         
94         # Make sure all dependencies are satisfied
95         local.node.wait_dependencies()
96
97         cmd = ( (
98             "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
99             + ( " && "
100                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
101                 "mkdir -p python-passfd && "
102                 "cd python-passfd && "
103                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
104                 "python setup.py build && "
105                 "python setup.py install --install-lib .. "
106                 
107                 if local.tun_proto == "fd" else ""
108             ) )
109         % {
110             'home' : server.shell_escape(self.home_path),
111             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
112         } )
113         (out,err),proc = server.popen_ssh_command(
114             cmd,
115             host = local.node.hostname,
116             port = None,
117             user = local.node.slicename,
118             agent = None,
119             ident_key = local.node.ident_path,
120             server_key = local.node.server_key
121             )
122         
123         if proc.wait():
124             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
125         
126     def launch(self, check_proto, listen, extra_args=[]):
127         if self._starting:
128             raise AssertionError, "Double start"
129         
130         self._starting = True
131         
132         peer = self.peer()
133         local = self.local()
134         
135         if not peer or not local:
136             raise RuntimeError, "Lost reference to peering interfaces before launching"
137         
138         peer_port = peer.tun_port
139         peer_addr = peer.tun_addr
140         peer_proto= peer.tun_proto
141         
142         local_port = self.port
143         local_cap  = local.capture
144         local_addr = local.address
145         local_mask = local.netprefix
146         local_snat = local.snat
147         local_txq  = local.txqueuelen
148         local_p2p  = local.pointopoint
149         
150         if not local_p2p and hasattr(peer, 'address'):
151             local_p2p = peer.address
152
153         if check_proto != peer_proto:
154             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
155         
156         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
157             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
158         
159         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
160             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
161         
162         args = ["python", "tun_connect.py", 
163             "-m", str(self.mode),
164             "-A", str(local_addr),
165             "-M", str(local_mask)]
166         
167         if check_proto == 'fd':
168             passfd_arg = str(peer_addr)
169             if passfd_arg.startswith('\x00'):
170                 # cannot shell_encode null characters :(
171                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
172             else:
173                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
174             args.extend([
175                 "--pass-fd", passfd_arg
176             ])
177         else:
178             args.extend([
179                 "-p", str(local_port if listen else peer_port),
180                 "-k", str(self.key)
181             ])
182         
183         if local_snat:
184             args.append("-S")
185         if local_p2p:
186             args.extend(("-P",str(local_p2p)))
187         if local_txq:
188             args.extend(("-Q",str(local_txq)))
189         if not local_cap:
190             args.append("-N")
191         if extra_args:
192             args.extend(map(str,extra_args))
193         if not listen and check_proto != 'fd':
194             args.append(str(peer_addr))
195         
196         self._make_home()
197         self._install_scripts()
198         
199         # Start process in a "daemonized" way, using nohup and heavy
200         # stdin/out redirection to avoid connection issues
201         (out,err),proc = rspawn.remote_spawn(
202             " ".join(args),
203             
204             pidfile = './pid',
205             home = self.home_path,
206             stdin = '/dev/null',
207             stdout = 'capture',
208             stderr = rspawn.STDOUT,
209             sudo = True,
210             
211             host = local.node.hostname,
212             port = None,
213             user = local.node.slicename,
214             agent = None,
215             ident_key = local.node.ident_path,
216             server_key = local.node.server_key
217             )
218         
219         if proc.wait():
220             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
221         
222         self._started = True
223     
224     def _launch_and_wait(self, *p, **kw):
225         try:
226             self.__launch_and_wait(*p, **kw)
227         except:
228             if self._launcher:
229                 import sys
230                 self._launcher._exc.append(sys.exc_info())
231             else:
232                 raise
233             
234     def __launch_and_wait(self, *p, **kw):
235         local = self.local()
236         
237         self.launch(*p, **kw)
238         
239         # Wait for the process to be started
240         while self.status() == rspawn.NOT_STARTED:
241             time.sleep(1.0)
242         
243         # Wait for the connection to be established
244         for spin in xrange(30):
245             if self.status() != rspawn.RUNNING:
246                 break
247             
248             (out,err),proc = server.popen_ssh_command(
249                 "cd %(home)s ; grep -c Connected capture" % dict(
250                     home = server.shell_escape(self.home_path)),
251                 host = local.node.hostname,
252                 port = None,
253                 user = local.node.slicename,
254                 agent = None,
255                 ident_key = local.node.ident_path,
256                 server_key = local.node.server_key
257                 )
258             
259             if proc.wait():
260                 break
261             
262             if out.strip() != '0':
263                 break
264             
265             time.sleep(1.0)
266     
267     @property
268     def if_name(self):
269         if not self._if_name:
270             # Inspect the trace to check the assigned iface
271             local = self.local()
272             if local:
273                 for spin in xrange(30):
274                     (out,err),proc = server.popen_ssh_command(
275                         "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
276                             home = server.shell_escape(self.home_path)),
277                         host = local.node.hostname,
278                         port = None,
279                         user = local.node.slicename,
280                         agent = None,
281                         ident_key = local.node.ident_path,
282                         server_key = local.node.server_key
283                         )
284                     
285                     if proc.wait():
286                         return
287                     
288                     out = out.strip()
289                     
290                     match = re.match(r"Using +tun: +([-a-zA-Z0-9]*) +.*",out)
291                     if match:
292                         self._if_name = match.group(1)
293         return self._if_name
294     
295     def async_launch(self, check_proto, listen, extra_args=[]):
296         if not self._launcher:
297             self._launcher = threading.Thread(
298                 target = self._launch_and_wait,
299                 args = (check_proto, listen, extra_args))
300             self._launcher._exc = []
301             self._launcher.start()
302     
303     def async_launch_wait(self):
304         if self._launcher:
305             self._launcher.join()
306             if not self._started:
307                 if self._launcher._exc:
308                     exctyp,exval,exctrace = self._launcher._exc[0]
309                     raise exctyp,exval,exctrace
310                 else:
311                     raise RuntimeError, "Failed to launch TUN forwarder"
312         elif not self._started:
313             self.launch()
314
315     def checkpid(self):            
316         local = self.local()
317         
318         if not local:
319             raise RuntimeError, "Lost reference to local interface"
320         
321         # Get PID/PPID
322         # NOTE: wait a bit for the pidfile to be created
323         if self._started and not self._pid or not self._ppid:
324             pidtuple = rspawn.remote_check_pid(
325                 os.path.join(self.home_path,'pid'),
326                 host = local.node.hostname,
327                 port = None,
328                 user = local.node.slicename,
329                 agent = None,
330                 ident_key = local.node.ident_path,
331                 server_key = local.node.server_key
332                 )
333             
334             if pidtuple:
335                 self._pid, self._ppid = pidtuple
336     
337     def status(self):
338         local = self.local()
339         
340         if not local:
341             raise RuntimeError, "Lost reference to local interface"
342         
343         self.checkpid()
344         if not self._started:
345             return rspawn.NOT_STARTED
346         elif not self._pid or not self._ppid:
347             return rspawn.NOT_STARTED
348         else:
349             status = rspawn.remote_status(
350                 self._pid, self._ppid,
351                 host = local.node.hostname,
352                 port = None,
353                 user = local.node.slicename,
354                 agent = None,
355                 ident_key = local.node.ident_path
356                 )
357             return status
358     
359     def kill(self):
360         local = self.local()
361         
362         if not local:
363             raise RuntimeError, "Lost reference to local interface"
364         
365         status = self.status()
366         if status == rspawn.RUNNING:
367             # kill by ppid+pid - SIGTERM first, then try SIGKILL
368             rspawn.remote_kill(
369                 self._pid, self._ppid,
370                 host = local.node.hostname,
371                 port = None,
372                 user = local.node.slicename,
373                 agent = None,
374                 ident_key = local.node.ident_path,
375                 server_key = local.node.server_key,
376                 sudo = True,
377                 nowait = True
378                 )
379     
380     def waitkill(self):
381         interval = 1.0
382         for i in xrange(30):
383             status = self.status()
384             if status != rspawn.RUNNING:
385                 break
386             time.sleep(interval)
387             interval = min(30.0, interval * 1.1)
388         
389     def sync_trace(self, local_dir, whichtrace):
390         if whichtrace != 'packets':
391             return None
392         
393         local = self.local()
394         
395         if not local:
396             return None
397         
398         local_path = os.path.join(local_dir, 'capture')
399         
400         # create parent local folders
401         proc = subprocess.Popen(
402             ["mkdir", "-p", os.path.dirname(local_path)],
403             stdout = open("/dev/null","w"),
404             stdin = open("/dev/null","r"))
405
406         if proc.wait():
407             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
408         
409         # sync files
410         (out,err),proc = server.popen_scp(
411             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
412                 os.path.join(self.home_path, 'capture')),
413             local_path,
414             port = None,
415             agent = None,
416             ident_key = local.node.ident_path,
417             server_key = local.node.server_key
418             )
419         
420         if proc.wait():
421             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
422         
423         return local_path
424         
425         
426     def prepare(self):
427         """
428         First-phase setup
429         
430         eg: set up listening ports
431         """
432         raise NotImplementedError
433     
434     def setup(self):
435         """
436         Second-phase setup
437         
438         eg: connect to peer
439         """
440         raise NotImplementedError
441     
442     def shutdown(self):
443         """
444         Cleanup
445         """
446         raise NotImplementedError
447     
448     def destroy(self):
449         """
450         Second-phase cleanup
451         """
452         pass
453         
454
class TunProtoUDP(TunProtoBase):
    """
    UDP tunnel endpoint.

    Nothing happens in the first phase; the second phase launches the
    forwarder asynchronously, in connecting mode, passing this
    endpoint's port through "-u".
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        """First phase: nothing to do."""
        return None
    
    def setup(self):
        """Second phase: kick off the asynchronous launch."""
        udp_port_args = ("-u",str(self.port))
        self.async_launch('udp', False, udp_port_args)
    
    def shutdown(self):
        """Request termination of the remote process."""
        self.kill()

    def destroy(self):
        """Wait for the remote process to go away."""
        self.waitkill()

    def launch(self, check_proto='udp', listen=False, extra_args=None):
        """
        Synchronous launch; when no extra arguments are given, the
        "-u <port>" pair is supplied.
        """
        effective_args = ("-u",str(self.port)) if extra_args is None else extra_args
        super(TunProtoUDP, self).launch(check_proto, listen, effective_args)
476
class TunProtoFD(TunProtoBase):
    """
    File-descriptor-passing tunnel endpoint.

    Nothing happens in the first phase; the second phase launches the
    forwarder asynchronously with the 'fd' protocol, in connecting mode.
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoFD, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        """First phase: nothing to do."""
        return None
    
    def setup(self):
        """Second phase: kick off the asynchronous launch."""
        proto, listen = 'fd', False
        self.async_launch(proto, listen)
    
    def shutdown(self):
        """Request termination of the remote process."""
        self.kill()

    def destroy(self):
        """Wait for the remote process to go away."""
        self.waitkill()

    def launch(self, check_proto='fd', listen=False, extra_args=[]):
        """Synchronous launch, defaulting to the 'fd' protocol."""
        base = super(TunProtoFD, self)
        base.launch(check_proto, listen, extra_args)
496
class TunProtoTCP(TunProtoBase):
    """
    TCP tunnel endpoint.

    The listening end is launched during the first phase; the
    connecting end is launched during the second phase, after waiting
    for the peer's protocol implementation to finish launching.
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        """First phase: start listening, if this is the listening end."""
        if not self.listening:
            return
        self.async_launch('tcp', True)
    
    def setup(self):
        """
        Second phase: on the connecting end, wait for the peer and
        then launch; on both ends, refresh the pid information.
        """
        if self.listening:
            self.checkpid()
            return
        
        # make sure our peer is ready
        remote = self.peer()
        if remote:
            if remote.peer_proto_impl:
                remote.peer_proto_impl.async_launch_wait()
        
        if not self._started:
            self.async_launch('tcp', False)
        
        self.checkpid()
    
    def shutdown(self):
        """Request termination of the remote process."""
        self.kill()

    def destroy(self):
        """Wait for the remote process to go away."""
        self.waitkill()

    def launch(self, check_proto='tcp', listen=None, extra_args=[]):
        """
        Synchronous launch; when listen is not given, the endpoint's
        own 'listening' setting decides.
        """
        should_listen = self.listening if listen is None else listen
        super(TunProtoTCP, self).launch(check_proto, should_listen, extra_args)
528
class TapProtoUDP(TunProtoUDP):
    """UDP endpoint operating in 'pl-tap' mode instead of 'pl-tun'."""

    def __init__(self, local, peer, home_path, key, listening):
        parent = super(TapProtoUDP, self)
        parent.__init__(local, peer, home_path, key, listening)
        # Override the mode set by the base constructor
        self.mode = 'pl-tap'
533
class TapProtoTCP(TunProtoTCP):
    """TCP endpoint operating in 'pl-tap' mode instead of 'pl-tun'."""

    def __init__(self, local, peer, home_path, key, listening):
        parent = super(TapProtoTCP, self)
        parent.__init__(local, peer, home_path, key, listening)
        # Override the mode set by the base constructor
        self.mode = 'pl-tap'
538
class TapProtoFD(TunProtoFD):
    """File-descriptor endpoint operating in 'pl-tap' mode instead of 'pl-tun'."""

    def __init__(self, local, peer, home_path, key, listening):
        parent = super(TapProtoFD, self)
        parent.__init__(local, peer, home_path, key, listening)
        # Override the mode set by the base constructor
        self.mode = 'pl-tap'
543
544
545
# Protocol name -> implementation class, for TUN-mode interfaces
TUN_PROTO_MAP = dict(
    tcp = TunProtoTCP,
    udp = TunProtoUDP,
    fd = TunProtoFD,
)

# Protocol name -> implementation class, for TAP-mode interfaces
TAP_PROTO_MAP = dict(
    tcp = TapProtoTCP,
    udp = TapProtoUDP,
    fd = TapProtoFD,
)
557
558