aeb4e674e8ebf7cbc3ed2246d9c9fdbb22851bec
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13 import sys
14 import logging
15
16 from nepi.util import server
17
18 class TunProtoBase(object):
19     def __init__(self, local, peer, home_path, key):
20         # Weak references, since ifaces do have a reference to the
21         # tunneling protocol implementation - we don't want strong
22         # circular references.
23         self.peer = weakref.ref(peer)
24         self.local = weakref.ref(local)
25         
26         self.port = 15000
27         self.mode = 'pl-tun'
28         self.key = key
29         self.cross_slice = False
30         
31         self.home_path = home_path
32        
33         self._started = False
34
35         self._pid = None
36         self._ppid = None
37         self._if_name = None
38
39         # Logging
40         self._logger = logging.getLogger('nepi.testbeds.planetlab')
41     
42     def __str__(self):
43         local = self.local()
44         if local:
45             return '<%s for %s>' % (self.__class__.__name__, local)
46         else:
47             return super(TunProtoBase,self).__str__()
48
49     def _make_home(self):
50         local = self.local()
51         
52         if not local:
53             raise RuntimeError, "Lost reference to peering interfaces before launching"
54         if not local.node:
55             raise RuntimeError, "Unconnected TUN - missing node"
56         
57         # Make sure all the paths are created where 
58         # they have to be created for deployment
59         # Also remove pidfile, if there is one.
60         # Old pidfiles from previous runs can be troublesome.
61         cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
62             'home' : server.shell_escape(self.home_path)
63         }
64         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
65             cmd,
66             host = local.node.hostname,
67             port = None,
68             user = local.node.slicename,
69             agent = None,
70             ident_key = local.node.ident_path,
71             server_key = local.node.server_key,
72             timeout = 60,
73             retry = 3
74             )
75         
76         if proc.wait():
77             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
78     
    def _install_scripts(self):
        """Upload the tunnel scripts to the node and build them there.

        Steps, in order:
          1. scp tun_connect.py, tunalloc.c, the tunchannel and ipaddr2
             python sources and any filter module sources into home_path.
          2. Wait for the node's dependencies (local.node.wait_dependencies()).
          3. Over SSH: compile tunalloc.so, download/build python-iovec,
             compile the filter module if it is a '.c' file, and
             download/build python-passfd when local.tun_proto == "fd".

        Raises RuntimeError if the local interface is gone, has no node,
        or either the upload or the remote build exits non-zero.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError, "Lost reference to peering interfaces before launching"
        if not local.node:
            raise RuntimeError, "Unconnected TUN - missing node"
        
        # Install the tun_connect script and tunalloc utility
        from nepi.util import tunchannel
        from nepi.util import ipaddr2
        sources = [
            os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
            os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
            re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
            re.sub(r"([.]py)[co]$", r'\1', ipaddr2.__file__, 1), # pyc/o files are version-specific
        ]
        if local.filter_module:
            # The module attribute is a whitespace-separated list of source
            # files; the first entry names the module itself.
            filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
            filter_module = filter_sources[0]
            
            # Translate paths to builtin sources
            for i,source in enumerate(filter_sources):
                if not os.path.exists(source):
                    # Um... try the builtin folder
                    source = os.path.join(os.path.dirname(__file__), "scripts", source)
                    if os.path.exists(source):
                        # Yep... replace
                        filter_sources[i] = source

            # set() drops duplicate entries before uploading
            sources.extend(set(filter_sources))
                
        else:
            filter_module = None
            filter_sources = None
        # Trailing '.' component: copy into the home directory itself
        dest = "%s@%s:%s" % (
            local.node.slicename, local.node.hostname, 
            os.path.join(self.home_path,'.'),)
        (out,err),proc = server.eintr_retry(server.popen_scp)(
            sources,
            dest,
            ident_key = local.node.ident_path,
            server_key = local.node.server_key
            )
    
        if proc.wait():
            raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
        
        # Make sure all dependencies are satisfied
        local.node.wait_dependencies()

        # Build the remote command as one '&&' chain. The two optional
        # fragments are %-formatted/selected first, then the whole
        # concatenation is formatted with the home/url dictionary.
        cmd = ( (
            "cd %(home)s && "
            "gcc -fPIC -shared tunalloc.c -o tunalloc.so && "
            
            "wget -q -c -O python-iovec-src.tar.gz %(iovec_url)s && "
            "mkdir -p python-iovec && "
            "cd python-iovec && "
            "tar xzf ../python-iovec-src.tar.gz --strip-components=1 && "
            "python setup.py build && "
            "python setup.py install --install-lib .. && "
            "cd .. "
            
            # Optional: compile the filter module when it is C source
            + ( " && "
                "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
                   'module' : os.path.basename(filter_module).rsplit('.',1)[0],
                   'sources' : ' '.join(map(os.path.basename,filter_sources))
                }
                
                if filter_module is not None and filter_module.endswith('.c')
                else ""
            )
            
            # Optional: python-passfd is only built for the "fd" protocol
            + ( " && "
                "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
                "mkdir -p python-passfd && "
                "cd python-passfd && "
                "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
                "python setup.py build && "
                "python setup.py install --install-lib .. "
                
                if local.tun_proto == "fd" 
                else ""
            ) 
          )
        % {
            'home' : server.shell_escape(self.home_path),
            'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
            'iovec_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-iovec/archive/tip.tar.gz",
        } )
        # NOTE(review): unlike the other SSH calls in this class, this one
        # is not wrapped in server.eintr_retry - confirm whether intended.
        (out,err),proc = server.popen_ssh_command(
            cmd,
            host = local.node.hostname,
            port = None,
            user = local.node.slicename,
            agent = None,
            ident_key = local.node.ident_path,
            server_key = local.node.server_key,
            timeout = 300
            )
        
        if proc.wait():
            raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
        
182         
183     def launch(self, check_proto):
184         peer = self.peer()
185         local = self.local()
186         
187         if not peer or not local:
188             raise RuntimeError, "Lost reference to peering interfaces before launching"
189         
190         peer_port = peer.tun_port
191         peer_addr = peer.tun_addr
192         peer_proto = peer.tun_proto
193         peer_cipher = peer.tun_cipher
194         
195         local_port = self.port
196         local_cap  = local.capture
197         local_addr = local.address
198         local_mask = local.netprefix
199         local_snat = local.snat
200         local_txq  = local.txqueuelen
201         local_p2p  = local.pointopoint
202         local_cipher=local.tun_cipher
203         local_mcast= local.multicast
204         local_bwlim= local.bwlimit
205         local_mcastfwd = local.multicast_forwarder
206         
207         if not local_p2p and hasattr(peer, 'address'):
208             local_p2p = peer.address
209
210         if check_proto != peer_proto:
211             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
212         
213         if local_cipher != peer_cipher:
214             raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
215         
216         if check_proto == 'gre' and local_cipher.lower() != 'plain':
217             raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
218
219         if local.filter_module:
220             if check_proto not in ('udp', 'tcp'):
221                 raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
222             filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
223             filter_module = os.path.join('.',os.path.basename(filter_module[0]))
224             if filter_module.endswith('.c'):
225                 filter_module = filter_module.rsplit('.',1)[0] + '.so'
226             filter_args = local.filter_module.args
227         else:
228             filter_module = None
229             filter_args = None
230         
231         args = ["python", "tun_connect.py", 
232             "-m", str(self.mode),
233             "-t", str(check_proto),
234             "-A", str(local_addr),
235             "-M", str(local_mask),
236             "-C", str(local_cipher),
237             ]
238         
239         if check_proto == 'fd':
240             passfd_arg = str(peer_addr)
241             if passfd_arg.startswith('\x00'):
242                 # cannot shell_encode null characters :(
243                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
244             else:
245                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
246             args.extend([
247                 "--pass-fd", passfd_arg
248             ])
249         elif check_proto == 'gre':
250             if self.cross_slice:
251                 args.extend([
252                     "-K", str(self.key.strip('='))
253                 ])
254
255             args.extend([
256                 "-a", str(peer_addr),
257             ])
258         # both udp and tcp
259         else:
260             args.extend([
261                 "-P", str(local_port),
262                 "-p", str(peer_port),
263                 "-a", str(peer_addr),
264                 "-k", str(self.key)
265             ])
266         
267         if local_snat:
268             args.append("-S")
269         if local_p2p:
270             args.extend(("-Z",str(local_p2p)))
271         if local_txq:
272             args.extend(("-Q",str(local_txq)))
273         if not local_cap:
274             args.append("-N")
275         elif local_cap == 'pcap':
276             args.extend(('-c','pcap'))
277         if local_bwlim:
278             args.extend(("-b",str(local_bwlim*1024)))
279         if filter_module:
280             args.extend(("--filter", filter_module))
281         if filter_args:
282             args.extend(("--filter-args", filter_args))
283         if local_mcast and local_mcastfwd:
284             args.extend(("--multicast-forwarder", local_mcastfwd))
285
286         self._logger.info("Starting %s", self)
287         
288         self._make_home()
289         self._install_scripts()
290
291         # Start process in a "daemonized" way, using nohup and heavy
292         # stdin/out redirection to avoid connection issues
293         (out,err),proc = rspawn.remote_spawn(
294             " ".join(args),
295             
296             pidfile = './pid',
297             home = self.home_path,
298             stdin = '/dev/null',
299             stdout = 'capture',
300             stderr = rspawn.STDOUT,
301             sudo = True,
302             
303             host = local.node.hostname,
304             port = None,
305             user = local.node.slicename,
306             agent = None,
307             ident_key = local.node.ident_path,
308             server_key = local.node.server_key
309             )
310         
311         if proc.wait():
312             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
313        
314         self._started = True
315     
    def recover(self):
        """Re-attach to a tunnel assumed to be already running on its node.

        Marks the tunnel as started and then refreshes the pid/ppid from
        the remote pidfile through checkpid().
        """
        # Tunnel should be still running in its node
        # Just check its pidfile and we're done
        self._started = True
        self.checkpid()
321     
322     def wait(self):
323         local = self.local()
324         
325         # Wait for the connection to be established
326         retrytime = 2.0
327         for spin in xrange(30):
328             if self.status() != rspawn.RUNNING:
329                 self._logger.warn("FAILED TO CONNECT! %s", self)
330                 break
331             
332             # Connected?
333             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
334                 "cd %(home)s ; grep -a -c Connected capture" % dict(
335                     home = server.shell_escape(self.home_path)),
336                 host = local.node.hostname,
337                 port = None,
338                 user = local.node.slicename,
339                 agent = None,
340                 ident_key = local.node.ident_path,
341                 server_key = local.node.server_key,
342                 timeout = 60,
343                 err_on_timeout = False
344                 )
345             proc.wait()
346
347             if out.strip() == '1':
348                 break
349
350             # At least listening?
351             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
352                 "cd %(home)s ; grep -a -c Listening capture" % dict(
353                     home = server.shell_escape(self.home_path)),
354                 host = local.node.hostname,
355                 port = None,
356                 user = local.node.slicename,
357                 agent = None,
358                 ident_key = local.node.ident_path,
359                 server_key = local.node.server_key,
360                 timeout = 60,
361                 err_on_timeout = False
362                 )
363             proc.wait()
364
365             time.sleep(min(30.0, retrytime))
366             retrytime *= 1.1
367         else:
368             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
369                 "cat %(home)s/capture" % dict(
370                     home = server.shell_escape(self.home_path)),
371                 host = local.node.hostname,
372                 port = None,
373                 user = local.node.slicename,
374                 agent = None,
375                 ident_key = local.node.ident_path,
376                 server_key = local.node.server_key,
377                 timeout = 60,
378                 retry = 3,
379                 err_on_timeout = False
380                 )
381             proc.wait()
382
383             raise RuntimeError, "FAILED TO CONNECT %s: %s%s" % (self,out,err)
384     
385     @property
386     def if_name(self):
387         if not self._if_name:
388             # Inspect the trace to check the assigned iface
389             local = self.local()
390             if local:
391                 cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict(
392                             home = server.shell_escape(self.home_path))
393                 for spin in xrange(30):
394                     (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
395                         cmd,
396                         host = local.node.hostname,
397                         port = None,
398                         user = local.node.slicename,
399                         agent = None,
400                         ident_key = local.node.ident_path,
401                         server_key = local.node.server_key,
402                         timeout = 60,
403                         err_on_timeout = False
404                         )
405                     
406                     if proc.wait():
407                         self._logger.debug("if_name: failed cmd %s", cmd)
408                         time.sleep(1)
409                         continue
410                     
411                     out = out.strip()
412                     
413                     match = re.match(r"Using +tun: +([-a-zA-Z0-9]*).*",out)
414                     if match:
415                         self._if_name = match.group(1)
416                         break
417                     elif out:
418                         self._logger.debug("if_name: %r does not match expected pattern from cmd %s", out, cmd)
419                     else:
420                         self._logger.debug("if_name: empty output from cmd %s", cmd)
421                     time.sleep(3)
422                 else:
423                     self._logger.warn("if_name: Could not get interface name")
424         return self._if_name
425     
426     def if_alive(self):
427         name = self.if_name
428         if name:
429             local = self.local()
430             for i in xrange(30):
431                 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
432                     "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
433                     host = local.node.hostname,
434                     port = None,
435                     user = local.node.slicename,
436                     agent = None,
437                     ident_key = local.node.ident_path,
438                     server_key = local.node.server_key,
439                     timeout = 60,
440                     err_on_timeout = False
441                     )
442                 
443                 if proc.wait():
444                     time.sleep(1)
445                     continue
446                 
447                 if out.strip() == 'DEAD':
448                     return False
449                 elif out.strip() == 'ALIVE':
450                     return True
451         return False
452     
453     def checkpid(self):            
454         local = self.local()
455         
456         if not local:
457             raise RuntimeError, "Lost reference to local interface"
458         
459         # Get PID/PPID
460         # NOTE: wait a bit for the pidfile to be created
461         if self._started and not self._pid or not self._ppid:
462             pidtuple = rspawn.remote_check_pid(
463                 os.path.join(self.home_path,'pid'),
464                 host = local.node.hostname,
465                 port = None,
466                 user = local.node.slicename,
467                 agent = None,
468                 ident_key = local.node.ident_path,
469                 server_key = local.node.server_key
470                 )
471             
472             if pidtuple:
473                 self._pid, self._ppid = pidtuple
474     
475     def status(self):
476         local = self.local()
477         
478         if not local:
479             raise RuntimeError, "Lost reference to local interface"
480         
481         self.checkpid()
482         if not self._started:
483             return rspawn.NOT_STARTED
484         elif not self._pid or not self._ppid:
485             return rspawn.NOT_STARTED
486         else:
487             status = rspawn.remote_status(
488                 self._pid, self._ppid,
489                 host = local.node.hostname,
490                 port = None,
491                 user = local.node.slicename,
492                 agent = None,
493                 ident_key = local.node.ident_path,
494                 server_key = local.node.server_key
495                 )
496             return status
497     
498     def kill(self, nowait = True):
499         local = self.local()
500         
501         if not local:
502             raise RuntimeError, "Lost reference to local interface"
503         
504         status = self.status()
505         if status == rspawn.RUNNING:
506             self._logger.info("Stopping %s", self)
507             
508             # kill by ppid+pid - SIGTERM first, then try SIGKILL
509             rspawn.remote_kill(
510                 self._pid, self._ppid,
511                 host = local.node.hostname,
512                 port = None,
513                 user = local.node.slicename,
514                 agent = None,
515                 ident_key = local.node.ident_path,
516                 server_key = local.node.server_key,
517                 sudo = True,
518                 nowait = nowait
519                 )
520     
521     def waitkill(self):
522         interval = 1.0
523         for i in xrange(30):
524             status = self.status()
525             if status != rspawn.RUNNING:
526                 self._logger.info("Stopped %s", self)
527                 break
528             time.sleep(interval)
529             interval = min(30.0, interval * 1.1)
530         else:
531             self.kill(nowait=False)
532
533         if self.if_name:
534             for i in xrange(30):
535                 if not self.if_alive():
536                     self._logger.info("Device down %s", self)
537                     break
538                 time.sleep(interval)
539                 interval = min(30.0, interval * 1.1)
540     
    # Default trace table used by remote_trace_path() and sync_trace()
    # when the caller passes no tracemap of its own. sync_trace() joins
    # remotename to home_path (source) and localname to the local
    # directory (destination).
    _TRACEMAP = {
        # tracename : (remotename, localname)
        'packets' : ('capture','capture'),
        'pcap' : ('pcap','capture.pcap'),
    }
546     
547     def remote_trace_path(self, whichtrace, tracemap = None):
548         tracemap = self._TRACEMAP if not tracemap else tracemap
549         
550         if whichtrace not in tracemap:
551             return None
552         
553         return os.path.join(self.home_path, tracemap[whichtrace][1])
554         
555     def sync_trace(self, local_dir, whichtrace, tracemap = None):
556         tracemap = self._TRACEMAP if not tracemap else tracemap
557         
558         if whichtrace not in tracemap:
559             return None
560         
561         local = self.local()
562         
563         if not local:
564             return None
565         
566         local_path = os.path.join(local_dir, tracemap[whichtrace][1])
567         
568         # create parent local folders
569         if os.path.dirname(local_path):
570             proc = subprocess.Popen(
571                 ["mkdir", "-p", os.path.dirname(local_path)],
572                 stdout = open("/dev/null","w"),
573                 stdin = open("/dev/null","r"))
574
575             if proc.wait():
576                 raise RuntimeError, "Failed to synchronize trace"
577         
578         # sync files
579         (out,err),proc = server.popen_scp(
580             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
581                 os.path.join(self.home_path, tracemap[whichtrace][0])),
582             local_path,
583             port = None,
584             agent = None,
585             ident_key = local.node.ident_path,
586             server_key = local.node.server_key
587             )
588         
589         if proc.wait():
590             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
591         
592         return local_path
593         
    def shutdown(self):
        """Request termination of the remote tunnel process via kill()
        with its default arguments (nowait=True)."""
        self.kill()
596     
    def destroy(self):
        """Wait for the remote process and its device to go away by
        delegating to waitkill()."""
        self.waitkill()
599
class TunProtoUDP(TunProtoBase):
    """Tunnel implementation that launches the base class with 'udp'."""
    def __init__(self, local, peer, home_path, key):
        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
    
    def launch(self):
        # The protocol is fixed here, so callers pass no argument
        super(TunProtoUDP, self).launch('udp')
606
class TunProtoFD(TunProtoBase):
    """Tunnel implementation that launches the base class with 'fd'."""
    def __init__(self, local, peer, home_path, key):
        super(TunProtoFD, self).__init__(local, peer, home_path, key)
    
    def launch(self):
        # The protocol is fixed here, so callers pass no argument
        super(TunProtoFD, self).launch('fd')
613
class TunProtoGRE(TunProtoBase):
    """Tunnel implementation that launches the base class with 'gre'
    and uses mode 'pl-gre-ip' instead of the base default."""
    def __init__(self, local, peer, home_path, key):
        super(TunProtoGRE, self).__init__(local, peer, home_path, key)
        # Must be set after the base initialiser, which assigns 'pl-tun'
        self.mode = 'pl-gre-ip'

    def launch(self):
        # The protocol is fixed here, so callers pass no argument
        super(TunProtoGRE, self).launch('gre')
621
class TunProtoTCP(TunProtoBase):
    """Tunnel implementation that launches the base class with 'tcp'."""
    def __init__(self, local, peer, home_path, key):
        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
    
    def launch(self):
        # The protocol is fixed here, so callers pass no argument
        super(TunProtoTCP, self).launch('tcp')
628
class TapProtoUDP(TunProtoUDP):
    """Same as TunProtoUDP but with mode 'pl-tap'."""
    def __init__(self, local, peer, home_path, key):
        super(TapProtoUDP, self).__init__(local, peer, home_path, key)
        # Override the mode assigned by the base initialiser
        self.mode = 'pl-tap'
633
class TapProtoTCP(TunProtoTCP):
    """Same as TunProtoTCP but with mode 'pl-tap'."""
    def __init__(self, local, peer, home_path, key):
        super(TapProtoTCP, self).__init__(local, peer, home_path, key)
        # Override the mode assigned by the base initialiser
        self.mode = 'pl-tap'
638
class TapProtoFD(TunProtoFD):
    """Same as TunProtoFD but with mode 'pl-tap'."""
    def __init__(self, local, peer, home_path, key):
        super(TapProtoFD, self).__init__(local, peer, home_path, key)
        # Override the mode assigned by the base initialiser
        self.mode = 'pl-tap'
643
class TapProtoGRE(TunProtoGRE):
    """Same as TunProtoGRE but with mode 'pl-gre-eth'."""
    def __init__(self, local, peer, home_path, key):
        super(TapProtoGRE, self).__init__(local, peer, home_path, key)
        # Override the 'pl-gre-ip' mode assigned by TunProtoGRE
        self.mode = 'pl-gre-eth'
648
# Protocol name -> Tun* implementation class
TUN_PROTO_MAP = {
    'tcp' : TunProtoTCP,
    'udp' : TunProtoUDP,
    'fd'  : TunProtoFD,
    'gre' : TunProtoGRE,
}

# Protocol name -> Tap* implementation class (same keys as TUN_PROTO_MAP)
TAP_PROTO_MAP = {
    'tcp' : TapProtoTCP,
    'udp' : TapProtoUDP,
    'fd'  : TapProtoFD,
    'gre' : TapProtoGRE,
}
662