PL Point-to-point link support (TUNs automatically set their interfaces for P2P,...
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13
14 from nepi.util import server
15
16 class TunProtoBase(object):
17     def __init__(self, local, peer, home_path, key):
18         # Weak references, since ifaces do have a reference to the
19         # tunneling protocol implementation - we don't want strong
20         # circular references.
21         self.peer = weakref.ref(peer)
22         self.local = weakref.ref(local)
23         
24         self.port = 15000
25         self.mode = 'pl-tun'
26         self.key = key
27         
28         self.home_path = home_path
29         
30         self._launcher = None
31         self._started = False
32         self._pid = None
33         self._ppid = None
34
35     def _make_home(self):
36         local = self.local()
37         
38         if not local:
39             raise RuntimeError, "Lost reference to peering interfaces before launching"
40         if not local.node:
41             raise RuntimeError, "Unconnected TUN - missing node"
42         
43         # Make sure all the paths are created where 
44         # they have to be created for deployment
45         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
46         (out,err),proc = server.popen_ssh_command(
47             cmd,
48             host = local.node.hostname,
49             port = None,
50             user = local.node.slicename,
51             agent = None,
52             ident_key = local.node.ident_path,
53             server_key = local.node.server_key
54             )
55         
56         if proc.wait():
57             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
58         
59     
60     def _install_scripts(self):
61         local = self.local()
62         
63         if not local:
64             raise RuntimeError, "Lost reference to peering interfaces before launching"
65         if not local.node:
66             raise RuntimeError, "Unconnected TUN - missing node"
67         
68         # Install the tun_connect script and tunalloc utility
69         from nepi.util import tunchannel
70         sources = [
71             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
72             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
73             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
74         ]
75         dest = "%s@%s:%s" % (
76             local.node.slicename, local.node.hostname, 
77             os.path.join(self.home_path,'.'),)
78         (out,err),proc = server.popen_scp(
79             sources,
80             dest,
81             ident_key = local.node.ident_path,
82             server_key = local.node.server_key
83             )
84     
85         if proc.wait():
86             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
87         
88         # Make sure all dependencies are satisfied
89         local.node.wait_dependencies()
90
91         cmd = ( (
92             "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
93             + ( " && "
94                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
95                 "mkdir -p python-passfd && "
96                 "cd python-passfd && "
97                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
98                 "python setup.py build && "
99                 "python setup.py install --install-lib .. "
100                 
101                 if local.tun_proto == "fd" else ""
102             ) )
103         % {
104             'home' : server.shell_escape(self.home_path),
105             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
106         } )
107         (out,err),proc = server.popen_ssh_command(
108             cmd,
109             host = local.node.hostname,
110             port = None,
111             user = local.node.slicename,
112             agent = None,
113             ident_key = local.node.ident_path,
114             server_key = local.node.server_key
115             )
116         
117         if proc.wait():
118             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
119         
120     def launch(self, check_proto, listen, extra_args=[]):
121         peer = self.peer()
122         local = self.local()
123         
124         if not peer or not local:
125             raise RuntimeError, "Lost reference to peering interfaces before launching"
126         
127         peer_port = peer.tun_port
128         peer_addr = peer.tun_addr
129         peer_proto= peer.tun_proto
130         
131         local_port = self.port
132         local_cap  = local.capture
133         local_addr = local.address
134         local_mask = local.netprefix
135         local_snat = local.snat
136         local_txq  = local.txqueuelen
137         local_p2p  = local.pointopoint
138         
139         if not local_p2p and hasattr(peer, 'address'):
140             local_p2p = peer.address
141         
142         if check_proto != peer_proto:
143             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
144         
145         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
146             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
147         
148         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
149             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
150         
151         args = ["python", "tun_connect.py", 
152             "-m", str(self.mode),
153             "-A", str(local_addr),
154             "-M", str(local_mask)]
155         
156         if check_proto == 'fd':
157             passfd_arg = str(peer_addr)
158             if passfd_arg.startswith('\x00'):
159                 # cannot shell_encode null characters :(
160                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
161             else:
162                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
163             args.extend([
164                 "--pass-fd", passfd_arg
165             ])
166         else:
167             args.extend([
168                 "-p", str(local_port if listen else peer_port),
169                 "-k", str(self.key)
170             ])
171         
172         if local_snat:
173             args.append("-S")
174         if local_p2p:
175             args.extend(("-P",str(local_p2p)))
176         if local_txq:
177             args.extend(("-Q",str(local_txq)))
178         if extra_args:
179             args.extend(map(str,extra_args))
180         if not listen and check_proto != 'fd':
181             args.append(str(peer_addr))
182         
183         self._make_home()
184         self._install_scripts()
185         
186         # Start process in a "daemonized" way, using nohup and heavy
187         # stdin/out redirection to avoid connection issues
188         (out,err),proc = rspawn.remote_spawn(
189             " ".join(args),
190             
191             pidfile = './pid',
192             home = self.home_path,
193             stdin = '/dev/null',
194             stdout = 'capture' if local_cap else '/dev/null',
195             stderr = rspawn.STDOUT,
196             sudo = True,
197             
198             host = local.node.hostname,
199             port = None,
200             user = local.node.slicename,
201             agent = None,
202             ident_key = local.node.ident_path,
203             server_key = local.node.server_key
204             )
205         
206         if proc.wait():
207             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
208         
209         self._started = True
210     
211     def _launch_and_wait(self, *p, **kw):
212         local = self.local()
213         
214         self.launch(*p, **kw)
215         
216         # Wait for the process to be started
217         while self.status() == rspawn.NOT_STARTED:
218             time.sleep(1.0)
219         
220         # Wait for the connection to be established
221         if local.capture:
222             for spin in xrange(30):
223                 if self.status() != rspawn.RUNNING:
224                     break
225                 
226                 (out,err),proc = server.popen_ssh_command(
227                     "cd %(home)s ; grep -c Connected capture" % dict(
228                         home = server.shell_escape(self.home_path)),
229                     host = local.node.hostname,
230                     port = None,
231                     user = local.node.slicename,
232                     agent = None,
233                     ident_key = local.node.ident_path,
234                     server_key = local.node.server_key
235                     )
236                 
237                 if proc.wait():
238                     break
239                 
240                 if out.strip() != '0':
241                     break
242                 
243                 time.sleep(1.0)
244     
245     def async_launch(self, check_proto, listen, extra_args=[]):
246         if not self._launcher:
247             self._launcher = threading.Thread(
248                 target = self._launch_and_wait,
249                 args = (check_proto, listen, extra_args))
250             self._launcher.start()
251     
252     def async_launch_wait(self):
253         if self._launcher:
254             self._launcher.join()
255             if not self._started:
256                 raise RuntimeError, "Failed to launch TUN forwarder"
257         elif not self._started:
258             self.launch()
259
260     def checkpid(self):            
261         local = self.local()
262         
263         if not local:
264             raise RuntimeError, "Lost reference to local interface"
265         
266         # Get PID/PPID
267         # NOTE: wait a bit for the pidfile to be created
268         if self._started and not self._pid or not self._ppid:
269             pidtuple = rspawn.remote_check_pid(
270                 os.path.join(self.home_path,'pid'),
271                 host = local.node.hostname,
272                 port = None,
273                 user = local.node.slicename,
274                 agent = None,
275                 ident_key = local.node.ident_path,
276                 server_key = local.node.server_key
277                 )
278             
279             if pidtuple:
280                 self._pid, self._ppid = pidtuple
281     
282     def status(self):
283         local = self.local()
284         
285         if not local:
286             raise RuntimeError, "Lost reference to local interface"
287         
288         self.checkpid()
289         if not self._started:
290             return rspawn.NOT_STARTED
291         elif not self._pid or not self._ppid:
292             return rspawn.NOT_STARTED
293         else:
294             status = rspawn.remote_status(
295                 self._pid, self._ppid,
296                 host = local.node.hostname,
297                 port = None,
298                 user = local.node.slicename,
299                 agent = None,
300                 ident_key = local.node.ident_path
301                 )
302             return status
303     
304     def kill(self):
305         local = self.local()
306         
307         if not local:
308             raise RuntimeError, "Lost reference to local interface"
309         
310         status = self.status()
311         if status == rspawn.RUNNING:
312             # kill by ppid+pid - SIGTERM first, then try SIGKILL
313             rspawn.remote_kill(
314                 self._pid, self._ppid,
315                 host = local.node.hostname,
316                 port = None,
317                 user = local.node.slicename,
318                 agent = None,
319                 ident_key = local.node.ident_path,
320                 server_key = local.node.server_key,
321                 sudo = True
322                 )
323         
324     def sync_trace(self, local_dir, whichtrace):
325         if whichtrace != 'packets':
326             return None
327         
328         local = self.local()
329         
330         if not local:
331             return None
332         
333         local_path = os.path.join(local_dir, 'capture')
334         
335         # create parent local folders
336         proc = subprocess.Popen(
337             ["mkdir", "-p", os.path.dirname(local_path)],
338             stdout = open("/dev/null","w"),
339             stdin = open("/dev/null","r"))
340
341         if proc.wait():
342             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
343         
344         # sync files
345         (out,err),proc = server.popen_scp(
346             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
347                 os.path.join(self.home_path, 'capture')),
348             local_path,
349             port = None,
350             agent = None,
351             ident_key = local.node.ident_path,
352             server_key = local.node.server_key
353             )
354         
355         if proc.wait():
356             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
357         
358         return local_path
359         
360         
361     def prepare(self):
362         """
363         First-phase setup
364         
365         eg: set up listening ports
366         """
367         raise NotImplementedError
368     
369     def setup(self):
370         """
371         Second-phase setup
372         
373         eg: connect to peer
374         """
375         raise NotImplementedError
376     
377     def shutdown(self):
378         """
379         Cleanup
380         """
381         raise NotImplementedError
382         
383
384 class TunProtoUDP(TunProtoBase):
385     def __init__(self, local, peer, home_path, key, listening):
386         super(TunProtoUDP, self).__init__(local, peer, home_path, key)
387         self.listening = listening
388     
389     def prepare(self):
390         pass
391     
392     def setup(self):
393         self.async_launch('udp', False, ("-u",str(self.port)))
394     
395     def shutdown(self):
396         self.kill()
397
398 class TunProtoFD(TunProtoBase):
399     def __init__(self, local, peer, home_path, key, listening):
400         super(TunProtoFD, self).__init__(local, peer, home_path, key)
401         self.listening = listening
402     
403     def prepare(self):
404         pass
405     
406     def setup(self):
407         self.async_launch('fd', False)
408     
409     def shutdown(self):
410         self.kill()
411
412 class TunProtoTCP(TunProtoBase):
413     def __init__(self, local, peer, home_path, key, listening):
414         super(TunProtoTCP, self).__init__(local, peer, home_path, key)
415         self.listening = listening
416     
417     def prepare(self):
418         if self.listening:
419             self.async_launch('tcp', True)
420     
421     def setup(self):
422         if not self.listening:
423             # make sure our peer is ready
424             peer = self.peer()
425             if peer and peer.peer_proto_impl:
426                 peer.peer_proto_impl.async_launch_wait()
427             
428             if not self._started:
429                 self.launch('tcp', False)
430         else:
431             # make sure WE are ready
432             self.async_launch_wait()
433         
434         self.checkpid()
435     
436     def shutdown(self):
437         self.kill()
438
439 class TapProtoUDP(TunProtoUDP):
440     def __init__(self, local, peer, home_path, key, listening):
441         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
442         self.mode = 'pl-tap'
443
444 class TapProtoTCP(TunProtoTCP):
445     def __init__(self, local, peer, home_path, key, listening):
446         super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
447         self.mode = 'pl-tap'
448
449 class TapProtoFD(TunProtoFD):
450     def __init__(self, local, peer, home_path, key, listening):
451         super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
452         self.mode = 'pl-tap'
453
454
455
456 TUN_PROTO_MAP = {
457     'tcp' : TunProtoTCP,
458     'udp' : TunProtoUDP,
459     'fd'  : TunProtoFD,
460 }
461
462 TAP_PROTO_MAP = {
463     'tcp' : TapProtoTCP,
464     'udp' : TapProtoUDP,
465     'fd'  : TapProtoFD,
466 }
467
468