Wait for node dependencies to be installed before using them (invoking gcc)
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13
14 from nepi.util import server
15
16 class TunProtoBase(object):
17     def __init__(self, local, peer, home_path, key):
18         # Weak references, since ifaces do have a reference to the
19         # tunneling protocol implementation - we don't want strong
20         # circular references.
21         self.peer = weakref.ref(peer)
22         self.local = weakref.ref(local)
23         
24         self.port = 15000
25         self.mode = 'pl-tun'
26         self.key = key
27         
28         self.home_path = home_path
29         
30         self._launcher = None
31         self._started = False
32         self._pid = None
33         self._ppid = None
34
35     def _make_home(self):
36         local = self.local()
37         
38         if not local:
39             raise RuntimeError, "Lost reference to peering interfaces before launching"
40         if not local.node:
41             raise RuntimeError, "Unconnected TUN - missing node"
42         
43         # Make sure all the paths are created where 
44         # they have to be created for deployment
45         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
46         (out,err),proc = server.popen_ssh_command(
47             cmd,
48             host = local.node.hostname,
49             port = None,
50             user = local.node.slicename,
51             agent = None,
52             ident_key = local.node.ident_path,
53             server_key = local.node.server_key
54             )
55         
56         if proc.wait():
57             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
58         
59     
60     def _install_scripts(self):
61         local = self.local()
62         
63         if not local:
64             raise RuntimeError, "Lost reference to peering interfaces before launching"
65         if not local.node:
66             raise RuntimeError, "Unconnected TUN - missing node"
67         
68         # Install the tun_connect script and tunalloc utility
69         from nepi.util import tunchannel
70         sources = [
71             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
72             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
73             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
74         ]
75         dest = "%s@%s:%s" % (
76             local.node.slicename, local.node.hostname, 
77             os.path.join(self.home_path,'.'),)
78         (out,err),proc = server.popen_scp(
79             sources,
80             dest,
81             ident_key = local.node.ident_path,
82             server_key = local.node.server_key
83             )
84     
85         if proc.wait():
86             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (sources, out,err,)
87         
88         # Make sure all dependencies are satisfied
89         local.node.wait_dependencies()
90
91         cmd = ( (
92             "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
93             + ( " && "
94                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
95                 "mkdir -p python-passfd && "
96                 "cd python-passfd && "
97                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
98                 "python setup.py build && "
99                 "python setup.py install --install-lib .. "
100                 
101                 if local.tun_proto == "fd" else ""
102             ) )
103         % {
104             'home' : server.shell_escape(self.home_path),
105             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
106         } )
107         (out,err),proc = server.popen_ssh_command(
108             cmd,
109             host = local.node.hostname,
110             port = None,
111             user = local.node.slicename,
112             agent = None,
113             ident_key = local.node.ident_path,
114             server_key = local.node.server_key
115             )
116         
117         if proc.wait():
118             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
119         
120     def launch(self, check_proto, listen, extra_args=[]):
121         peer = self.peer()
122         local = self.local()
123         
124         if not peer or not local:
125             raise RuntimeError, "Lost reference to peering interfaces before launching"
126         
127         peer_port = peer.tun_port
128         peer_addr = peer.tun_addr
129         peer_proto= peer.tun_proto
130         
131         local_port = self.port
132         local_cap  = local.capture
133         local_addr = local.address
134         local_mask = local.netprefix
135         local_snat = local.snat
136         local_txq  = local.txqueuelen
137         
138         if check_proto != peer_proto:
139             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
140         
141         if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
142             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
143         
144         if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
145             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
146         
147         args = ["python", "tun_connect.py", 
148             "-m", str(self.mode),
149             "-A", str(local_addr),
150             "-M", str(local_mask)]
151         
152         if check_proto == 'fd':
153             passfd_arg = str(peer_addr)
154             if passfd_arg.startswith('\x00'):
155                 # cannot shell_encode null characters :(
156                 passfd_arg = "base64:"+base64.b64encode(passfd_arg)
157             else:
158                 passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
159             args.extend([
160                 "--pass-fd", passfd_arg
161             ])
162         else:
163             args.extend([
164                 "-p", str(local_port if listen else peer_port),
165                 "-k", str(self.key)
166             ])
167         
168         if local_snat:
169             args.append("-S")
170         if local_txq:
171             args.extend(("-Q",str(local_txq)))
172         if extra_args:
173             args.extend(map(str,extra_args))
174         if not listen and check_proto != 'fd':
175             args.append(str(peer_addr))
176         
177         self._make_home()
178         self._install_scripts()
179         
180         # Start process in a "daemonized" way, using nohup and heavy
181         # stdin/out redirection to avoid connection issues
182         (out,err),proc = rspawn.remote_spawn(
183             " ".join(args),
184             
185             pidfile = './pid',
186             home = self.home_path,
187             stdin = '/dev/null',
188             stdout = 'capture' if local_cap else '/dev/null',
189             stderr = rspawn.STDOUT,
190             sudo = True,
191             
192             host = local.node.hostname,
193             port = None,
194             user = local.node.slicename,
195             agent = None,
196             ident_key = local.node.ident_path,
197             server_key = local.node.server_key
198             )
199         
200         if proc.wait():
201             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
202         
203         self._started = True
204     
205     def _launch_and_wait(self, *p, **kw):
206         local = self.local()
207         
208         self.launch(*p, **kw)
209         
210         # Wait for the process to be started
211         while self.status() == rspawn.NOT_STARTED:
212             time.sleep(1.0)
213         
214         # Wait for the connection to be established
215         if local.capture:
216             for spin in xrange(30):
217                 if self.status() != rspawn.RUNNING:
218                     break
219                 
220                 (out,err),proc = server.popen_ssh_command(
221                     "cd %(home)s ; grep -c Connected capture" % dict(
222                         home = server.shell_escape(self.home_path)),
223                     host = local.node.hostname,
224                     port = None,
225                     user = local.node.slicename,
226                     agent = None,
227                     ident_key = local.node.ident_path,
228                     server_key = local.node.server_key
229                     )
230                 
231                 if proc.wait():
232                     break
233                 
234                 if out.strip() != '0':
235                     break
236                 
237                 time.sleep(1.0)
238     
239     def async_launch(self, check_proto, listen, extra_args=[]):
240         if not self._launcher:
241             self._launcher = threading.Thread(
242                 target = self._launch_and_wait,
243                 args = (check_proto, listen, extra_args))
244             self._launcher.start()
245     
246     def async_launch_wait(self):
247         if self._launcher:
248             self._launcher.join()
249             if not self._started:
250                 raise RuntimeError, "Failed to launch TUN forwarder"
251         elif not self._started:
252             self.launch()
253
254     def checkpid(self):            
255         local = self.local()
256         
257         if not local:
258             raise RuntimeError, "Lost reference to local interface"
259         
260         # Get PID/PPID
261         # NOTE: wait a bit for the pidfile to be created
262         if self._started and not self._pid or not self._ppid:
263             pidtuple = rspawn.remote_check_pid(
264                 os.path.join(self.home_path,'pid'),
265                 host = local.node.hostname,
266                 port = None,
267                 user = local.node.slicename,
268                 agent = None,
269                 ident_key = local.node.ident_path,
270                 server_key = local.node.server_key
271                 )
272             
273             if pidtuple:
274                 self._pid, self._ppid = pidtuple
275     
276     def status(self):
277         local = self.local()
278         
279         if not local:
280             raise RuntimeError, "Lost reference to local interface"
281         
282         self.checkpid()
283         if not self._started:
284             return rspawn.NOT_STARTED
285         elif not self._pid or not self._ppid:
286             return rspawn.NOT_STARTED
287         else:
288             status = rspawn.remote_status(
289                 self._pid, self._ppid,
290                 host = local.node.hostname,
291                 port = None,
292                 user = local.node.slicename,
293                 agent = None,
294                 ident_key = local.node.ident_path
295                 )
296             return status
297     
298     def kill(self):
299         local = self.local()
300         
301         if not local:
302             raise RuntimeError, "Lost reference to local interface"
303         
304         status = self.status()
305         if status == rspawn.RUNNING:
306             # kill by ppid+pid - SIGTERM first, then try SIGKILL
307             rspawn.remote_kill(
308                 self._pid, self._ppid,
309                 host = local.node.hostname,
310                 port = None,
311                 user = local.node.slicename,
312                 agent = None,
313                 ident_key = local.node.ident_path,
314                 server_key = local.node.server_key,
315                 sudo = True
316                 )
317         
318     def sync_trace(self, local_dir, whichtrace):
319         if whichtrace != 'packets':
320             return None
321         
322         local = self.local()
323         
324         if not local:
325             return None
326         
327         local_path = os.path.join(local_dir, 'capture')
328         
329         # create parent local folders
330         proc = subprocess.Popen(
331             ["mkdir", "-p", os.path.dirname(local_path)],
332             stdout = open("/dev/null","w"),
333             stdin = open("/dev/null","r"))
334
335         if proc.wait():
336             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
337         
338         # sync files
339         (out,err),proc = server.popen_scp(
340             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
341                 os.path.join(self.home_path, 'capture')),
342             local_path,
343             port = None,
344             agent = None,
345             ident_key = local.node.ident_path,
346             server_key = local.node.server_key
347             )
348         
349         if proc.wait():
350             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
351         
352         return local_path
353         
354         
355     def prepare(self):
356         """
357         First-phase setup
358         
359         eg: set up listening ports
360         """
361         raise NotImplementedError
362     
363     def setup(self):
364         """
365         Second-phase setup
366         
367         eg: connect to peer
368         """
369         raise NotImplementedError
370     
371     def shutdown(self):
372         """
373         Cleanup
374         """
375         raise NotImplementedError
376         
377
378 class TunProtoUDP(TunProtoBase):
379     def __init__(self, local, peer, home_path, key, listening):
380         super(TunProtoUDP, self).__init__(local, peer, home_path, key)
381         self.listening = listening
382     
383     def prepare(self):
384         pass
385     
386     def setup(self):
387         self.async_launch('udp', False, ("-u",str(self.port)))
388     
389     def shutdown(self):
390         self.kill()
391
392 class TunProtoFD(TunProtoBase):
393     def __init__(self, local, peer, home_path, key, listening):
394         super(TunProtoFD, self).__init__(local, peer, home_path, key)
395         self.listening = listening
396     
397     def prepare(self):
398         pass
399     
400     def setup(self):
401         self.async_launch('fd', False)
402     
403     def shutdown(self):
404         self.kill()
405
406 class TunProtoTCP(TunProtoBase):
407     def __init__(self, local, peer, home_path, key, listening):
408         super(TunProtoTCP, self).__init__(local, peer, home_path, key)
409         self.listening = listening
410     
411     def prepare(self):
412         if self.listening:
413             self.async_launch('tcp', True)
414     
415     def setup(self):
416         if not self.listening:
417             # make sure our peer is ready
418             peer = self.peer()
419             if peer and peer.peer_proto_impl:
420                 peer.peer_proto_impl.async_launch_wait()
421             
422             if not self._started:
423                 self.launch('tcp', False)
424         else:
425             # make sure WE are ready
426             self.async_launch_wait()
427         
428         self.checkpid()
429     
430     def shutdown(self):
431         self.kill()
432
433 class TapProtoUDP(TunProtoUDP):
434     def __init__(self, local, peer, home_path, key, listening):
435         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
436         self.mode = 'pl-tap'
437
438 class TapProtoTCP(TunProtoTCP):
439     def __init__(self, local, peer, home_path, key, listening):
440         super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
441         self.mode = 'pl-tap'
442
443 class TapProtoFD(TunProtoFD):
444     def __init__(self, local, peer, home_path, key, listening):
445         super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
446         self.mode = 'pl-tap'
447
448
449
450 TUN_PROTO_MAP = {
451     'tcp' : TunProtoTCP,
452     'udp' : TunProtoUDP,
453     'fd'  : TunProtoFD,
454 }
455
456 TAP_PROTO_MAP = {
457     'tcp' : TapProtoTCP,
458     'udp' : TapProtoUDP,
459     'fd'  : TapProtoFD,
460 }
461
462