5ba9e1cc5308c39d95193b0117a821fa658588ca
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10
11 from nepi.util import server
12
13 class TunProtoBase(object):
14     def __init__(self, local, peer, home_path, key):
15         # Weak references, since ifaces do have a reference to the
16         # tunneling protocol implementation - we don't want strong
17         # circular references.
18         self.peer = weakref.ref(peer)
19         self.local = weakref.ref(local)
20         
21         self.port = 15000
22         self.mode = 'pl-tun'
23         self.key = key
24         
25         self.home_path = home_path
26         
27         self._launcher = None
28         self._started = False
29         self._pid = None
30         self._ppid = None
31
32     def _make_home(self):
33         local = self.local()
34         
35         if not local:
36             raise RuntimeError, "Lost reference to peering interfaces before launching"
37         if not local.node:
38             raise RuntimeError, "Unconnected TUN - missing node"
39         
40         # Make sure all the paths are created where 
41         # they have to be created for deployment
42         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
43         (out,err),proc = server.popen_ssh_command(
44             cmd,
45             host = local.node.hostname,
46             port = None,
47             user = local.node.slicename,
48             agent = None,
49             ident_key = local.node.ident_path,
50             server_key = local.node.server_key
51             )
52         
53         if proc.wait():
54             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
55         
56     
57     def _install_scripts(self):
58         local = self.local()
59         
60         if not local:
61             raise RuntimeError, "Lost reference to peering interfaces before launching"
62         if not local.node:
63             raise RuntimeError, "Unconnected TUN - missing node"
64         
65         # Install the tun_connect script and tunalloc utility
66         sources = [
67             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
68             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
69         ]
70         dest = "%s@%s:%s" % (
71             local.node.slicename, local.node.hostname, 
72             os.path.join(self.home_path,'.'),)
73         (out,err),proc = server.popen_scp(
74             sources,
75             dest,
76             ident_key = local.node.ident_path,
77             server_key = local.node.server_key
78             )
79     
80         if proc.wait():
81             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (source, out,err,)
82
83         cmd = ( (
84             "cd %(home)s && gcc -shared tunalloc.c -o tunalloc.so"
85             + ( " && "
86                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
87                 "mkdir -p python-passfd && "
88                 "cd python-passfd && "
89                 "tar xzf ../python-passfd-src.tar.gz --strip-components=1 && "
90                 "python setup.py build && "
91                 "python setup.py install --install-lib .. "
92                 
93                 if local.tun_proto == "fd" else ""
94             ) )
95         % {
96             'home' : server.shell_escape(self.home_path),
97             'passfd_url' : "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/2a6472c64c87.tar.gz",
98         } )
99         (out,err),proc = server.popen_ssh_command(
100             cmd,
101             host = local.node.hostname,
102             port = None,
103             user = local.node.slicename,
104             agent = None,
105             ident_key = local.node.ident_path,
106             server_key = local.node.server_key
107             )
108         
109         if proc.wait():
110             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
111         
112     def launch(self, check_proto, listen, extra_args=[]):
113         peer = self.peer()
114         local = self.local()
115         
116         if not peer or not local:
117             raise RuntimeError, "Lost reference to peering interfaces before launching"
118         
119         peer_port = peer.tun_port
120         peer_addr = peer.tun_addr
121         peer_proto= peer.tun_proto
122         
123         local_port = self.port
124         local_cap  = local.capture
125         local_addr = local.address
126         local_mask = local.netprefix
127         local_snat = local.snat
128         local_txq  = local.txqueuelen
129         
130         if check_proto != peer_proto:
131             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
132         
133         if not listen and (not peer_port or not peer_addr):
134             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
135         
136         if listen and (not local_port or not local_addr or not local_mask):
137             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
138         
139         args = ["python", "tun_connect.py", 
140             "-m", str(self.mode)]
141         
142         if check_proto == 'fd':
143             args.extend([
144                 "--pass-fd", str(peer_addr)])
145         else:
146             args.extend([
147                 "-p", str(local_port if listen else peer_port),
148                 "-A", str(local_addr),
149                 "-M", str(local_mask),
150                 "-k", str(self.key)])
151         
152         if local_snat:
153             args.append("-S")
154         if local_txq:
155             args.extend(("-Q",str(local_txq)))
156         if extra_args:
157             args.extend(map(str,extra_args))
158         if not listen:
159             args.append(str(peer_addr))
160         
161         self._make_home()
162         self._install_scripts()
163         
164         # Start process in a "daemonized" way, using nohup and heavy
165         # stdin/out redirection to avoid connection issues
166         (out,err),proc = rspawn.remote_spawn(
167             " ".join(args),
168             
169             pidfile = './pid',
170             home = self.home_path,
171             stdin = '/dev/null',
172             stdout = 'capture' if local_cap else '/dev/null',
173             stderr = rspawn.STDOUT,
174             sudo = True,
175             
176             host = local.node.hostname,
177             port = None,
178             user = local.node.slicename,
179             agent = None,
180             ident_key = local.node.ident_path,
181             server_key = local.node.server_key
182             )
183         
184         if proc.wait():
185             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
186
187         self._started = True
188     
189     def async_launch(self, check_proto, listen, extra_args=[]):
190         if not self._launcher:
191             self._launcher = threading.Thread(
192                 target = self.launch,
193                 args = (check_proto, listen, extra_args))
194             self._launcher.start()
195     
196     def async_launch_wait(self):
197         if not self._started:
198             if self._launcher:
199                 self._launcher.join()
200                 if not self._started:
201                     raise RuntimeError, "Failed to launch TUN forwarder"
202             else:
203                 self.launch()
204
205     def checkpid(self):            
206         local = self.local()
207         
208         if not local:
209             raise RuntimeError, "Lost reference to local interface"
210         
211         # Get PID/PPID
212         # NOTE: wait a bit for the pidfile to be created
213         if self._started and not self._pid or not self._ppid:
214             pidtuple = rspawn.remote_check_pid(
215                 os.path.join(self.home_path,'pid'),
216                 host = local.node.hostname,
217                 port = None,
218                 user = local.node.slicename,
219                 agent = None,
220                 ident_key = local.node.ident_path,
221                 server_key = local.node.server_key
222                 )
223             
224             if pidtuple:
225                 self._pid, self._ppid = pidtuple
226     
227     def status(self):
228         local = self.local()
229         
230         if not local:
231             raise RuntimeError, "Lost reference to local interface"
232         
233         self.checkpid()
234         if not self._started:
235             return rspawn.NOT_STARTED
236         elif not self._pid or not self._ppid:
237             return rspawn.NOT_STARTED
238         else:
239             status = rspawn.remote_status(
240                 self._pid, self._ppid,
241                 host = local.node.hostname,
242                 port = None,
243                 user = local.node.slicename,
244                 agent = None,
245                 ident_key = local.node.ident_path
246                 )
247             return status
248     
249     def kill(self):
250         local = self.local()
251         
252         if not local:
253             raise RuntimeError, "Lost reference to local interface"
254         
255         status = self.status()
256         if status == rspawn.RUNNING:
257             # kill by ppid+pid - SIGTERM first, then try SIGKILL
258             rspawn.remote_kill(
259                 self._pid, self._ppid,
260                 host = local.node.hostname,
261                 port = None,
262                 user = local.node.slicename,
263                 agent = None,
264                 ident_key = local.node.ident_path,
265                 server_key = local.node.server_key,
266                 sudo = True
267                 )
268         
269     def sync_trace(self, local_dir, whichtrace):
270         if whichtrace != 'packets':
271             return None
272         
273         local = self.local()
274         
275         if not local:
276             return None
277         
278         local_path = os.path.join(local_dir, 'capture')
279         
280         # create parent local folders
281         proc = subprocess.Popen(
282             ["mkdir", "-p", os.path.dirname(local_path)],
283             stdout = open("/dev/null","w"),
284             stdin = open("/dev/null","r"))
285
286         if proc.wait():
287             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
288         
289         # sync files
290         (out,err),proc = server.popen_scp(
291             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
292                 os.path.join(self.home_path, 'capture')),
293             local_path,
294             port = None,
295             agent = None,
296             ident_key = local.node.ident_path,
297             server_key = local.node.server_key
298             )
299         
300         if proc.wait():
301             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
302         
303         return local_path
304         
305         
306     def prepare(self):
307         """
308         First-phase setup
309         
310         eg: set up listening ports
311         """
312         raise NotImplementedError
313     
314     def setup(self):
315         """
316         Second-phase setup
317         
318         eg: connect to peer
319         """
320         raise NotImplementedError
321     
322     def shutdown(self):
323         """
324         Cleanup
325         """
326         raise NotImplementedError
327         
328
329 class TunProtoUDP(TunProtoBase):
330     def __init__(self, local, peer, home_path, key, listening):
331         super(TunProtoUDP, self).__init__(local, peer, home_path, key)
332         self.listening = listening
333     
334     def prepare(self):
335         pass
336     
337     def setup(self):
338         self.async_launch('udp', False, ("-u",str(self.port)))
339     
340     def shutdown(self):
341         self.kill()
342
343 class TunProtoFD(TunProtoBase):
344     def __init__(self, local, peer, home_path, key, listening):
345         super(TunProtoFD, self).__init__(local, peer, home_path, key)
346         self.listening = listening
347     
348     def prepare(self):
349         pass
350     
351     def setup(self):
352         self.async_launch('fd', False)
353     
354     def shutdown(self):
355         self.kill()
356
357 class TunProtoTCP(TunProtoBase):
358     def __init__(self, local, peer, home_path, key, listening):
359         super(TunProtoTCP, self).__init__(local, peer, home_path, key)
360         self.listening = listening
361     
362     def prepare(self):
363         if self.listening:
364             self.async_launch('tcp', True)
365     
366     def setup(self):
367         if not self.listening:
368             # make sure our peer is ready
369             peer = self.peer()
370             if peer and peer.peer_proto_impl:
371                 peer.peer_proto_impl.async_launch_wait()
372             
373             self.launch('tcp', False)
374         else:
375             # make sure WE are ready
376             self.async_launch_wait()
377         
378         self.checkpid()
379     
380     def shutdown(self):
381         self.kill()
382
383 class TapProtoUDP(TunProtoUDP):
384     def __init__(self, local, peer, home_path, key, listening):
385         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
386         self.mode = 'pl-tap'
387
388 class TapProtoTCP(TunProtoTCP):
389     def __init__(self, local, peer, home_path, key, listening):
390         super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
391         self.mode = 'pl-tap'
392
393 class TapProtoFD(TunProtoFD):
394     def __init__(self, local, peer, home_path, key, listening):
395         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
396         self.mode = 'pl-tap'
397
398
399
400 TUN_PROTO_MAP = {
401     'tcp' : TunProtoTCP,
402     'udp' : TunProtoUDP,
403 }
404
405 TAP_PROTO_MAP = {
406     'tcp' : TapProtoTCP,
407     'udp' : TapProtoUDP,
408 }
409
410