General cross TUN fixes
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13
14 from nepi.util import server
15
16 class TunProtoBase(object):
17     def __init__(self, local, peer, home_path, key):
18         # Weak references, since ifaces do have a reference to the
19         # tunneling protocol implementation - we don't want strong
20         # circular references.
21         self.peer = weakref.ref(peer)
22         self.local = weakref.ref(local)
23         
24         self.port = 15000
25         self.mode = 'pl-tun'
26         self.key = key
27         
28         self.home_path = home_path
29         
30         self._launcher = None
31         self._started = False
32         self._pid = None
33         self._ppid = None
34
35     def _make_home(self):
36         local = self.local()
37         
38         if not local:
39             raise RuntimeError, "Lost reference to peering interfaces before launching"
40         if not local.node:
41             raise RuntimeError, "Unconnected TUN - missing node"
42         
43         # Make sure all the paths are created where 
44         # they have to be created for deployment
45         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
46         (out,err),proc = server.popen_ssh_command(
47             cmd,
48             host = local.node.hostname,
49             port = None,
50             user = local.node.slicename,
51             agent = None,
52             ident_key = local.node.ident_path,
53             server_key = local.node.server_key
54             )
55         
56         if proc.wait():
57             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
58         
59     
60     def _install_scripts(self):
61         local = self.local()
62         
63         if not local:
64             raise RuntimeError, "Lost reference to peering interfaces before launching"
65         if not local.node:
66             raise RuntimeError, "Unconnected TUN - missing node"
67         
68         # Install the tun_connect script and tunalloc utility
69         from nepi.util import tunchannel
70         sources = [
71             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
72             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
73             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
74         ]
75         dest = "%s@%s:%s" % (
76             local.node.slicename, local.node.hostname, 
77             os.path.join(self.home_path,'.'),)
78         (out,err),proc = server.popen_scp(
79             sources,
80             dest,
81             ident_key = local.node.ident_path,
82             server_key = local.node.server_key
83             )
84     
85         if proc.wait():
86             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
87
88         cmd = ( (
89             "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
90             + ( " && "
91                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
92                 "mkdir -p python-passfd && "
93                 "cd python-passfd && "
94                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
95                 "python setup.py build && "
96                 "python setup.py install --install-lib .. "
97                 
98                 if local.tun_proto == "fd" else ""
99             ) )
100         % {
101             'home' : server.shell_escape(self.home_path),
102             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
103         } )
104         (out,err),proc = server.popen_ssh_command(
105             cmd,
106             host = local.node.hostname,
107             port = None,
108             user = local.node.slicename,
109             agent = None,
110             ident_key = local.node.ident_path,
111             server_key = local.node.server_key
112             )
113         
114         if proc.wait():
115             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
116         
117     def launch(self, check_proto, listen, extra_args=[]):
118         peer = self.peer()
119         local = self.local()
120         
121         if not peer or not local:
122             raise RuntimeError, "Lost reference to peering interfaces before launching"
123         
124         peer_port = peer.tun_port
125         peer_addr = peer.tun_addr
126         peer_proto= peer.tun_proto
127         
128         local_port = self.port
129         local_cap  = local.capture
130         local_addr = local.address
131         local_mask = local.netprefix
132         local_snat = local.snat
133         local_txq  = local.txqueuelen
134         
135         if check_proto != peer_proto:
136             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
137         
138         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
139             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
140         
141         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
142             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
143         
144         args = ["python", "tun_connect.py", 
145             "-m", str(self.mode),
146             "-A", str(local_addr),
147             "-M", str(local_mask)]
148         
149         if check_proto == 'fd':
150             passfd_arg = str(peer_addr)
151             if passfd_arg.startswith('\x00'):
152                 # cannot shell_encode null characters :(
153                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
154             else:
155                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
156             args.extend([
157                 "--pass-fd", passfd_arg
158             ])
159         else:
160             args.extend([
161                 "-p", str(local_port if listen else peer_port),
162                 "-k", str(self.key)
163             ])
164         
165         if local_snat:
166             args.append("-S")
167         if local_txq:
168             args.extend(("-Q",str(local_txq)))
169         if extra_args:
170             args.extend(map(str,extra_args))
171         if not listen and check_proto != 'fd':
172             args.append(str(peer_addr))
173         
174         self._make_home()
175         self._install_scripts()
176         
177         # Start process in a "daemonized" way, using nohup and heavy
178         # stdin/out redirection to avoid connection issues
179         (out,err),proc = rspawn.remote_spawn(
180             " ".join(args),
181             
182             pidfile = './pid',
183             home = self.home_path,
184             stdin = '/dev/null',
185             stdout = 'capture' if local_cap else '/dev/null',
186             stderr = rspawn.STDOUT,
187             sudo = True,
188             
189             host = local.node.hostname,
190             port = None,
191             user = local.node.slicename,
192             agent = None,
193             ident_key = local.node.ident_path,
194             server_key = local.node.server_key
195             )
196         
197         if proc.wait():
198             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
199         
200         self._started = True
201     
202     def _launch_and_wait(self, *p, **kw):
203         local = self.local()
204         
205         self.launch(*p, **kw)
206         
207         # Wait for the process to be started
208         while self.status() == rspawn.NOT_STARTED:
209             time.sleep(1.0)
210         
211         # Wait for the connection to be established
212         if local.capture:
213             for spin in xrange(30):
214                 if self.status() != rspawn.RUNNING:
215                     break
216                 
217                 (out,err),proc = server.popen_ssh_command(
218                     "cd %(home)s ; grep -c Connected capture" % dict(
219                         home = server.shell_escape(self.home_path)),
220                     host = local.node.hostname,
221                     port = None,
222                     user = local.node.slicename,
223                     agent = None,
224                     ident_key = local.node.ident_path,
225                     server_key = local.node.server_key
226                     )
227                 
228                 if proc.wait():
229                     break
230                 
231                 if out.strip() != '0':
232                     break
233                 
234                 time.sleep(1.0)
235     
236     def async_launch(self, check_proto, listen, extra_args=[]):
237         if not self._launcher:
238             self._launcher = threading.Thread(
239                 target = self._launch_and_wait,
240                 args = (check_proto, listen, extra_args))
241             self._launcher.start()
242     
243     def async_launch_wait(self):
244         if self._launcher:
245             self._launcher.join()
246             if not self._started:
247                 raise RuntimeError, "Failed to launch TUN forwarder"
248         elif not self._started:
249             self.launch()
250
251     def checkpid(self):            
252         local = self.local()
253         
254         if not local:
255             raise RuntimeError, "Lost reference to local interface"
256         
257         # Get PID/PPID
258         # NOTE: wait a bit for the pidfile to be created
259         if self._started and not self._pid or not self._ppid:
260             pidtuple = rspawn.remote_check_pid(
261                 os.path.join(self.home_path,'pid'),
262                 host = local.node.hostname,
263                 port = None,
264                 user = local.node.slicename,
265                 agent = None,
266                 ident_key = local.node.ident_path,
267                 server_key = local.node.server_key
268                 )
269             
270             if pidtuple:
271                 self._pid, self._ppid = pidtuple
272     
273     def status(self):
274         local = self.local()
275         
276         if not local:
277             raise RuntimeError, "Lost reference to local interface"
278         
279         self.checkpid()
280         if not self._started:
281             return rspawn.NOT_STARTED
282         elif not self._pid or not self._ppid:
283             return rspawn.NOT_STARTED
284         else:
285             status = rspawn.remote_status(
286                 self._pid, self._ppid,
287                 host = local.node.hostname,
288                 port = None,
289                 user = local.node.slicename,
290                 agent = None,
291                 ident_key = local.node.ident_path
292                 )
293             return status
294     
295     def kill(self):
296         local = self.local()
297         
298         if not local:
299             raise RuntimeError, "Lost reference to local interface"
300         
301         status = self.status()
302         if status == rspawn.RUNNING:
303             # kill by ppid+pid - SIGTERM first, then try SIGKILL
304             rspawn.remote_kill(
305                 self._pid, self._ppid,
306                 host = local.node.hostname,
307                 port = None,
308                 user = local.node.slicename,
309                 agent = None,
310                 ident_key = local.node.ident_path,
311                 server_key = local.node.server_key,
312                 sudo = True
313                 )
314         
315     def sync_trace(self, local_dir, whichtrace):
316         if whichtrace != 'packets':
317             return None
318         
319         local = self.local()
320         
321         if not local:
322             return None
323         
324         local_path = os.path.join(local_dir, 'capture')
325         
326         # create parent local folders
327         proc = subprocess.Popen(
328             ["mkdir", "-p", os.path.dirname(local_path)],
329             stdout = open("/dev/null","w"),
330             stdin = open("/dev/null","r"))
331
332         if proc.wait():
333             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
334         
335         # sync files
336         (out,err),proc = server.popen_scp(
337             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
338                 os.path.join(self.home_path, 'capture')),
339             local_path,
340             port = None,
341             agent = None,
342             ident_key = local.node.ident_path,
343             server_key = local.node.server_key
344             )
345         
346         if proc.wait():
347             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
348         
349         return local_path
350         
351         
352     def prepare(self):
353         """
354         First-phase setup
355         
356         eg: set up listening ports
357         """
358         raise NotImplementedError
359     
360     def setup(self):
361         """
362         Second-phase setup
363         
364         eg: connect to peer
365         """
366         raise NotImplementedError
367     
368     def shutdown(self):
369         """
370         Cleanup
371         """
372         raise NotImplementedError
373         
374
375 class TunProtoUDP(TunProtoBase):
376     def __init__(self, local, peer, home_path, key, listening):
377         super(TunProtoUDP, self).__init__(local, peer, home_path, key)
378         self.listening = listening
379     
380     def prepare(self):
381         pass
382     
383     def setup(self):
384         self.async_launch('udp', False, ("-u",str(self.port)))
385     
386     def shutdown(self):
387         self.kill()
388
389 class TunProtoFD(TunProtoBase):
390     def __init__(self, local, peer, home_path, key, listening):
391         super(TunProtoFD, self).__init__(local, peer, home_path, key)
392         self.listening = listening
393     
394     def prepare(self):
395         pass
396     
397     def setup(self):
398         self.async_launch('fd', False)
399     
400     def shutdown(self):
401         self.kill()
402
403 class TunProtoTCP(TunProtoBase):
404     def __init__(self, local, peer, home_path, key, listening):
405         super(TunProtoTCP, self).__init__(local, peer, home_path, key)
406         self.listening = listening
407     
408     def prepare(self):
409         if self.listening:
410             self.async_launch('tcp', True)
411     
412     def setup(self):
413         if not self.listening:
414             # make sure our peer is ready
415             peer = self.peer()
416             if peer and peer.peer_proto_impl:
417                 peer.peer_proto_impl.async_launch_wait()
418             
419             if not self._started:
420                 self.launch('tcp', False)
421         else:
422             # make sure WE are ready
423             self.async_launch_wait()
424         
425         self.checkpid()
426     
427     def shutdown(self):
428         self.kill()
429
430 class TapProtoUDP(TunProtoUDP):
431     def __init__(self, local, peer, home_path, key, listening):
432         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
433         self.mode = 'pl-tap'
434
435 class TapProtoTCP(TunProtoTCP):
436     def __init__(self, local, peer, home_path, key, listening):
437         super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
438         self.mode = 'pl-tap'
439
440 class TapProtoFD(TunProtoFD):
441     def __init__(self, local, peer, home_path, key, listening):
442         super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
443         self.mode = 'pl-tap'
444
445
446
447 TUN_PROTO_MAP = {
448     'tcp' : TunProtoTCP,
449     'udp' : TunProtoUDP,
450     'fd'  : TunProtoFD,
451 }
452
453 TAP_PROTO_MAP = {
454     'tcp' : TapProtoTCP,
455     'udp' : TapProtoUDP,
456     'fd'  : TapProtoFD,
457 }
458
459