Synchronization fix for cross connections: wait for tunnels to be up before starting...
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11
12 from nepi.util import server
13
14 class TunProtoBase(object):
15     def __init__(self, local, peer, home_path, key):
16         # Weak references, since ifaces do have a reference to the
17         # tunneling protocol implementation - we don't want strong
18         # circular references.
19         self.peer = weakref.ref(peer)
20         self.local = weakref.ref(local)
21         
22         self.port = 15000
23         self.mode = 'pl-tun'
24         self.key = key
25         
26         self.home_path = home_path
27         
28         self._launcher = None
29         self._started = False
30         self._pid = None
31         self._ppid = None
32
33     def _make_home(self):
34         local = self.local()
35         
36         if not local:
37             raise RuntimeError, "Lost reference to peering interfaces before launching"
38         if not local.node:
39             raise RuntimeError, "Unconnected TUN - missing node"
40         
41         # Make sure all the paths are created where 
42         # they have to be created for deployment
43         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
44         (out,err),proc = server.popen_ssh_command(
45             cmd,
46             host = local.node.hostname,
47             port = None,
48             user = local.node.slicename,
49             agent = None,
50             ident_key = local.node.ident_path,
51             server_key = local.node.server_key
52             )
53         
54         if proc.wait():
55             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
56         
57     
58     def _install_scripts(self):
59         local = self.local()
60         
61         if not local:
62             raise RuntimeError, "Lost reference to peering interfaces before launching"
63         if not local.node:
64             raise RuntimeError, "Unconnected TUN - missing node"
65         
66         # Install the tun_connect script and tunalloc utility
67         sources = [
68             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
69             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
70         ]
71         dest = "%s@%s:%s" % (
72             local.node.slicename, local.node.hostname, 
73             os.path.join(self.home_path,'.'),)
74         (out,err),proc = server.popen_scp(
75             sources,
76             dest,
77             ident_key = local.node.ident_path,
78             server_key = local.node.server_key
79             )
80     
81         if proc.wait():
82             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (source, out,err,)
83
84         cmd = ( (
85             "cd %(home)s && gcc -shared tunalloc.c -o tunalloc.so"
86             + ( " && "
87                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
88                 "mkdir -p python-passfd && "
89                 "cd python-passfd && "
90                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
91                 "python setup.py build && "
92                 "python setup.py install --install-lib .. "
93                 
94                 if local.tun_proto == "fd" else ""
95             ) )
96         % {
97             'home' : server.shell_escape(self.home_path),
98             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
99         } )
100         (out,err),proc = server.popen_ssh_command(
101             cmd,
102             host = local.node.hostname,
103             port = None,
104             user = local.node.slicename,
105             agent = None,
106             ident_key = local.node.ident_path,
107             server_key = local.node.server_key
108             )
109         
110         if proc.wait():
111             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
112         
113     def launch(self, check_proto, listen, extra_args=[]):
114         peer = self.peer()
115         local = self.local()
116         
117         if not peer or not local:
118             raise RuntimeError, "Lost reference to peering interfaces before launching"
119         
120         peer_port = peer.tun_port
121         peer_addr = peer.tun_addr
122         peer_proto= peer.tun_proto
123         
124         local_port = self.port
125         local_cap  = local.capture
126         local_addr = local.address
127         local_mask = local.netprefix
128         local_snat = local.snat
129         local_txq  = local.txqueuelen
130         
131         if check_proto != peer_proto:
132             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
133         
134         if not listen and (not peer_port or not peer_addr):
135             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
136         
137         if listen and (not local_port or not local_addr or not local_mask):
138             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
139         
140         args = ["python", "tun_connect.py", 
141             "-m", str(self.mode),
142             "-A", str(local_addr),
143             "-M", str(local_mask)]
144         
145         if check_proto == 'fd':
146             passfd_arg = str(peer_addr)
147             if passfd_arg.startswith('\x00'):
148                 # cannot shell_encode null characters :(
149                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
150             else:
151                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
152             args.extend([
153                 "--pass-fd", passfd_arg
154             ])
155         else:
156             args.extend([
157                 "-p", str(local_port if listen else peer_port),
158                 "-k", str(self.key)
159             ])
160         
161         if local_snat:
162             args.append("-S")
163         if local_txq:
164             args.extend(("-Q",str(local_txq)))
165         if extra_args:
166             args.extend(map(str,extra_args))
167         if not listen and check_proto != 'fd':
168             args.append(str(peer_addr))
169         
170         self._make_home()
171         self._install_scripts()
172         
173         # Start process in a "daemonized" way, using nohup and heavy
174         # stdin/out redirection to avoid connection issues
175         (out,err),proc = rspawn.remote_spawn(
176             " ".join(args),
177             
178             pidfile = './pid',
179             home = self.home_path,
180             stdin = '/dev/null',
181             stdout = 'capture' if local_cap else '/dev/null',
182             stderr = rspawn.STDOUT,
183             sudo = True,
184             
185             host = local.node.hostname,
186             port = None,
187             user = local.node.slicename,
188             agent = None,
189             ident_key = local.node.ident_path,
190             server_key = local.node.server_key
191             )
192         
193         if proc.wait():
194             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
195
196         self._started = True
197     
198     def async_launch(self, check_proto, listen, extra_args=[]):
199         if not self._launcher:
200             self._launcher = threading.Thread(
201                 target = self.launch,
202                 args = (check_proto, listen, extra_args))
203             self._launcher.start()
204     
205     def async_launch_wait(self):
206         if not self._started:
207             if self._launcher:
208                 self._launcher.join()
209                 if not self._started:
210                     raise RuntimeError, "Failed to launch TUN forwarder"
211             else:
212                 self.launch()
213
214     def checkpid(self):            
215         local = self.local()
216         
217         if not local:
218             raise RuntimeError, "Lost reference to local interface"
219         
220         # Get PID/PPID
221         # NOTE: wait a bit for the pidfile to be created
222         if self._started and not self._pid or not self._ppid:
223             pidtuple = rspawn.remote_check_pid(
224                 os.path.join(self.home_path,'pid'),
225                 host = local.node.hostname,
226                 port = None,
227                 user = local.node.slicename,
228                 agent = None,
229                 ident_key = local.node.ident_path,
230                 server_key = local.node.server_key
231                 )
232             
233             if pidtuple:
234                 self._pid, self._ppid = pidtuple
235     
236     def status(self):
237         local = self.local()
238         
239         if not local:
240             raise RuntimeError, "Lost reference to local interface"
241         
242         self.checkpid()
243         if not self._started:
244             return rspawn.NOT_STARTED
245         elif not self._pid or not self._ppid:
246             return rspawn.NOT_STARTED
247         else:
248             status = rspawn.remote_status(
249                 self._pid, self._ppid,
250                 host = local.node.hostname,
251                 port = None,
252                 user = local.node.slicename,
253                 agent = None,
254                 ident_key = local.node.ident_path
255                 )
256             return status
257     
258     def kill(self):
259         local = self.local()
260         
261         if not local:
262             raise RuntimeError, "Lost reference to local interface"
263         
264         status = self.status()
265         if status == rspawn.RUNNING:
266             # kill by ppid+pid - SIGTERM first, then try SIGKILL
267             rspawn.remote_kill(
268                 self._pid, self._ppid,
269                 host = local.node.hostname,
270                 port = None,
271                 user = local.node.slicename,
272                 agent = None,
273                 ident_key = local.node.ident_path,
274                 server_key = local.node.server_key,
275                 sudo = True
276                 )
277         
278     def sync_trace(self, local_dir, whichtrace):
279         if whichtrace != 'packets':
280             return None
281         
282         local = self.local()
283         
284         if not local:
285             return None
286         
287         local_path = os.path.join(local_dir, 'capture')
288         
289         # create parent local folders
290         proc = subprocess.Popen(
291             ["mkdir", "-p", os.path.dirname(local_path)],
292             stdout = open("/dev/null","w"),
293             stdin = open("/dev/null","r"))
294
295         if proc.wait():
296             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
297         
298         # sync files
299         (out,err),proc = server.popen_scp(
300             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
301                 os.path.join(self.home_path, 'capture')),
302             local_path,
303             port = None,
304             agent = None,
305             ident_key = local.node.ident_path,
306             server_key = local.node.server_key
307             )
308         
309         if proc.wait():
310             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
311         
312         return local_path
313         
314         
315     def prepare(self):
316         """
317         First-phase setup
318         
319         eg: set up listening ports
320         """
321         raise NotImplementedError
322     
323     def setup(self):
324         """
325         Second-phase setup
326         
327         eg: connect to peer
328         """
329         raise NotImplementedError
330     
331     def shutdown(self):
332         """
333         Cleanup
334         """
335         raise NotImplementedError
336         
337
338 class TunProtoUDP(TunProtoBase):
339     def __init__(self, local, peer, home_path, key, listening):
340         super(TunProtoUDP, self).__init__(local, peer, home_path, key)
341         self.listening = listening
342     
343     def prepare(self):
344         pass
345     
346     def setup(self):
347         self.async_launch('udp', False, ("-u",str(self.port)))
348     
349     def shutdown(self):
350         self.kill()
351
352 class TunProtoFD(TunProtoBase):
353     def __init__(self, local, peer, home_path, key, listening):
354         super(TunProtoFD, self).__init__(local, peer, home_path, key)
355         self.listening = listening
356     
357     def prepare(self):
358         pass
359     
360     def setup(self):
361         self.async_launch('fd', False)
362     
363     def shutdown(self):
364         self.kill()
365
366 class TunProtoTCP(TunProtoBase):
367     def __init__(self, local, peer, home_path, key, listening):
368         super(TunProtoTCP, self).__init__(local, peer, home_path, key)
369         self.listening = listening
370     
371     def prepare(self):
372         if self.listening:
373             self.async_launch('tcp', True)
374     
375     def setup(self):
376         if not self.listening:
377             # make sure our peer is ready
378             peer = self.peer()
379             if peer and peer.peer_proto_impl:
380                 peer.peer_proto_impl.async_launch_wait()
381             
382             if not self._started:
383                 self.launch('tcp', False)
384         else:
385             # make sure WE are ready
386             self.async_launch_wait()
387         
388         self.checkpid()
389     
390     def shutdown(self):
391         self.kill()
392
393 class TapProtoUDP(TunProtoUDP):
394     def __init__(self, local, peer, home_path, key, listening):
395         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
396         self.mode = 'pl-tap'
397
398 class TapProtoTCP(TunProtoTCP):
399     def __init__(self, local, peer, home_path, key, listening):
400         super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
401         self.mode = 'pl-tap'
402
403 class TapProtoFD(TunProtoFD):
404     def __init__(self, local, peer, home_path, key, listening):
405         super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
406         self.mode = 'pl-tap'
407
408
409
410 TUN_PROTO_MAP = {
411     'tcp' : TunProtoTCP,
412     'udp' : TunProtoUDP,
413     'fd'  : TunProtoFD,
414 }
415
416 TAP_PROTO_MAP = {
417     'tcp' : TapProtoTCP,
418     'udp' : TapProtoUDP,
419     'fd'  : TapProtoFD,
420 }
421
422