resourcealloc improvement: also force selection when the hamming cardinality is 1...
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10 import base64
11 import time
12 import re
13
14 from nepi.util import server
15
class TunProtoBase(object):
    """
    Base class for the tunnelling protocol implementations.
    
    It builds the tun_connect.py command line out of the attributes
    of the local and peer interfaces, uploads the required scripts to
    the node of the local interface, spawns the forwarder there and
    keeps track of the spawned process (pid, status, trace).
    
    Subclasses implement prepare(), setup() and shutdown().
    """
    
    def __init__(self, local, peer, home_path, key):
        """
        local: interface on whose node the forwarder is launched
        peer: interface at the other end of the tunnel
        home_path: remote directory where scripts, the pidfile and
            the capture trace are kept
        key: value handed to tun_connect.py through its -k option
        """
        # Weak references, since ifaces do have a reference to the
        # tunneling protocol implementation - we don't want strong
        # circular references.
        self.peer = weakref.ref(peer)
        self.local = weakref.ref(local)
        
        self.port = 15000
        self.mode = 'pl-tun'
        self.key = key
        
        self.home_path = home_path
        
        self._launcher = None
        self._started = False
        self._pid = None
        self._ppid = None
        self._if_name = None

    def _ssh_args(self, local):
        """
        Keyword arguments shared by every remote call made against
        the node of the given local interface.
        """
        node = local.node
        return dict(
            host = node.hostname,
            port = None,
            user = node.slicename,
            agent = None,
            ident_key = node.ident_path,
            server_key = node.server_key)

    def _connected_local(self):
        """
        Returns the local interface, raising RuntimeError if the weak
        reference died or the interface has no node attached.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to peering interfaces before launching")
        if not local.node:
            raise RuntimeError("Unconnected TUN - missing node")
        
        return local

    def _make_home(self):
        """
        Creates home_path in the remote node.
        
        Raises RuntimeError if the remote mkdir fails.
        """
        local = self._connected_local()
        
        # Make sure all the paths are created where 
        # they have to be created for deployment
        cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
        (out,err),proc = server.popen_ssh_command(
            cmd,
            **self._ssh_args(local))
        
        if proc.wait():
            raise RuntimeError("Failed to set up TUN forwarder: %s %s" % (out,err,))
    
    def _install_scripts(self):
        """
        Uploads tun_connect.py, tunalloc.c and the tunchannel module to
        home_path, then builds tunalloc.so remotely (and python-passfd
        when the local interface uses the 'fd' protocol).
        
        Raises RuntimeError if the upload or the remote build fails.
        """
        local = self._connected_local()
        
        # Install the tun_connect script and tunalloc utility
        from nepi.util import tunchannel
        scripts_dir = os.path.join(os.path.dirname(__file__), 'scripts')
        sources = [
            os.path.join(scripts_dir, 'tun_connect.py'),
            os.path.join(scripts_dir, 'tunalloc.c'),
            re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
        ]
        dest = "%s@%s:%s" % (
            local.node.slicename, local.node.hostname, 
            os.path.join(self.home_path,'.'),)
        (out,err),proc = server.popen_scp(
            sources,
            dest,
            ident_key = local.node.ident_path,
            server_key = local.node.server_key
            )
    
        if proc.wait():
            raise RuntimeError("Failed upload TUN connect script %r: %s %s" % (sources, out,err,))
        
        # Make sure all dependencies are satisfied
        local.node.wait_dependencies()

        build_cmd = "cd %(home)s && gcc -fPIC -shared tunalloc.c -o tunalloc.so"
        if local.tun_proto == "fd":
            # fd passing additionally needs python-passfd built in place
            build_cmd += (
                " && "
                "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
                "mkdir -p python-passfd && "
                "cd python-passfd && "
                "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
                "python setup.py build && "
                "python setup.py install --install-lib .. "
            )
        cmd = build_cmd % {
            'home' : server.shell_escape(self.home_path),
            'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
        }
        (out,err),proc = server.popen_ssh_command(
            cmd,
            **self._ssh_args(local))
        
        if proc.wait():
            raise RuntimeError("Failed to set up TUN forwarder: %s %s" % (out,err,))
        
    def launch(self, check_proto, listen, extra_args=[]):
        """
        Installs the scripts and spawns tun_connect.py in the node of
        the local interface.
        
        check_proto: protocol this implementation speaks; it must match
            the peer's tun_proto. 'fd' selects the --pass-fd invocation,
            anything else uses the -p/-k one.
        listen: when true the local port is given to -p, otherwise the
            peer's port is used and (except for 'fd') the peer address
            is appended as last argument.
        extra_args: additional command line arguments (never modified).
        
        Raises RuntimeError on lost references, protocol mismatch,
        misconfiguration or if the remote spawn fails.
        """
        peer = self.peer()
        local = self.local()
        
        if not peer or not local:
            raise RuntimeError("Lost reference to peering interfaces before launching")
        
        peer_port = peer.tun_port
        peer_addr = peer.tun_addr
        peer_proto= peer.tun_proto
        
        local_port = self.port
        local_cap  = local.capture
        local_addr = local.address
        local_mask = local.netprefix
        local_snat = local.snat
        local_txq  = local.txqueuelen
        local_p2p  = local.pointopoint
        
        # Default the point-to-point address to the peer's address
        if not local_p2p and hasattr(peer, 'address'):
            local_p2p = peer.address

        if check_proto != peer_proto:
            raise RuntimeError("Peering protocol mismatch: %s != %s" % (check_proto, peer_proto))
        
        if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
            raise RuntimeError("Misconfigured peer: %s" % (peer,))
        
        if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
            raise RuntimeError("Misconfigured TUN: %s" % (local,))
        
        args = ["python", "tun_connect.py", 
            "-m", str(self.mode),
            "-A", str(local_addr),
            "-M", str(local_mask)]
        
        if check_proto == 'fd':
            passfd_arg = str(peer_addr)
            if passfd_arg.startswith('\x00'):
                # cannot shell_encode null characters :(
                passfd_arg = "base64:"+base64.b64encode(passfd_arg)
            else:
                passfd_arg = '$HOME/'+server.shell_escape(passfd_arg)
            args.extend([
                "--pass-fd", passfd_arg
            ])
        else:
            args.extend([
                "-p", str(local_port if listen else peer_port),
                "-k", str(self.key)
            ])
        
        if local_snat:
            args.append("-S")
        if local_p2p:
            args.extend(("-P",str(local_p2p)))
        if local_txq:
            args.extend(("-Q",str(local_txq)))
        if not local_cap:
            args.append("-N")
        if extra_args:
            args.extend(map(str,extra_args))
        if not listen and check_proto != 'fd':
            args.append(str(peer_addr))
        
        self._make_home()
        self._install_scripts()
        
        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out,err),proc = rspawn.remote_spawn(
            " ".join(args),
            
            pidfile = './pid',
            home = self.home_path,
            stdin = '/dev/null',
            stdout = 'capture',
            stderr = rspawn.STDOUT,
            sudo = True,
            
            **self._ssh_args(local))
        
        if proc.wait():
            raise RuntimeError("Failed to set up TUN: %s %s" % (out,err,))
        
        self._started = True
    
    def _launch_and_wait(self, *p, **kw):
        """
        Calls launch() with the given arguments, waits until the remote
        process shows up and then polls the capture trace (up to 30
        times, one second apart) for a "Connected" line.
        """
        local = self.local()
        
        self.launch(*p, **kw)
        
        # Wait for the process to be started
        while self.status() == rspawn.NOT_STARTED:
            time.sleep(1.0)
        
        cmd = "cd %(home)s ; grep -c Connected capture" % dict(
            home = server.shell_escape(self.home_path))
        ssh_args = self._ssh_args(local)
        
        # Wait for the connection to be established
        for spin in range(30):
            if self.status() != rspawn.RUNNING:
                break
            
            (out,err),proc = server.popen_ssh_command(cmd, **ssh_args)
            
            # grep -c exits with status 1 when the count is zero, which
            # only means "not connected yet" - keep polling in that case.
            # Any other nonzero status is a real failure.
            if proc.wait() not in (0, 1):
                break
            
            if out.strip() != '0':
                break
            
            time.sleep(1.0)
    
    @property
    def if_name(self):
        """
        Name of the interface assigned by the forwarder, parsed from the
        "Using tun:" line of the remote capture trace and cached once
        found. None if it could not be determined.
        """
        if not self._if_name:
            # Inspect the trace to check the assigned iface
            local = self.local()
            if local:
                cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
                    home = server.shell_escape(self.home_path))
                ssh_args = self._ssh_args(local)
                
                for spin in range(30):
                    (out,err),proc = server.popen_ssh_command(cmd, **ssh_args)
                    
                    if proc.wait():
                        return None
                    
                    out = out.strip()
                    
                    match = re.match(r"Using +tun: +([-a-zA-Z0-9]*) +.*",out)
                    if match:
                        self._if_name = match.group(1)
                        # Found it - no point in querying the node again
                        break
        return self._if_name
    
    def async_launch(self, check_proto, listen, extra_args=[]):
        """
        Starts _launch_and_wait in a background thread, unless a
        launcher thread was already created.
        """
        if self._launcher:
            return
        
        self._launcher = threading.Thread(
            target = self._launch_and_wait,
            args = (check_proto, listen, extra_args))
        self._launcher.start()
    
    def async_launch_wait(self):
        """
        Waits for the thread started by async_launch to finish.
        
        Raises RuntimeError if the launch did not succeed, or if
        nothing was ever launched.
        """
        if self._launcher:
            self._launcher.join()
            if not self._started:
                raise RuntimeError("Failed to launch TUN forwarder")
        elif not self._started:
            # There is no way to launch from here: launch() needs the
            # protocol and listen flag, which are unknown at this point.
            raise RuntimeError("Failed to launch TUN forwarder: launch was never requested")

    def checkpid(self):
        """
        Fetches pid and ppid of the remote process from the pidfile,
        if the process was started and they are not known yet.
        
        Raises RuntimeError if the local interface is gone.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to local interface")
        
        # Get PID/PPID
        # NOTE: wait a bit for the pidfile to be created
        if self._started and (not self._pid or not self._ppid):
            pidtuple = rspawn.remote_check_pid(
                os.path.join(self.home_path,'pid'),
                **self._ssh_args(local))
            
            if pidtuple:
                self._pid, self._ppid = pidtuple
    
    def status(self):
        """
        Returns rspawn.NOT_STARTED while the process was not launched or
        its pid is unknown, otherwise whatever rspawn.remote_status
        reports for it.
        
        Raises RuntimeError if the local interface is gone.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to local interface")
        
        self.checkpid()
        if not self._started or not self._pid or not self._ppid:
            return rspawn.NOT_STARTED
        
        # NOTE(review): unlike the other remote calls, server_key is
        # not passed here - confirm whether remote_status accepts it.
        return rspawn.remote_status(
            self._pid, self._ppid,
            host = local.node.hostname,
            port = None,
            user = local.node.slicename,
            agent = None,
            ident_key = local.node.ident_path
            )
    
    def kill(self):
        """
        Kills the remote process if it is running.
        
        Raises RuntimeError if the local interface is gone.
        """
        local = self.local()
        
        if not local:
            raise RuntimeError("Lost reference to local interface")
        
        if self.status() == rspawn.RUNNING:
            # kill by ppid+pid - SIGTERM first, then try SIGKILL
            rspawn.remote_kill(
                self._pid, self._ppid,
                sudo = True,
                **self._ssh_args(local))
        
    def sync_trace(self, local_dir, whichtrace):
        """
        Copies the remote capture file into local_dir.
        
        Only the 'packets' trace is supported. Returns the local path
        of the copy, or None for other traces or when the local
        interface is gone.
        
        Raises RuntimeError if the folder cannot be created or the
        copy fails.
        """
        if whichtrace != 'packets':
            return None
        
        local = self.local()
        
        if not local:
            return None
        
        local_path = os.path.join(local_dir, 'capture')
        
        # create parent local folders
        devnull = open(os.devnull, "r")
        try:
            proc = subprocess.Popen(
                ["mkdir", "-p", os.path.dirname(local_path)],
                stdin = devnull,
                stdout = subprocess.PIPE,
                stderr = subprocess.PIPE)
            # Keep the output so the error below can report it
            out, err = proc.communicate()
        finally:
            devnull.close()

        if proc.returncode:
            raise RuntimeError("Failed to synchronize trace: %s %s" % (out,err,))
        
        # sync files
        (out,err),proc = server.popen_scp(
            '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
                os.path.join(self.home_path, 'capture')),
            local_path,
            port = None,
            agent = None,
            ident_key = local.node.ident_path,
            server_key = local.node.server_key
            )
        
        if proc.wait():
            raise RuntimeError("Failed to synchronize trace: %s %s" % (out,err,))
        
        return local_path
        
        
    def prepare(self):
        """
        First-phase setup
        
        eg: set up listening ports
        """
        raise NotImplementedError
    
    def setup(self):
        """
        Second-phase setup
        
        eg: connect to peer
        """
        raise NotImplementedError
    
    def shutdown(self):
        """
        Cleanup
        """
        raise NotImplementedError
412         
413
class TunProtoUDP(TunProtoBase):
    """
    UDP tunnel: nothing is done in the first phase, the forwarder
    is launched asynchronously in the second one.
    """
    
    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoUDP, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        # No first-phase work for UDP
        return None
    
    def setup(self):
        # The own port travels as the extra "-u" option
        udp_args = ("-u", str(self.port))
        self.async_launch('udp', False, udp_args)
    
    def shutdown(self):
        self.kill()
427
class TunProtoFD(TunProtoBase):
    """
    File descriptor passing tunnel: nothing is done in the first
    phase, the forwarder is launched asynchronously in the second one.
    """
    
    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoFD, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        # No first-phase work for fd passing
        return None
    
    def setup(self):
        proto, listen = 'fd', False
        self.async_launch(proto, listen)
    
    def shutdown(self):
        self.kill()
441
class TunProtoTCP(TunProtoBase):
    """
    TCP tunnel: the listening end is launched asynchronously in the
    first phase, the connecting end is launched in the second phase
    once its peer finished launching.
    """
    
    def __init__(self, local, peer, home_path, key, listening):
        super(TunProtoTCP, self).__init__(local, peer, home_path, key)
        self.listening = listening
    
    def prepare(self):
        if not self.listening:
            return
        self.async_launch('tcp', True)
    
    def setup(self):
        if self.listening:
            # make sure WE are ready
            self.async_launch_wait()
        else:
            # make sure our peer is ready
            peer = self.peer()
            if peer and peer.peer_proto_impl:
                peer.peer_proto_impl.async_launch_wait()
            
            if not self._started:
                self.launch('tcp', False)
        
        self.checkpid()
    
    def shutdown(self):
        self.kill()
468
class TapProtoUDP(TunProtoUDP):
    """
    Same as TunProtoUDP, with the 'pl-tap' mode.
    """
    
    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoUDP, self).__init__(
            local, peer, home_path, key, listening)
        # Override the mode set by the base constructor
        self.mode = 'pl-tap'
473
class TapProtoTCP(TunProtoTCP):
    """
    Same as TunProtoTCP, with the 'pl-tap' mode.
    """
    
    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoTCP, self).__init__(
            local, peer, home_path, key, listening)
        # Override the mode set by the base constructor
        self.mode = 'pl-tap'
478
class TapProtoFD(TunProtoFD):
    """
    Same as TunProtoFD, with the 'pl-tap' mode.
    """
    
    def __init__(self, local, peer, home_path, key, listening):
        super(TapProtoFD, self).__init__(
            local, peer, home_path, key, listening)
        # Override the mode set by the base constructor
        self.mode = 'pl-tap'
483
484
485
# Protocol name -> implementation class, for TUN mode
TUN_PROTO_MAP = dict(
    tcp = TunProtoTCP,
    udp = TunProtoUDP,
    fd = TunProtoFD,
)

# Protocol name -> implementation class, for TAP mode
TAP_PROTO_MAP = dict(
    tcp = TapProtoTCP,
    udp = TapProtoUDP,
    fd = TapProtoFD,
)
497
498