Better progress messages
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13 import sys
14
15 from nepi.util import server
16
17 class TunProtoBase(object):
18     def __init__(self, local, peer, home_path, key):
19         # Weak references, since ifaces do have a reference to the
20         # tunneling protocol implementation - we don't want strong
21         # circular references.
22         self.peer = weakref.ref(peer)
23         self.local = weakref.ref(local)
24         
25         self.port = 15000
26         self.mode = 'pl-tun'
27         self.key = key
28         
29         self.home_path = home_path
30         
31         self._launcher = None
32         self._started = False
33         self._starting = False
34         self._pid = None
35         self._ppid = None
36         self._if_name = None
37     
38     def __str__(self):
39         local = self.local()
40         if local:
41             return '<%s for %s>' % (self.__class__.__name__, local)
42         else:
43             return super(TunProtoBase,self).__str__()
44
45     def _make_home(self):
46         local = self.local()
47         
48         if not local:
49             raise RuntimeError, "Lost reference to peering interfaces before launching"
50         if not local.node:
51             raise RuntimeError, "Unconnected TUN - missing node"
52         
53         # Make sure all the paths are created where 
54         # they have to be created for deployment
55         # Also remove pidfile, if there is one.
56         # Old pidfiles from previous runs can be troublesome.
57         cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
58             'home' : server.shell_escape(self.home_path)
59         }
60         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
61             cmd,
62             host = local.node.hostname,
63             port = None,
64             user = local.node.slicename,
65             agent = None,
66             ident_key = local.node.ident_path,
67             server_key = local.node.server_key
68             )
69         
70         if proc.wait():
71             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
72         
73     
74     def _install_scripts(self):
75         local = self.local()
76         
77         if not local:
78             raise RuntimeError, "Lost reference to peering interfaces before launching"
79         if not local.node:
80             raise RuntimeError, "Unconnected TUN - missing node"
81         
82         # Install the tun_connect script and tunalloc utility
83         from nepi.util import tunchannel
84         sources = [
85             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
86             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
87             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
88         ]
89         dest = "%s@%s:%s" % (
90             local.node.slicename, local.node.hostname, 
91             os.path.join(self.home_path,'.'),)
92         (out,err),proc = server.eintr_retry(server.popen_scp)(
93             sources,
94             dest,
95             ident_key = local.node.ident_path,
96             server_key = local.node.server_key
97             )
98     
99         if proc.wait():
100             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
101         
102         # Make sure all dependencies are satisfied
103         local.node.wait_dependencies()
104
105         cmd = ( (
106             "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
107             + ( " && "
108                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
109                 "mkdir -p python-passfd && "
110                 "cd python-passfd && "
111                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
112                 "python setup.py build && "
113                 "python setup.py install --install-lib .. "
114                 
115                 if local.tun_proto == "fd" else ""
116             ) )
117         % {
118             'home' : server.shell_escape(self.home_path),
119             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
120         } )
121         (out,err),proc = server.popen_ssh_command(
122             cmd,
123             host = local.node.hostname,
124             port = None,
125             user = local.node.slicename,
126             agent = None,
127             ident_key = local.node.ident_path,
128             server_key = local.node.server_key
129             )
130         
131         if proc.wait():
132             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
133         
134     def launch(self, check_proto, listen, extra_args=[]):
135         if self._starting:
136             raise AssertionError, "Double start"
137         
138         self._starting = True
139         
140         peer = self.peer()
141         local = self.local()
142         
143         if not peer or not local:
144             raise RuntimeError, "Lost reference to peering interfaces before launching"
145         
146         peer_port = peer.tun_port
147         peer_addr = peer.tun_addr
148         peer_proto= peer.tun_proto
149         
150         local_port = self.port
151         local_cap  = local.capture
152         local_addr = local.address
153         local_mask = local.netprefix
154         local_snat = local.snat
155         local_txq  = local.txqueuelen
156         local_p2p  = local.pointopoint
157         
158         if not local_p2p and hasattr(peer, 'address'):
159             local_p2p = peer.address
160
161         if check_proto != peer_proto:
162             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
163         
164         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
165             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
166         
167         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
168             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
169         
170         args = ["python", "tun_connect.py", 
171             "-m", str(self.mode),
172             "-A", str(local_addr),
173             "-M", str(local_mask)]
174         
175         if check_proto == 'fd':
176             passfd_arg = str(peer_addr)
177             if passfd_arg.startswith('\x00'):
178                 # cannot shell_encode null characters :(
179                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
180             else:
181                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
182             args.extend([
183                 "--pass-fd", passfd_arg
184             ])
185         else:
186             args.extend([
187                 "-p", str(local_port if listen else peer_port),
188                 "-k", str(self.key)
189             ])
190         
191         if local_snat:
192             args.append("-S")
193         if local_p2p:
194             args.extend(("-P",str(local_p2p)))
195         if local_txq:
196             args.extend(("-Q",str(local_txq)))
197         if not local_cap:
198             args.append("-N")
199         if extra_args:
200             args.extend(map(str,extra_args))
201         if not listen and check_proto != 'fd':
202             args.append(str(peer_addr))
203
204         print >>sys.stderr, "Starting", self
205         
206         self._make_home()
207         self._install_scripts()
208
209         # Start process in a "daemonized" way, using nohup and heavy
210         # stdin/out redirection to avoid connection issues
211         (out,err),proc = rspawn.remote_spawn(
212             " ".join(args),
213             
214             pidfile = './pid',
215             home = self.home_path,
216             stdin = '/dev/null',
217             stdout = 'capture',
218             stderr = rspawn.STDOUT,
219             sudo = True,
220             
221             host = local.node.hostname,
222             port = None,
223             user = local.node.slicename,
224             agent = None,
225             ident_key = local.node.ident_path,
226             server_key = local.node.server_key
227             )
228         
229         if proc.wait():
230             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
231         
232         self._started = True
233     
234     def _launch_and_wait(self, *p, **kw):
235         try:
236             self.__launch_and_wait(*p, **kw)
237         except:
238             if self._launcher:
239                 import sys
240                 self._launcher._exc.append(sys.exc_info())
241             else:
242                 raise
243             
244     def __launch_and_wait(self, *p, **kw):
245         local = self.local()
246         
247         self.launch(*p, **kw)
248         
249         # Wait for the process to be started
250         while self.status() == rspawn.NOT_STARTED:
251             time.sleep(1.0)
252         
253         # Wait for the connection to be established
254         for spin in xrange(30):
255             if self.status() != rspawn.RUNNING:
256                 print >>sys.stderr, "FAILED TO CONNECT! ", self
257                 break
258             
259             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
260                 "cd %(home)s ; grep -c Connected capture" % dict(
261                     home = server.shell_escape(self.home_path)),
262                 host = local.node.hostname,
263                 port = None,
264                 user = local.node.slicename,
265                 agent = None,
266                 ident_key = local.node.ident_path,
267                 server_key = local.node.server_key
268                 )
269             
270             if proc.wait():
271                 break
272             
273             if out.strip() != '0':
274                 break
275             
276             time.sleep(1.0)
277         else:
278             print >>sys.stderr, "FAILED TO CONNECT! ", self
279     
280     @property
281     def if_name(self):
282         if not self._if_name:
283             # Inspect the trace to check the assigned iface
284             local = self.local()
285             if local:
286                 for spin in xrange(30):
287                     (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
288                         "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
289                             home = server.shell_escape(self.home_path)),
290                         host = local.node.hostname,
291                         port = None,
292                         user = local.node.slicename,
293                         agent = None,
294                         ident_key = local.node.ident_path,
295                         server_key = local.node.server_key
296                         )
297                     
298                     if proc.wait():
299                         return
300                     
301                     out = out.strip()
302                     
303                     match = re.match(r"Using +tun: +([-a-zA-Z0-9]*) +.*",out)
304                     if match:
305                         self._if_name = match.group(1)
306         return self._if_name
307     
308     def async_launch(self, check_proto, listen, extra_args=[]):
309         if not self._launcher:
310             self._launcher = threading.Thread(
311                 target = self._launch_and_wait,
312                 args = (check_proto, listen, extra_args))
313             self._launcher._exc = []
314             self._launcher.start()
315     
316     def async_launch_wait(self):
317         if self._launcher:
318             self._launcher.join()
319             if not self._started:
320                 if self._launcher._exc:
321                     exctyp,exval,exctrace = self._launcher._exc[0]
322                     raise exctyp,exval,exctrace
323                 else:
324                     raise RuntimeError, "Failed to launch TUN forwarder"
325         elif not self._started:
326             print >>sys.stderr, "SYNC", 
327             self.launch()
328
329     def checkpid(self):            
330         local = self.local()
331         
332         if not local:
333             raise RuntimeError, "Lost reference to local interface"
334         
335         # Get PID/PPID
336         # NOTE: wait a bit for the pidfile to be created
337         if self._started and not self._pid or not self._ppid:
338             pidtuple = rspawn.remote_check_pid(
339                 os.path.join(self.home_path,'pid'),
340                 host = local.node.hostname,
341                 port = None,
342                 user = local.node.slicename,
343                 agent = None,
344                 ident_key = local.node.ident_path,
345                 server_key = local.node.server_key
346                 )
347             
348             if pidtuple:
349                 self._pid, self._ppid = pidtuple
350     
351     def status(self):
352         local = self.local()
353         
354         if not local:
355             raise RuntimeError, "Lost reference to local interface"
356         
357         self.checkpid()
358         if not self._started:
359             return rspawn.NOT_STARTED
360         elif not self._pid or not self._ppid:
361             return rspawn.NOT_STARTED
362         else:
363             status = rspawn.remote_status(
364                 self._pid, self._ppid,
365                 host = local.node.hostname,
366                 port = None,
367                 user = local.node.slicename,
368                 agent = None,
369                 ident_key = local.node.ident_path,
370                 server_key = local.node.server_key
371                 )
372             return status
373     
374     def kill(self):
375         local = self.local()
376         
377         if not local:
378             raise RuntimeError, "Lost reference to local interface"
379         
380         status = self.status()
381         if status == rspawn.RUNNING:
382             print >>sys.stderr, "Stopping", self
383             
384             # kill by ppid+pid - SIGTERM first, then try SIGKILL
385             rspawn.remote_kill(
386                 self._pid, self._ppid,
387                 host = local.node.hostname,
388                 port = None,
389                 user = local.node.slicename,
390                 agent = None,
391                 ident_key = local.node.ident_path,
392                 server_key = local.node.server_key,
393                 sudo = True,
394                 nowait = True
395                 )
396     
397     def waitkill(self):
398         interval = 1.0
399         for i in xrange(30):
400             status = self.status()
401             if status != rspawn.RUNNING:
402                 print >>sys.stderr, "Stopped", self
403                 break
404             time.sleep(interval)
405             interval = min(30.0, interval * 1.1)
406     
407     def remote_trace_path(self, whichtrace):
408         if whichtrace != 'packets':
409             return None
410         
411         return os.path.join(self.home_path, 'capture')
412         
413     def sync_trace(self, local_dir, whichtrace):
414         if whichtrace != 'packets':
415             return None
416         
417         local = self.local()
418         
419         if not local:
420             return None
421         
422         local_path = os.path.join(local_dir, 'capture')
423         
424         # create parent local folders
425         if os.path.dirname(local_path):
426             proc = subprocess.Popen(
427                 ["mkdir", "-p", os.path.dirname(local_path)],
428                 stdout = open("/dev/null","w"),
429                 stdin = open("/dev/null","r"))
430
431             if proc.wait():
432                 raise RuntimeError, "Failed to synchronize trace"
433         
434         # sync files
435         (out,err),proc = server.popen_scp(
436             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
437                 os.path.join(self.home_path, 'capture')),
438             local_path,
439             port = None,
440             agent = None,
441             ident_key = local.node.ident_path,
442             server_key = local.node.server_key
443             )
444         
445         if proc.wait():
446             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
447         
448         return local_path
449         
450         
451     def prepare(self):
452         """
453         First-phase setup
454         
455         eg: set up listening ports
456         """
457         raise NotImplementedError
458     
459     def setup(self):
460         """
461         Second-phase setup
462         
463         eg: connect to peer
464         """
465         raise NotImplementedError
466     
467     def shutdown(self):
468         """
469         Cleanup
470         """
471         raise NotImplementedError
472     
473     def destroy(self):
474         """
475         Second-phase cleanup
476         """
477         pass
478         
479
class TunProtoUDP(TunProtoBase):
    """
    Tunnel driver for the 'udp' protocol.

    The forwarder is always launched non-listening, with ``-u <self.port>``
    appended to the tun_connect.py command line unless the caller supplies
    explicit extra arguments.
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
        self.listening = listening

    def prepare(self):
        # No first-phase work for UDP.
        pass

    def setup(self):
        udp_args = ("-u", str(self.port))
        self.async_launch('udp', False, udp_args)

    def shutdown(self):
        self.kill()

    def destroy(self):
        self.waitkill()

    def launch(self, check_proto='udp', listen=False, extra_args=None):
        # None means "use the default UDP port argument"; any other value
        # (even an empty sequence) is passed through untouched.
        effective_args = ("-u", str(self.port)) if extra_args is None else extra_args
        super(TunProtoUDP, self).launch(check_proto, listen, effective_args)
501
class TunProtoFD(TunProtoBase):
    """
    Tunnel driver for the 'fd' protocol.

    The forwarder is launched non-listening; TunProtoBase.launch passes the
    peer's ``tun_addr`` via ``--pass-fd`` instead of a port and key.
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoFD, self).__init__(local, peer, home_path, key)
        self.listening = listening

    def prepare(self):
        pass

    def setup(self):
        self.async_launch('fd', False)

    def shutdown(self):
        self.kill()

    def destroy(self):
        self.waitkill()

    def launch(self, check_proto='fd', listen=False, extra_args=()):
        # Immutable default instead of a shared mutable list.
        super(TunProtoFD, self).launch(check_proto, listen, extra_args)
521
class TunProtoTCP(TunProtoBase):
    """
    Tunnel driver for the 'tcp' protocol.

    The listening endpoint launches during prepare(). The connecting
    endpoint launches during setup(), after waiting on
    ``peer.peer_proto_impl`` so that the listener is up first.
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
        self.listening = listening

    def prepare(self):
        if self.listening:
            self.async_launch('tcp', True)

    def setup(self):
        if not self.listening:
            # make sure our peer is ready
            peer = self.peer()
            if peer and peer.peer_proto_impl:
                peer.peer_proto_impl.async_launch_wait()

            if not self._started:
                self.async_launch('tcp', False)

        self.checkpid()

    def shutdown(self):
        self.kill()

    def destroy(self):
        self.waitkill()

    def launch(self, check_proto='tcp', listen=None, extra_args=()):
        # listen=None means "use the role chosen at construction time".
        # extra_args defaults to an immutable tuple, not a shared list.
        if listen is None:
            listen = self.listening
        super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
553
class TapProtoUDP(TunProtoUDP):
    """TunProtoUDP variant that runs tun_connect.py in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key, listening):
        parent_init = super(TapProtoUDP, self).__init__
        parent_init(local, peer, home_path, key, listening)
        # Replace the 'pl-tun' mode set by the base constructor.
        self.mode = 'pl-tap'
558
class TapProtoTCP(TunProtoTCP):
    """TunProtoTCP variant that runs tun_connect.py in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key, listening):
        parent_init = super(TapProtoTCP, self).__init__
        parent_init(local, peer, home_path, key, listening)
        # Replace the 'pl-tun' mode set by the base constructor.
        self.mode = 'pl-tap'
563
class TapProtoFD(TunProtoFD):
    """TunProtoFD variant that runs tun_connect.py in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key, listening):
        parent_init = super(TapProtoFD, self).__init__
        parent_init(local, peer, home_path, key, listening)
        # Replace the 'pl-tun' mode set by the base constructor.
        self.mode = 'pl-tap'
568
569
570
# Protocol name -> implementation class for TUN ('pl-tun' mode) interfaces.
TUN_PROTO_MAP = dict(
    tcp = TunProtoTCP,
    udp = TunProtoUDP,
    fd = TunProtoFD,
)

# Protocol name -> implementation class for TAP ('pl-tap' mode) interfaces.
TAP_PROTO_MAP = dict(
    tcp = TapProtoTCP,
    udp = TapProtoUDP,
    fd = TapProtoFD,
)
582
583