Lots of cross-connection fixes, TUN synchronization, etc.
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12
13 from nepi.util import server
14
class TunProtoBase(object):
    """
    Base class for the tunneling protocol implementations of the PlanetLab
    TUN/TAP interfaces.

    It uploads and builds the tun_connect.py helper (plus tunalloc.c) into
    home_path on the local interface's node, launches it there through
    rspawn (with sudo), and tracks the remote process through the pidfile
    written into home_path. Subclasses implement the prepare / setup /
    shutdown phases.
    """
    def __init__(self, local, peer, home_path, key):
        # Weak references, since ifaces do have a reference to the
        # tunneling protocol implementation - we don't want strong
        # circular references.
        self.peer = weakref.ref(peer)
        self.local = weakref.ref(local)
        
        self.port = 15000
        self.mode = 'pl-tun'
        self.key = key
        
        self.home_path = home_path
        
        self._launcher = None
        self._started = False
        self._pid = None
        self._ppid = None

    def _ssh_kwargs(self, local):
        """
        Return the connection keyword arguments shared by the remote
        (ssh / rspawn) calls that target the node of interface `local`.
        """
        node = local.node
        return dict(
            host = node.hostname,
            port = None,
            user = node.slicename,
            agent = None,
            ident_key = node.ident_path,
            server_key = node.server_key,
        )

    def _make_home(self):
        """
        Create home_path on the local interface's node (``mkdir -p`` over ssh).

        Raises RuntimeError if the local interface is gone, has no node, or
        the remote command fails.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to peering interfaces before launching")
        if not local.node:
            raise RuntimeError("Unconnected TUN - missing node")
        
        # Make sure all the paths are created where 
        # they have to be created for deployment
        cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
        (out,err),proc = server.popen_ssh_command(cmd, **self._ssh_kwargs(local))
        
        if proc.wait():
            raise RuntimeError("Failed to set up TUN forwarder: %s %s" % (out,err,))
    
    def _install_scripts(self):
        """
        Upload tun_connect.py and tunalloc.c into home_path on the local node
        and build them there.

        tunalloc.c is compiled into tunalloc.so. When the local interface's
        tun_proto is "fd", python-passfd is also downloaded, built, and
        installed into home_path.

        Raises RuntimeError if the interface is lost or unconnected, or if
        the upload or the remote build fails.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to peering interfaces before launching")
        if not local.node:
            raise RuntimeError("Unconnected TUN - missing node")
        
        # Install the tun_connect script and tunalloc utility
        script_dir = os.path.join(os.path.dirname(__file__), 'scripts')
        sources = [
            os.path.join(script_dir, 'tun_connect.py'),
            os.path.join(script_dir, 'tunalloc.c'),
        ]
        dest = "%s@%s:%s" % (
            local.node.slicename, local.node.hostname, 
            os.path.join(self.home_path,'.'),)
        (out,err),proc = server.popen_scp(
            sources,
            dest,
            ident_key = local.node.ident_path,
            server_key = local.node.server_key
            )
    
        if proc.wait():
            # Report the list actually uploaded. Referencing an undefined
            # name here used to mask the real error with a NameError.
            raise RuntimeError("Failed upload TUN connect script %r: %s %s" % (sources, out,err,))

        build_cmd = "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
        if local.tun_proto == "fd":
            # The fd protocol additionally needs python-passfd, installed
            # into home_path (--install-lib .. from inside python-passfd).
            build_cmd += (
                " && "
                "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
                "mkdir -p python-passfd && "
                "cd python-passfd && "
                "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
                "python setup.py build && "
                "python setup.py install --install-lib .. "
            )
        cmd = build_cmd % {
            'home' : server.shell_escape(self.home_path),
            'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
        }
        (out,err),proc = server.popen_ssh_command(cmd, **self._ssh_kwargs(local))
        
        if proc.wait():
            raise RuntimeError("Failed to set up TUN forwarder: %s %s" % (out,err,))
        
    def launch(self, check_proto, listen, extra_args=()):
        """
        Synchronously start tun_connect.py on the local node.

        check_proto: protocol this implementation speaks. It must equal the
            peer's tun_proto.
        listen: True to listen on self.port; False to connect to the peer's
            tun_addr / tun_port.
        extra_args: extra command-line arguments, each passed through str().

        Sets self._started once the remote spawn succeeds. Raises
        RuntimeError on lost references, protocol mismatch, a misconfigured
        local or peer interface, or when any remote step fails.
        """
        peer = self.peer()
        local = self.local()
        
        if not peer or not local:
            raise RuntimeError("Lost reference to peering interfaces before launching")
        
        peer_port = peer.tun_port
        peer_addr = peer.tun_addr
        peer_proto= peer.tun_proto
        
        local_port = self.port
        local_cap  = local.capture
        local_addr = local.address
        local_mask = local.netprefix
        local_snat = local.snat
        local_txq  = local.txqueuelen
        
        if check_proto != peer_proto:
            raise RuntimeError("Peering protocol mismatch: %s != %s" % (check_proto, peer_proto))
        
        if not listen and (not peer_port or not peer_addr):
            raise RuntimeError("Misconfigured peer: %s" % (peer,))
        
        if listen and (not local_port or not local_addr or not local_mask):
            raise RuntimeError("Misconfigured TUN: %s" % (local,))
        
        args = ["python", "tun_connect.py", 
            "-m", str(self.mode),
            "-A", str(local_addr),
            "-M", str(local_mask)]
        
        if check_proto == 'fd':
            # The peer's tun_addr is handed to --pass-fd. It is base64-encoded
            # when it starts with a NUL byte (presumably an abstract-namespace
            # socket address); otherwise it is taken as relative to $HOME.
            passfd_arg = str(peer_addr)
            if passfd_arg.startswith('\x00'):
                # cannot shell_encode null characters :(
                passfd_arg = "base64:"+base64.b64encode(passfd_arg)
            else:
                passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
            args.extend([
                "--pass-fd", passfd_arg
            ])
        else:
            args.extend([
                "-p", str(local_port if listen else peer_port),
                "-k", str(self.key)
            ])
        
        if local_snat:
            args.append("-S")
        if local_txq:
            args.extend(("-Q",str(local_txq)))
        if extra_args:
            args.extend(map(str,extra_args))
        if not listen and check_proto != 'fd':
            args.append(str(peer_addr))
        
        self._make_home()
        self._install_scripts()
        
        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out,err),proc = rspawn.remote_spawn(
            " ".join(args),
            
            pidfile = './pid',
            home = self.home_path,
            stdin = '/dev/null',
            stdout = 'capture' if local_cap else '/dev/null',
            stderr = rspawn.STDOUT,
            sudo = True,
            
            **self._ssh_kwargs(local)
            )
        
        if proc.wait():
            raise RuntimeError("Failed to set up TUN: %s %s" % (out,err,))
        
        self._started = True
    
    def _launch_and_wait(self, *p, **kw):
        """
        Run launch(*p, **kw), then block until the remote process is running.

        When output is captured, additionally poll the capture file (up to 30
        times, 1s apart) until it contains "Connected" or the process stops
        running.
        """
        local = self.local()
        
        self.launch(*p, **kw)
        
        # Wait for the process to be started.
        # NOTE(review): this wait is unbounded; if the pidfile never appears
        # the loop never ends.
        while self.status() == rspawn.NOT_STARTED:
            time.sleep(1.0)
        
        # Wait for the connection to be established (only observable
        # through the capture file).
        if local.capture:
            check_cmd = "cd %(home)s ; grep -c Connected capture" % dict(
                home = server.shell_escape(self.home_path))
            for spin in range(30):
                if self.status() != rspawn.RUNNING:
                    break
                
                (out,err),proc = server.popen_ssh_command(
                    check_cmd, **self._ssh_kwargs(local))
                
                # grep exits 0 on a match, 1 when nothing matched yet (it
                # still prints "0"), and >1 on error. Only real errors end
                # the wait (assuming popen_ssh_command propagates the remote
                # exit status).
                if proc.wait() not in (0, 1):
                    break
                
                if out.strip() != '0':
                    break
                
                time.sleep(1.0)
    
    def async_launch(self, check_proto, listen, extra_args=()):
        """
        Run _launch_and_wait in a background thread.

        Only the first call starts a thread. Join it with async_launch_wait().
        """
        if not self._launcher:
            self._launcher = threading.Thread(
                target = self._launch_and_wait,
                args = (check_proto, listen, extra_args))
            self._launcher.start()
    
    def async_launch_wait(self):
        """
        Wait for a launch started by async_launch().

        Raises RuntimeError if the background launch did not succeed, or if
        no launch was ever attempted.
        """
        if self._launcher:
            self._launcher.join()
            if not self._started:
                raise RuntimeError("Failed to launch TUN forwarder")
        elif not self._started:
            # launch() needs the protocol and role, which only the caller
            # knows, so there is nothing sensible to launch from here. The
            # former argument-less self.launch() call could only raise
            # TypeError.
            raise RuntimeError("TUN forwarder was never launched")

    def checkpid(self):
        """
        Load self._pid / self._ppid from the remote pidfile once the process
        has been started and while either of them is still unknown.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to local interface")
        
        # The pidfile may take a while to appear after launching, so this is
        # retried on every call until both ids are known. Before launch() has
        # succeeded nothing is queried, since a leftover pidfile could be
        # picked up.
        if self._started and (not self._pid or not self._ppid):
            pidtuple = rspawn.remote_check_pid(
                os.path.join(self.home_path,'pid'),
                **self._ssh_kwargs(local)
                )
            
            if pidtuple:
                self._pid, self._ppid = pidtuple
    
    def status(self):
        """
        Return the rspawn status of the remote process.

        The status is rspawn.NOT_STARTED until launch() has succeeded and the
        pid/ppid are known; after that it is whatever rspawn.remote_status
        reports.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to local interface")
        
        self.checkpid()
        if not self._started or not self._pid or not self._ppid:
            return rspawn.NOT_STARTED
        
        # NOTE(review): unlike the other remote calls, server_key is not
        # passed here. Confirm whether rspawn.remote_status accepts it.
        return rspawn.remote_status(
            self._pid, self._ppid,
            host = local.node.hostname,
            port = None,
            user = local.node.slicename,
            agent = None,
            ident_key = local.node.ident_path
            )
    
    def kill(self):
        """
        Kill the remote process (via rspawn.remote_kill, with sudo) if it is
        currently running.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to local interface")
        
        status = self.status()
        if status == rspawn.RUNNING:
            # kill by ppid+pid - SIGTERM first, then try SIGKILL
            rspawn.remote_kill(
                self._pid, self._ppid,
                sudo = True,
                **self._ssh_kwargs(local)
                )
        
    def sync_trace(self, local_dir, whichtrace):
        """
        Copy the remote 'capture' file into local_dir.

        Only the 'packets' trace is supported. Returns None for any other
        trace or if the local interface is gone; otherwise returns the local
        path of the copied file. Raises RuntimeError if the local directory
        cannot be created or the copy fails.
        """
        if whichtrace != 'packets':
            return None
        
        local = self.local()
        
        if not local:
            return None
        
        local_path = os.path.join(local_dir, 'capture')
        
        # Create parent local folders. Output is captured (rather than sent
        # to unclosed /dev/null handles) so it can be reported on failure.
        proc = subprocess.Popen(
            ["mkdir", "-p", os.path.dirname(local_path)],
            stdin = subprocess.PIPE,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE)
        out, err = proc.communicate()

        if proc.returncode:
            raise RuntimeError("Failed to synchronize trace: %s %s" % (out,err,))
        
        # sync files
        (out,err),proc = server.popen_scp(
            '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
                os.path.join(self.home_path, 'capture')),
            local_path,
            port = None,
            agent = None,
            ident_key = local.node.ident_path,
            server_key = local.node.server_key
            )
        
        if proc.wait():
            raise RuntimeError("Failed to synchronize trace: %s %s" % (out,err,))
        
        return local_path
        
    def prepare(self):
        """
        First-phase setup
        
        eg: set up listening ports
        """
        raise NotImplementedError
    
    def setup(self):
        """
        Second-phase setup
        
        eg: connect to peer
        """
        raise NotImplementedError
    
    def shutdown(self):
        """
        Cleanup
        """
        raise NotImplementedError
370         
371
class TunProtoUDP(TunProtoBase):
    """
    UDP tunnel.

    There is no first phase. setup() launches tun_connect.py in the
    background in connecting mode and passes our own port with -u. The
    `listening` flag is stored but not consulted by this class.
    """
    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        """No first-phase work for UDP."""
    
    def setup(self):
        """Start the forwarder asynchronously (never listening)."""
        udp_args = ("-u", str(self.port))
        self.async_launch('udp', False, udp_args)
    
    def shutdown(self):
        """Kill the remote forwarder if it is running."""
        return self.kill()
385
class TunProtoFD(TunProtoBase):
    """
    File-descriptor-passing tunnel.

    There is no first phase. setup() launches tun_connect.py in the
    background in connecting mode with the 'fd' protocol. The `listening`
    flag is stored but not consulted by this class.
    """
    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoFD, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        """No first-phase work for the fd protocol."""
    
    def setup(self):
        """Start the forwarder asynchronously (never listening)."""
        self.async_launch('fd', listen=False)
    
    def shutdown(self):
        """Kill the remote forwarder if it is running."""
        return self.kill()
399
class TunProtoTCP(TunProtoBase):
    """
    TCP tunnel.

    The listening end starts tun_connect.py in the background during
    prepare(). The connecting end first waits for its peer's
    implementation, then launches itself synchronously during setup().
    """
    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        """Listening side only: kick off the background launch."""
        if not self.listening:
            return
        self.async_launch('tcp', True)
    
    def setup(self):
        """
        Finish bringing the tunnel up, then refresh the remote pid/ppid.

        Listening side: wait for our own background launch. Connecting side:
        wait for the peer's implementation, then launch unless already
        started.
        """
        if self.listening:
            # make sure WE are ready
            self.async_launch_wait()
        else:
            # make sure our peer is ready
            peer = self.peer()
            peer_impl = peer.peer_proto_impl if peer else None
            if peer_impl:
                peer_impl.async_launch_wait()
            
            if not self._started:
                self.launch('tcp', False)
        
        self.checkpid()
    
    def shutdown(self):
        """Kill the remote forwarder if it is running."""
        return self.kill()
426
class TapProtoUDP(TunProtoUDP):
    """TunProtoUDP variant that runs tun_connect.py in 'pl-tap' mode."""
    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoUDP, self).__init__(
            local=local, peer=peer, home_path=home_path,
            key=key, listening=listening)
        # Must be set after the base __init__, which assigns 'pl-tun'.
        self.mode = 'pl-tap'
431
class TapProtoTCP(TunProtoTCP):
    """TunProtoTCP variant that runs tun_connect.py in 'pl-tap' mode."""
    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoTCP, self).__init__(
            local=local, peer=peer, home_path=home_path,
            key=key, listening=listening)
        # Must be set after the base __init__, which assigns 'pl-tun'.
        self.mode = 'pl-tap'
436
class TapProtoFD(TunProtoFD):
    """TunProtoFD variant that runs tun_connect.py in 'pl-tap' mode."""
    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoFD, self).__init__(
            local=local, peer=peer, home_path=home_path,
            key=key, listening=listening)
        # Must be set after the base __init__, which assigns 'pl-tun'.
        self.mode = 'pl-tap'
441
442
443
# Protocol name -> implementation class, for TUN interfaces.
TUN_PROTO_MAP = dict(
    tcp = TunProtoTCP,
    udp = TunProtoUDP,
    fd = TunProtoFD,
)

# Protocol name -> implementation class, for TAP interfaces ('pl-tap' mode).
TAP_PROTO_MAP = dict(
    tcp = TapProtoTCP,
    udp = TapProtoUDP,
    fd = TapProtoFD,
)
455
456