fixed shebangs in non executable .py files
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 # -*- coding: utf-8 -*-
2
3 import weakref
4 import os
5 import os.path
6 import rspawn
7 import subprocess
8 import threading
9 import base64
10 import time
11 import re
12 import sys
13 import logging
14
15 from nepi.util import server
16
17 class TunProtoBase(object):
18     def __init__(self, local, peer, home_path, key):
19         # Weak references, since ifaces do have a reference to the
20         # tunneling protocol implementation - we don't want strong
21         # circular references.
22         self.peer = weakref.ref(peer)
23         self.local = weakref.ref(local)
24         
25         self.port = 15000
26         self.mode = 'pl-tun'
27         self.key = key
28         self.cross_slice = False
29         
30         self.home_path = home_path
31        
32         self._started = False
33
34         self._pid = None
35         self._ppid = None
36         self._if_name = None
37
38         # Logging
39         self._logger = logging.getLogger('nepi.testbeds.planetlab')
40     
41     def __str__(self):
42         local = self.local()
43         if local:
44             return '<%s for %s>' % (self.__class__.__name__, local)
45         else:
46             return super(TunProtoBase,self).__str__()
47
48     def _make_home(self):
49         local = self.local()
50         
51         if not local:
52             raise RuntimeError, "Lost reference to peering interfaces before launching"
53         if not local.node:
54             raise RuntimeError, "Unconnected TUN - missing node"
55         
56         # Make sure all the paths are created where 
57         # they have to be created for deployment
58         # Also remove pidfile, if there is one.
59         # Old pidfiles from previous runs can be troublesome.
60         cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
61             'home' : server.shell_escape(self.home_path)
62         }
63         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
64             cmd,
65             host = local.node.hostname,
66             port = None,
67             user = local.node.slicename,
68             agent = None,
69             ident_key = local.node.ident_path,
70             server_key = local.node.server_key,
71             timeout = 60,
72             retry = 3
73             )
74         
75         if proc.wait():
76             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
77     
78     def _install_scripts(self):
79         local = self.local()
80         
81         if not local:
82             raise RuntimeError, "Lost reference to peering interfaces before launching"
83         if not local.node:
84             raise RuntimeError, "Unconnected TUN - missing node"
85         
86         # Install the tun_connect script and tunalloc utility
87         from nepi.util import tunchannel
88         from nepi.util import ipaddr2
89         sources = [
90             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
91             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
92             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
93             re.sub(r"([.]py)[co]$", r'\1', ipaddr2.__file__, 1), # pyc/o files are version-specific
94         ]
95         if local.filter_module:
96             filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
97             filter_module = filter_sources[0]
98             
99             # Translate paths to builtin sources
100             for i,source in enumerate(filter_sources):
101                 if not os.path.exists(source):
102                     # Um... try the builtin folder
103                     source = os.path.join(os.path.dirname(__file__), "scripts", source)
104                     if os.path.exists(source):
105                         # Yep... replace
106                         filter_sources[i] = source
107
108             sources.extend(set(filter_sources))
109                 
110         else:
111             filter_module = None
112             filter_sources = None
113         dest = "%s@%s:%s" % (
114             local.node.slicename, local.node.hostname, 
115             os.path.join(self.home_path,'.'),)
116         (out,err),proc = server.eintr_retry(server.popen_scp)(
117             sources,
118             dest,
119             ident_key = local.node.ident_path,
120             server_key = local.node.server_key
121             )
122     
123         if proc.wait():
124             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
125         
126         # Make sure all dependencies are satisfied
127         local.node.wait_dependencies()
128
129         cmd = ( (
130             "cd %(home)s && "
131             "gcc -fPIC -shared tunalloc.c -o tunalloc.so && "
132             
133             "wget -q -c -O python-iovec-src.tar.gz %(iovec_url)s && "
134             "mkdir -p python-iovec && "
135             "cd python-iovec && "
136             "tar xzf ../python-iovec-src.tar.gz --strip-components=1 && "
137             "python setup.py build && "
138             "python setup.py install --install-lib .. && "
139             "cd .. "
140             
141             + ( " && "
142                 "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
143                    'module' : os.path.basename(filter_module).rsplit('.',1)[0],
144                    'sources' : ' '.join(map(os.path.basename,filter_sources))
145                 }
146                 
147                 if filter_module is not None and filter_module.endswith('.c')
148                 else ""
149             )
150             
151             + ( " && "
152                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
153                 "mkdir -p python-passfd && "
154                 "cd python-passfd && "
155                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
156                 "python setup.py build && "
157                 "python setup.py install --install-lib .. "
158                 
159                 if local.tun_proto == "fd" 
160                 else ""
161             ) 
162           )
163         % {
164             'home' : server.shell_escape(self.home_path),
165             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
166             'iovec_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-iovec/archive/tip.tar.gz",
167         } )
168         (out,err),proc = server.popen_ssh_command(
169             cmd,
170             host = local.node.hostname,
171             port = None,
172             user = local.node.slicename,
173             agent = None,
174             ident_key = local.node.ident_path,
175             server_key = local.node.server_key,
176             timeout = 300
177             )
178         
179         if proc.wait():
180             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
181         
182     def launch(self, check_proto):
183         peer = self.peer()
184         local = self.local()
185         
186         if not peer or not local:
187             raise RuntimeError, "Lost reference to peering interfaces before launching"
188         
189         peer_port = peer.tun_port
190         peer_addr = peer.tun_addr
191         peer_proto = peer.tun_proto
192         peer_cipher = peer.tun_cipher
193         
194         local_port = self.port
195         local_cap  = local.capture
196         local_addr = local.address
197         local_mask = local.netprefix
198         local_snat = local.snat
199         local_txq  = local.txqueuelen
200         local_p2p  = local.pointopoint
201         local_cipher=local.tun_cipher
202         local_mcast= local.multicast
203         local_bwlim= local.bwlimit
204         local_mcastfwd = local.multicast_forwarder
205         
206         if not local_p2p and hasattr(peer, 'address'):
207             local_p2p = peer.address
208
209         if check_proto != peer_proto:
210             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
211         
212         if local_cipher != peer_cipher:
213             raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
214         
215         if check_proto == 'gre' and local_cipher.lower() != 'plain':
216             raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
217
218         if local.filter_module:
219             if check_proto not in ('udp', 'tcp'):
220                 raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
221             filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
222             filter_module = os.path.join('.',os.path.basename(filter_module[0]))
223             if filter_module.endswith('.c'):
224                 filter_module = filter_module.rsplit('.',1)[0] + '.so'
225             filter_args = local.filter_module.args
226         else:
227             filter_module = None
228             filter_args = None
229         
230         args = ["python", "tun_connect.py", 
231             "-m", str(self.mode),
232             "-t", str(check_proto),
233             "-A", str(local_addr),
234             "-M", str(local_mask),
235             "-C", str(local_cipher),
236             ]
237         
238         if check_proto == 'fd':
239             passfd_arg = str(peer_addr)
240             if passfd_arg.startswith('\x00'):
241                 # cannot shell_encode null characters :(
242                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
243             else:
244                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
245             args.extend([
246                 "--pass-fd", passfd_arg
247             ])
248         elif check_proto == 'gre':
249             if self.cross_slice:
250                 args.extend([
251                     "-K", str(self.key.strip('='))
252                 ])
253
254             args.extend([
255                 "-a", str(peer_addr),
256             ])
257         # both udp and tcp
258         else:
259             args.extend([
260                 "-P", str(local_port),
261                 "-p", str(peer_port),
262                 "-a", str(peer_addr),
263                 "-k", str(self.key)
264             ])
265         
266         if local_snat:
267             args.append("-S")
268         if local_p2p:
269             args.extend(("-Z",str(local_p2p)))
270         if local_txq:
271             args.extend(("-Q",str(local_txq)))
272         if not local_cap:
273             args.append("-N")
274         elif local_cap == 'pcap':
275             args.extend(('-c','pcap'))
276         if local_bwlim:
277             args.extend(("-b",str(local_bwlim*1024)))
278         if filter_module:
279             args.extend(("--filter", filter_module))
280         if filter_args:
281             args.extend(("--filter-args", filter_args))
282         if local_mcast and local_mcastfwd:
283             args.extend(("--multicast-forwarder", local_mcastfwd))
284
285         self._logger.info("Starting %s", self)
286         
287         self._make_home()
288         self._install_scripts()
289
290         # Start process in a "daemonized" way, using nohup and heavy
291         # stdin/out redirection to avoid connection issues
292         (out,err),proc = rspawn.remote_spawn(
293             " ".join(args),
294             
295             pidfile = './pid',
296             home = self.home_path,
297             stdin = '/dev/null',
298             stdout = 'capture',
299             stderr = rspawn.STDOUT,
300             sudo = True,
301             
302             host = local.node.hostname,
303             port = None,
304             user = local.node.slicename,
305             agent = None,
306             ident_key = local.node.ident_path,
307             server_key = local.node.server_key
308             )
309         
310         if proc.wait():
311             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
312        
313         self._started = True
314     
315     def recover(self):
316         # Tunnel should be still running in its node
317         # Just check its pidfile and we're done
318         self._started = True
319         self.checkpid()
320     
321     def wait(self):
322         local = self.local()
323         
324         # Wait for the connection to be established
325         retrytime = 2.0
326         for spin in xrange(30):
327             if self.status() != rspawn.RUNNING:
328                 self._logger.warn("FAILED TO CONNECT! %s", self)
329                 break
330             
331             # Connected?
332             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
333                 "cd %(home)s ; grep -a -c Connected capture" % dict(
334                     home = server.shell_escape(self.home_path)),
335                 host = local.node.hostname,
336                 port = None,
337                 user = local.node.slicename,
338                 agent = None,
339                 ident_key = local.node.ident_path,
340                 server_key = local.node.server_key,
341                 timeout = 60,
342                 err_on_timeout = False
343                 )
344             proc.wait()
345
346             if out.strip() == '1':
347                 break
348
349             # At least listening?
350             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
351                 "cd %(home)s ; grep -a -c Listening capture" % dict(
352                     home = server.shell_escape(self.home_path)),
353                 host = local.node.hostname,
354                 port = None,
355                 user = local.node.slicename,
356                 agent = None,
357                 ident_key = local.node.ident_path,
358                 server_key = local.node.server_key,
359                 timeout = 60,
360                 err_on_timeout = False
361                 )
362             proc.wait()
363
364             time.sleep(min(30.0, retrytime))
365             retrytime *= 1.1
366         else:
367             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
368                 "cat %(home)s/capture" % dict(
369                     home = server.shell_escape(self.home_path)),
370                 host = local.node.hostname,
371                 port = None,
372                 user = local.node.slicename,
373                 agent = None,
374                 ident_key = local.node.ident_path,
375                 server_key = local.node.server_key,
376                 timeout = 60,
377                 retry = 3,
378                 err_on_timeout = False
379                 )
380             proc.wait()
381
382             raise RuntimeError, "FAILED TO CONNECT %s: %s%s" % (self,out,err)
383     
384     @property
385     def if_name(self):
386         if not self._if_name:
387             # Inspect the trace to check the assigned iface
388             local = self.local()
389             if local:
390                 cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict(
391                             home = server.shell_escape(self.home_path))
392                 for spin in xrange(30):
393                     (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
394                         cmd,
395                         host = local.node.hostname,
396                         port = None,
397                         user = local.node.slicename,
398                         agent = None,
399                         ident_key = local.node.ident_path,
400                         server_key = local.node.server_key,
401                         timeout = 60,
402                         err_on_timeout = False
403                         )
404                     
405                     if proc.wait():
406                         self._logger.debug("if_name: failed cmd %s", cmd)
407                         time.sleep(1)
408                         continue
409                     
410                     out = out.strip()
411                     
412                     match = re.match(r"Using +tun: +([-a-zA-Z0-9]*).*",out)
413                     if match:
414                         self._if_name = match.group(1)
415                         break
416                     elif out:
417                         self._logger.debug("if_name: %r does not match expected pattern from cmd %s", out, cmd)
418                     else:
419                         self._logger.debug("if_name: empty output from cmd %s", cmd)
420                     time.sleep(3)
421                 else:
422                     self._logger.warn("if_name: Could not get interface name")
423         return self._if_name
424     
425     def if_alive(self):
426         name = self.if_name
427         if name:
428             local = self.local()
429             for i in xrange(30):
430                 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
431                     "ip link show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
432                     host = local.node.hostname,
433                     port = None,
434                     user = local.node.slicename,
435                     agent = None,
436                     ident_key = local.node.ident_path,
437                     server_key = local.node.server_key,
438                     timeout = 60,
439                     err_on_timeout = False
440                     )
441                 
442                 if proc.wait():
443                     time.sleep(1)
444                     continue
445                 
446                 if out.strip() == 'DEAD':
447                     return False
448                 elif out.strip() == 'ALIVE':
449                     return True
450         return False
451     
452     def checkpid(self):            
453         local = self.local()
454         
455         if not local:
456             raise RuntimeError, "Lost reference to local interface"
457         
458         # Get PID/PPID
459         # NOTE: wait a bit for the pidfile to be created
460         if self._started and not self._pid or not self._ppid:
461             pidtuple = rspawn.remote_check_pid(
462                 os.path.join(self.home_path,'pid'),
463                 host = local.node.hostname,
464                 port = None,
465                 user = local.node.slicename,
466                 agent = None,
467                 ident_key = local.node.ident_path,
468                 server_key = local.node.server_key
469                 )
470             
471             if pidtuple:
472                 self._pid, self._ppid = pidtuple
473     
474     def status(self):
475         local = self.local()
476         
477         if not local:
478             raise RuntimeError, "Lost reference to local interface"
479         
480         self.checkpid()
481         if not self._started:
482             return rspawn.NOT_STARTED
483         elif not self._pid or not self._ppid:
484             return rspawn.NOT_STARTED
485         else:
486             status = rspawn.remote_status(
487                 self._pid, self._ppid,
488                 host = local.node.hostname,
489                 port = None,
490                 user = local.node.slicename,
491                 agent = None,
492                 ident_key = local.node.ident_path,
493                 server_key = local.node.server_key
494                 )
495             return status
496     
497     def kill(self, nowait = True):
498         local = self.local()
499         
500         if not local:
501             raise RuntimeError, "Lost reference to local interface"
502         
503         status = self.status()
504         if status == rspawn.RUNNING:
505             self._logger.info("Stopping %s", self)
506             
507             # kill by ppid+pid - SIGTERM first, then try SIGKILL
508             rspawn.remote_kill(
509                 self._pid, self._ppid,
510                 host = local.node.hostname,
511                 port = None,
512                 user = local.node.slicename,
513                 agent = None,
514                 ident_key = local.node.ident_path,
515                 server_key = local.node.server_key,
516                 sudo = True,
517                 nowait = nowait
518                 )
519     
520     def waitkill(self):
521         interval = 1.0
522         for i in xrange(30):
523             status = self.status()
524             if status != rspawn.RUNNING:
525                 self._logger.info("Stopped %s", self)
526                 break
527             time.sleep(interval)
528             interval = min(30.0, interval * 1.1)
529         else:
530             self.kill(nowait=False)
531
532         if self.if_name:
533             for i in xrange(30):
534                 if not self.if_alive():
535                     self._logger.info("Device down %s", self)
536                     break
537                 time.sleep(interval)
538                 interval = min(30.0, interval * 1.1)
539             else:
540                 local = self.local()
541                 
542                 if local:
543                     # Forcibly shut down interface
544                     (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
545                         "sudo -S bash -c 'echo %s > /vsys/vif_down.in'" % (self.if_name,),
546                         host = local.node.hostname,
547                         port = None,
548                         user = local.node.slicename,
549                         agent = None,
550                         ident_key = local.node.ident_path,
551                         server_key = local.node.server_key,
552                         timeout = 60,
553                         err_on_timeout = False
554                         )
555                     proc.wait()    
556     _TRACEMAP = {
557         # tracename : (remotename, localname)
558         'packets' : ('capture','capture'),
559         'pcap' : ('pcap','capture.pcap'),
560     }
561     
562     def remote_trace_path(self, whichtrace, tracemap = None):
563         tracemap = self._TRACEMAP if not tracemap else tracemap
564         
565         if whichtrace not in tracemap:
566             return None
567         
568         return os.path.join(self.home_path, tracemap[whichtrace][1])
569         
570     def sync_trace(self, local_dir, whichtrace, tracemap = None):
571         tracemap = self._TRACEMAP if not tracemap else tracemap
572         
573         if whichtrace not in tracemap:
574             return None
575         
576         local = self.local()
577         
578         if not local:
579             return None
580         
581         local_path = os.path.join(local_dir, tracemap[whichtrace][1])
582         
583         # create parent local folders
584         if os.path.dirname(local_path):
585             proc = subprocess.Popen(
586                 ["mkdir", "-p", os.path.dirname(local_path)],
587                 stdout = open("/dev/null","w"),
588                 stdin = open("/dev/null","r"))
589
590             if proc.wait():
591                 raise RuntimeError, "Failed to synchronize trace"
592         
593         # sync files
594         (out,err),proc = server.popen_scp(
595             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
596                 os.path.join(self.home_path, tracemap[whichtrace][0])),
597             local_path,
598             port = None,
599             agent = None,
600             ident_key = local.node.ident_path,
601             server_key = local.node.server_key
602             )
603         
604         if proc.wait():
605             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
606         
607         return local_path
608         
609     def shutdown(self):
610         self.kill()
611     
612     def destroy(self):
613         self.waitkill()
614
class TunProtoUDP(TunProtoBase):
    """TUN tunnel endpoint launched with the 'udp' protocol."""

    # Construction is inherited unchanged from TunProtoBase

    def launch(self):
        super(TunProtoUDP, self).launch('udp')
621
class TunProtoFD(TunProtoBase):
    """TUN tunnel endpoint launched with the 'fd' protocol."""

    # Construction is inherited unchanged from TunProtoBase

    def launch(self):
        super(TunProtoFD, self).launch('fd')
628
class TunProtoGRE(TunProtoBase):
    """Tunnel endpoint launched with the 'gre' protocol, in 'pl-gre-ip' mode."""

    def __init__(self, local, peer, home_path, key):
        super(TunProtoGRE, self).__init__(local, peer, home_path, key)
        # Replace the base class default mode
        self.mode = 'pl-gre-ip'

    def launch(self):
        super(TunProtoGRE, self).launch('gre')
636
class TunProtoTCP(TunProtoBase):
    """TUN tunnel endpoint launched with the 'tcp' protocol."""

    # Construction is inherited unchanged from TunProtoBase

    def launch(self):
        super(TunProtoTCP, self).launch('tcp')
643
class TapProtoUDP(TunProtoUDP):
    """Same as TunProtoUDP, but launched in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key):
        super(TapProtoUDP, self).__init__(local, peer, home_path, key)
        # Replace the mode inherited from the TUN flavour
        self.mode = 'pl-tap'
648
class TapProtoTCP(TunProtoTCP):
    """Same as TunProtoTCP, but launched in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key):
        super(TapProtoTCP, self).__init__(local, peer, home_path, key)
        # Replace the mode inherited from the TUN flavour
        self.mode = 'pl-tap'
653
class TapProtoFD(TunProtoFD):
    """Same as TunProtoFD, but launched in 'pl-tap' mode."""

    def __init__(self, local, peer, home_path, key):
        super(TapProtoFD, self).__init__(local, peer, home_path, key)
        # Replace the mode inherited from the TUN flavour
        self.mode = 'pl-tap'
658
class TapProtoGRE(TunProtoGRE):
    """Same as TunProtoGRE, but launched in 'pl-gre-eth' mode."""

    def __init__(self, local, peer, home_path, key):
        super(TapProtoGRE, self).__init__(local, peer, home_path, key)
        # Replace the mode inherited from the TUN flavour
        self.mode = 'pl-gre-eth'
663
# Tunnel implementation class for each protocol name, TUN flavour
TUN_PROTO_MAP = dict(
    tcp = TunProtoTCP,
    udp = TunProtoUDP,
    fd = TunProtoFD,
    gre = TunProtoGRE,
)

# Tunnel implementation class for each protocol name, TAP flavour
TAP_PROTO_MAP = dict(
    tcp = TapProtoTCP,
    udp = TapProtoUDP,
    fd = TapProtoFD,
    gre = TapProtoGRE,
)
677