Tunchannel fixes galore:
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13 import sys
14 import logging
15
16 from nepi.util import server
17
18 class TunProtoBase(object):
19     def __init__(self, local, peer, home_path, key):
20         # Weak references, since ifaces do have a reference to the
21         # tunneling protocol implementation - we don't want strong
22         # circular references.
23         self.peer = weakref.ref(peer)
24         self.local = weakref.ref(local)
25         
26         self.port = 15000
27         self.mode = 'pl-tun'
28         self.key = key
29         
30         self.home_path = home_path
31         
32         self._launcher = None
33         self._started = False
34         self._started_listening = False
35         self._starting = False
36         self._pid = None
37         self._ppid = None
38         self._if_name = None
39
40         # Logging
41         self._logger = logging.getLogger('nepi.testbeds.planetlab')
42     
43     def __str__(self):
44         local = self.local()
45         if local:
46             return '<%s for %s>' % (self.__class__.__name__, local)
47         else:
48             return super(TunProtoBase,self).__str__()
49
50     def _make_home(self):
51         local = self.local()
52         
53         if not local:
54             raise RuntimeError, "Lost reference to peering interfaces before launching"
55         if not local.node:
56             raise RuntimeError, "Unconnected TUN - missing node"
57         
58         # Make sure all the paths are created where 
59         # they have to be created for deployment
60         # Also remove pidfile, if there is one.
61         # Old pidfiles from previous runs can be troublesome.
62         cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
63             'home' : server.shell_escape(self.home_path)
64         }
65         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
66             cmd,
67             host = local.node.hostname,
68             port = None,
69             user = local.node.slicename,
70             agent = None,
71             ident_key = local.node.ident_path,
72             server_key = local.node.server_key
73             )
74         
75         if proc.wait():
76             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
77         
78     
79     def _install_scripts(self):
80         local = self.local()
81         
82         if not local:
83             raise RuntimeError, "Lost reference to peering interfaces before launching"
84         if not local.node:
85             raise RuntimeError, "Unconnected TUN - missing node"
86         
87         # Install the tun_connect script and tunalloc utility
88         from nepi.util import tunchannel
89         sources = [
90             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
91             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
92             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
93         ]
94         if local.filter_module:
95             filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
96             filter_module = filter_sources[0]
97             sources.extend(set(filter_sources))
98         else:
99             filter_module = None
100             filter_sources = None
101         dest = "%s@%s:%s" % (
102             local.node.slicename, local.node.hostname, 
103             os.path.join(self.home_path,'.'),)
104         (out,err),proc = server.eintr_retry(server.popen_scp)(
105             sources,
106             dest,
107             ident_key = local.node.ident_path,
108             server_key = local.node.server_key
109             )
110     
111         if proc.wait():
112             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
113         
114         # Make sure all dependencies are satisfied
115         local.node.wait_dependencies()
116
117         cmd = ( (
118             "cd %(home)s && "
119             "gcc -fPIC -shared tunalloc.c -o tunalloc.so && "
120             
121             "wget -q -c -O python-iovec-src.tar.gz %(iovec_url)s && "
122             "mkdir -p python-iovec && "
123             "cd python-iovec && "
124             "tar xzf ../python-iovec-src.tar.gz --strip-components=1 && "
125             "python setup.py build && "
126             "python setup.py install --install-lib .. && "
127             "cd .. "
128             
129             + ( " && "
130                 "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
131                    'module' : os.path.basename(filter_module).rsplit('.',1)[0],
132                    'sources' : ' '.join(map(os.path.basename,filter_sources))
133                 }
134                 
135                 if filter_module is not None and filter_module.endswith('.c')
136                 else ""
137             )
138             
139             + ( " && "
140                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
141                 "mkdir -p python-passfd && "
142                 "cd python-passfd && "
143                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
144                 "python setup.py build && "
145                 "python setup.py install --install-lib .. "
146                 
147                 if local.tun_proto == "fd" 
148                 else ""
149             ) 
150           )
151         % {
152             'home' : server.shell_escape(self.home_path),
153             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
154             'iovec_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-iovec/archive/tip.tar.gz",
155         } )
156         (out,err),proc = server.popen_ssh_command(
157             cmd,
158             host = local.node.hostname,
159             port = None,
160             user = local.node.slicename,
161             agent = None,
162             ident_key = local.node.ident_path,
163             server_key = local.node.server_key
164             )
165         
166         if proc.wait():
167             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
168         
169     def launch(self, check_proto, listen, extra_args=[]):
170         if self._starting:
171             raise AssertionError, "Double start"
172         
173         self._starting = True
174         
175         peer = self.peer()
176         local = self.local()
177         
178         if not peer or not local:
179             raise RuntimeError, "Lost reference to peering interfaces before launching"
180         
181         peer_port = peer.tun_port
182         peer_addr = peer.tun_addr
183         peer_proto= peer.tun_proto
184         peer_cipher=peer.tun_cipher
185         
186         local_port = self.port
187         local_cap  = local.capture
188         local_addr = local.address
189         local_mask = local.netprefix
190         local_snat = local.snat
191         local_txq  = local.txqueuelen
192         local_p2p  = local.pointopoint
193         local_cipher=local.tun_cipher
194         
195         if not local_p2p and hasattr(peer, 'address'):
196             local_p2p = peer.address
197
198         if check_proto != peer_proto:
199             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
200         
201         if local_cipher != peer_cipher:
202             raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
203         
204         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
205             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
206         
207         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
208             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
209
210         if check_proto == 'gre' and local_cipher.lower() != 'plain':
211             raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
212
213         if local.filter_module:
214             if check_proto not in ('udp', 'tcp'):
215                 raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
216             filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
217             filter_module = os.path.join('.',os.path.basename(filter_module[0]))
218             if filter_module.endswith('.c'):
219                 filter_module = filter_module.rsplit('.',1)[0] + '.so'
220         else:
221             filter_module = None
222         
223         args = ["python", "tun_connect.py", 
224             "-m", str(self.mode),
225             "-A", str(local_addr),
226             "-M", str(local_mask),
227             "-C", str(local_cipher)]
228         
229         if check_proto == 'fd':
230             passfd_arg = str(peer_addr)
231             if passfd_arg.startswith('\x00'):
232                 # cannot shell_encode null characters :(
233                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
234             else:
235                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
236             args.extend([
237                 "--pass-fd", passfd_arg
238             ])
239         elif check_proto == 'gre':
240             args.extend([
241                 "-K", str(min(local_port, peer_port))
242             ])
243         else:
244             args.extend([
245                 "-p", str(local_port if listen else peer_port),
246                 "-k", str(self.key)
247             ])
248         
249         if local_snat:
250             args.append("-S")
251         if local_p2p:
252             args.extend(("-P",str(local_p2p)))
253         if local_txq:
254             args.extend(("-Q",str(local_txq)))
255         if not local_cap:
256             args.append("-N")
257         elif local_cap == 'pcap':
258             args.extend(('-c','pcap'))
259         if extra_args:
260             args.extend(map(str,extra_args))
261         if not listen and check_proto != 'fd':
262             args.append(str(peer_addr))
263         if filter_module:
264             args.extend(("--filter", filter_module))
265
266         self._logger.info("Starting %s", self)
267         
268         self._make_home()
269         self._install_scripts()
270
271         # Start process in a "daemonized" way, using nohup and heavy
272         # stdin/out redirection to avoid connection issues
273         (out,err),proc = rspawn.remote_spawn(
274             " ".join(args),
275             
276             pidfile = './pid',
277             home = self.home_path,
278             stdin = '/dev/null',
279             stdout = 'capture',
280             stderr = rspawn.STDOUT,
281             sudo = True,
282             
283             host = local.node.hostname,
284             port = None,
285             user = local.node.slicename,
286             agent = None,
287             ident_key = local.node.ident_path,
288             server_key = local.node.server_key
289             )
290         
291         if proc.wait():
292             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
293         
294         self._started = True
295     
296     def recover(self):
297         # Tunnel should be still running in its node
298         # Just check its pidfile and we're done
299         self._started = True
300         self._started_listening = True
301         self.checkpid()
302     
303     def _launch_and_wait(self, *p, **kw):
304         try:
305             self.__launch_and_wait(*p, **kw)
306         except:
307             if self._launcher:
308                 import sys
309                 self._launcher._exc.append(sys.exc_info())
310             else:
311                 raise
312             
313     def __launch_and_wait(self, *p, **kw):
314         local = self.local()
315         
316         self.launch(*p, **kw)
317         
318         # Wait for the process to be started
319         while self.status() == rspawn.NOT_STARTED:
320             time.sleep(1.0)
321         
322         # Wait for the connection to be established
323         retrytime = 2.0
324         for spin in xrange(30):
325             if self.status() != rspawn.RUNNING:
326                 self._logger.warn("FAILED TO CONNECT! %s", self)
327                 break
328             
329             # Connected?
330             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
331                 "cd %(home)s ; grep -c Connected capture" % dict(
332                     home = server.shell_escape(self.home_path)),
333                 host = local.node.hostname,
334                 port = None,
335                 user = local.node.slicename,
336                 agent = None,
337                 ident_key = local.node.ident_path,
338                 server_key = local.node.server_key
339                 )
340             proc.wait()
341
342             if out.strip() == '1':
343                 break
344
345             # At least listening?
346             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
347                 "cd %(home)s ; grep -c Listening capture" % dict(
348                     home = server.shell_escape(self.home_path)),
349                 host = local.node.hostname,
350                 port = None,
351                 user = local.node.slicename,
352                 agent = None,
353                 ident_key = local.node.ident_path,
354                 server_key = local.node.server_key
355                 )
356             proc.wait()
357
358             if out.strip() == '1':
359                 self._started_listening = True
360             
361             time.sleep(min(30.0, retrytime))
362             retrytime *= 1.1
363         else:
364             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
365                 "cat %(home)s/capture" % dict(
366                     home = server.shell_escape(self.home_path)),
367                 host = local.node.hostname,
368                 port = None,
369                 user = local.node.slicename,
370                 agent = None,
371                 ident_key = local.node.ident_path,
372                 server_key = local.node.server_key
373                 )
374             proc.wait()
375
376             raise RuntimeError, "FAILED TO CONNECT %s: %s%s" % (self,out,err)
377     
378     @property
379     def if_name(self):
380         if not self._if_name:
381             # Inspect the trace to check the assigned iface
382             local = self.local()
383             if local:
384                 cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
385                             home = server.shell_escape(self.home_path))
386                 for spin in xrange(30):
387                     (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
388                         cmd,
389                         host = local.node.hostname,
390                         port = None,
391                         user = local.node.slicename,
392                         agent = None,
393                         ident_key = local.node.ident_path,
394                         server_key = local.node.server_key
395                         )
396                     
397                     if proc.wait():
398                         self._logger.debug("if_name: failed cmd %s", cmd)
399                         time.sleep(1)
400                         continue
401                     
402                     out = out.strip()
403                     
404                     match = re.match(r"Using +tun: +([-a-zA-Z0-9]*).*",out)
405                     if match:
406                         self._if_name = match.group(1)
407                         break
408                     elif out:
409                         self._logger.debug("if_name: %r does not match expected pattern from cmd %s", out, cmd)
410                     else:
411                         self._logger.debug("if_name: empty output from cmd %s", cmd)
412                     time.sleep(1)
413                 else:
414                     self._logger.warn("if_name: Could not get interface name")
415         return self._if_name
416     
417     def if_alive(self):
418         name = self.if_name
419         if name:
420             local = self.local()
421             for i in xrange(30):
422                 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
423                     "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
424                     host = local.node.hostname,
425                     port = None,
426                     user = local.node.slicename,
427                     agent = None,
428                     ident_key = local.node.ident_path,
429                     server_key = local.node.server_key
430                     )
431                 
432                 if proc.wait():
433                     time.sleep(1)
434                     continue
435                 
436                 if out.strip() == 'DEAD':
437                     return False
438                 elif out.strip() == 'ALIVE':
439                     return True
440         return False
441     
442     def async_launch(self, check_proto, listen, extra_args=[]):
443         if not self._started and not self._launcher:
444             self._launcher = threading.Thread(
445                 target = self._launch_and_wait,
446                 args = (check_proto, listen, extra_args))
447             self._launcher._exc = []
448             self._launcher.start()
449     
450     def async_launch_wait(self):
451         if self._launcher:
452             self._launcher.join()
453
454             if self._launcher._exc:
455                 exctyp,exval,exctrace = self._launcher._exc[0]
456                 raise exctyp,exval,exctrace
457             elif not self._started:
458                 raise RuntimeError, "Failed to launch TUN forwarder"
459         elif not self._started:
460             self.launch()
461
462     def async_launch_wait_listening(self):
463         if self._launcher:
464             for x in xrange(180):
465                 if self._launcher._exc:
466                     exctyp,exval,exctrace = self._launcher._exc[0]
467                     raise exctyp,exval,exctrace
468                 elif self._started and self._started_listening:
469                     break
470                 time.sleep(1)
471         elif not self._started:
472             self.launch()
473
474     def checkpid(self):            
475         local = self.local()
476         
477         if not local:
478             raise RuntimeError, "Lost reference to local interface"
479         
480         # Get PID/PPID
481         # NOTE: wait a bit for the pidfile to be created
482         if self._started and not self._pid or not self._ppid:
483             pidtuple = rspawn.remote_check_pid(
484                 os.path.join(self.home_path,'pid'),
485                 host = local.node.hostname,
486                 port = None,
487                 user = local.node.slicename,
488                 agent = None,
489                 ident_key = local.node.ident_path,
490                 server_key = local.node.server_key
491                 )
492             
493             if pidtuple:
494                 self._pid, self._ppid = pidtuple
495     
496     def status(self):
497         local = self.local()
498         
499         if not local:
500             raise RuntimeError, "Lost reference to local interface"
501         
502         self.checkpid()
503         if not self._started:
504             return rspawn.NOT_STARTED
505         elif not self._pid or not self._ppid:
506             return rspawn.NOT_STARTED
507         else:
508             status = rspawn.remote_status(
509                 self._pid, self._ppid,
510                 host = local.node.hostname,
511                 port = None,
512                 user = local.node.slicename,
513                 agent = None,
514                 ident_key = local.node.ident_path,
515                 server_key = local.node.server_key
516                 )
517             return status
518     
519     def kill(self, nowait = True):
520         local = self.local()
521         
522         if not local:
523             raise RuntimeError, "Lost reference to local interface"
524         
525         status = self.status()
526         if status == rspawn.RUNNING:
527             self._logger.info("Stopping %s", self)
528             
529             # kill by ppid+pid - SIGTERM first, then try SIGKILL
530             rspawn.remote_kill(
531                 self._pid, self._ppid,
532                 host = local.node.hostname,
533                 port = None,
534                 user = local.node.slicename,
535                 agent = None,
536                 ident_key = local.node.ident_path,
537                 server_key = local.node.server_key,
538                 sudo = True,
539                 nowait = nowait
540                 )
541     
542     def waitkill(self):
543         interval = 1.0
544         for i in xrange(30):
545             status = self.status()
546             if status != rspawn.RUNNING:
547                 self._logger.info("Stopped %s", self)
548                 break
549             time.sleep(interval)
550             interval = min(30.0, interval * 1.1)
551         else:
552             self.kill(nowait=False)
553
554         if self.if_name:
555             for i in xrange(30):
556                 if not self.if_alive():
557                     self._logger.info("Device down %s", self)
558                     break
559                 time.sleep(interval)
560                 interval = min(30.0, interval * 1.1)
561     
562     _TRACEMAP = {
563         # tracename : (remotename, localname)
564         'packets' : ('capture','capture'),
565         'pcap' : ('pcap','capture.pcap'),
566     }
567     
568     def remote_trace_path(self, whichtrace):
569         tracemap = self._TRACEMAP
570         
571         if whichtrace not in tracemap:
572             return None
573         
574         return os.path.join(self.home_path, tracemap[whichtrace][1])
575         
576     def sync_trace(self, local_dir, whichtrace):
577         tracemap = self._TRACEMAP
578         
579         if whichtrace not in tracemap:
580             return None
581         
582         local = self.local()
583         
584         if not local:
585             return None
586         
587         local_path = os.path.join(local_dir, tracemap[whichtrace][1])
588         
589         # create parent local folders
590         if os.path.dirname(local_path):
591             proc = subprocess.Popen(
592                 ["mkdir", "-p", os.path.dirname(local_path)],
593                 stdout = open("/dev/null","w"),
594                 stdin = open("/dev/null","r"))
595
596             if proc.wait():
597                 raise RuntimeError, "Failed to synchronize trace"
598         
599         # sync files
600         (out,err),proc = server.popen_scp(
601             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
602                 os.path.join(self.home_path, tracemap[whichtrace][0])),
603             local_path,
604             port = None,
605             agent = None,
606             ident_key = local.node.ident_path,
607             server_key = local.node.server_key
608             )
609         
610         if proc.wait():
611             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
612         
613         return local_path
614         
615         
616     def prepare(self):
617         """
618         First-phase setup
619         
620         eg: set up listening ports
621         """
622         raise NotImplementedError
623     
624     def setup(self):
625         """
626         Second-phase setup
627         
628         eg: connect to peer
629         """
630         raise NotImplementedError
631     
632     def shutdown(self):
633         """
634         Cleanup
635         """
636         raise NotImplementedError
637     
638     def destroy(self):
639         """
640         Second-phase cleanup
641         """
642         pass
643         
644
class TunProtoUDP(TunProtoBase):
    """
    Tunnel end using the 'udp' protocol; always launched as the
    non-listening side, passing its own port through "-u".
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
        self.listening = listening

    def prepare(self):
        """Nothing to do in the first phase."""
        return None

    def setup(self):
        """Launch in the background, advertising the local port."""
        udp_args = ("-u",str(self.port))
        self.async_launch('udp', False, udp_args)

    def shutdown(self):
        """Ask the remote process to stop."""
        self.kill()

    def destroy(self):
        """Wait until the remote process and its device are gone."""
        self.waitkill()

    def launch(self, check_proto='udp', listen=False, extra_args=None):
        """Launch with UDP defaults; extra_args defaults to ("-u", port)."""
        chosen_args = ("-u",str(self.port)) if extra_args is None else extra_args
        super(TunProtoUDP, self).launch(check_proto, listen, chosen_args)
666
class TunProtoFD(TunProtoBase):
    """
    Tunnel end using the 'fd' protocol; always launched as the
    non-listening side.
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoFD, self).__init__(local, peer, home_path, key)
        self.listening = listening

    def prepare(self):
        """Nothing to do in the first phase."""
        return None

    def setup(self):
        """Launch in the background."""
        self.async_launch('fd', False)

    def shutdown(self):
        """Ask the remote process to stop."""
        self.kill()

    def destroy(self):
        """Wait until the remote process and its device are gone."""
        self.waitkill()

    def launch(self, check_proto='fd', listen=False, extra_args=[]):
        """Launch with 'fd' defaults."""
        base = super(TunProtoFD, self)
        base.launch(check_proto, listen, extra_args)
686
class TunProtoGRE(TunProtoBase):
    """
    Tunnel end using the 'gre' protocol in 'pl-gre-ip' mode; always
    launched as the non-listening side.
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoGRE, self).__init__(local, peer, home_path, key)
        self.listening = listening
        # Replaces the mode set by the base initializer
        self.mode = 'pl-gre-ip'

    def prepare(self):
        """Nothing to do in the first phase."""
        return None

    def setup(self):
        """Launch in the background."""
        self.async_launch('gre', False)

    def shutdown(self):
        """Ask the remote process to stop."""
        self.kill()

    def destroy(self):
        """Wait until the remote process and its device are gone."""
        self.waitkill()

    def launch(self, check_proto='gre', listen=False, extra_args=[]):
        """Launch with 'gre' defaults."""
        base = super(TunProtoGRE, self)
        base.launch(check_proto, listen, extra_args)
707
class TunProtoTCP(TunProtoBase):
    """
    Tunnel end using the 'tcp' protocol. The listening side is launched
    during prepare(); the connecting side waits for its peer to be
    listening before launching during setup().
    """

    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
        self.listening = listening

    def prepare(self):
        """Launch in the background if this is the listening side."""
        if not self.listening:
            return
        self.async_launch('tcp', True)

    def setup(self):
        """Launch the connecting side once its peer is ready, then read the pid."""
        if not self.listening:
            # make sure our peer is ready
            peer = self.peer()
            peer_impl = peer and peer.peer_proto_impl
            if peer_impl:
                peer_impl.async_launch_wait_listening()

            if not self._started:
                self.async_launch('tcp', False)

        self.checkpid()

    def shutdown(self):
        """Ask the remote process to stop."""
        self.kill()

    def destroy(self):
        """Wait until the remote process and its device are gone."""
        self.waitkill()

    def launch(self, check_proto='tcp', listen=None, extra_args=[]):
        """Launch with 'tcp' defaults; listen defaults to self.listening."""
        role = self.listening if listen is None else listen
        super(TunProtoTCP, self).launch(check_proto, role, extra_args)
739
class TapProtoUDP(TunProtoUDP):
    """Same as TunProtoUDP, but running in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key, listening):
        base = super(TapProtoUDP, self)
        base.__init__(local, peer, home_path, key, listening)
        # Replaces the mode set by the parent initializer
        self.mode = 'pl-tap'
744
class TapProtoTCP(TunProtoTCP):
    """Same as TunProtoTCP, but running in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key, listening):
        base = super(TapProtoTCP, self)
        base.__init__(local, peer, home_path, key, listening)
        # Replaces the mode set by the parent initializer
        self.mode = 'pl-tap'
749
class TapProtoFD(TunProtoFD):
    """Same as TunProtoFD, but running in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key, listening):
        base = super(TapProtoFD, self)
        base.__init__(local, peer, home_path, key, listening)
        # Replaces the mode set by the parent initializer
        self.mode = 'pl-tap'
754
class TapProtoGRE(TunProtoGRE):
    """Same as TunProtoGRE, but running in 'pl-gre-eth' mode."""

    def __init__(self, local, peer, home_path, key, listening):
        base = super(TapProtoGRE, self)
        base.__init__(local, peer, home_path, key, listening)
        # Replaces the mode set by the parent initializer
        self.mode = 'pl-gre-eth'
759
760
761
# Protocol name -> implementation class for TUN-mode tunnels
TUN_PROTO_MAP = dict(
    tcp = TunProtoTCP,
    udp = TunProtoUDP,
    fd = TunProtoFD,
    gre = TunProtoGRE,
)
768
# Protocol name -> implementation class for TAP-mode tunnels
TAP_PROTO_MAP = dict(
    tcp = TapProtoTCP,
    udp = TapProtoUDP,
    fd = TapProtoFD,
    gre = TapProtoGRE,
)
775
776