Async setup of TUNs and APPs, for much quicker deployment.
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9 import threading
10
11 from nepi.util import server
12
13 class TunProtoBase(object):
14     def __init__(self, local, peer, home_path):
15         # Weak references, since ifaces do have a reference to the
16         # tunneling protocol implementation - we don't want strong
17         # circular references.
18         self.peer = weakref.ref(peer)
19         self.local = weakref.ref(local)
20         
21         self.port = 15000
22         self.mode = 'pl-tun'
23         
24         self.home_path = home_path
25         
26         self._launcher = None
27         self._started = False
28         self._pid = None
29         self._ppid = None
30
31     def _make_home(self):
32         local = self.local()
33         
34         if not local:
35             raise RuntimeError, "Lost reference to peering interfaces before launching"
36         if not local.node:
37             raise RuntimeError, "Unconnected TUN - missing node"
38         
39         # Make sure all the paths are created where 
40         # they have to be created for deployment
41         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
42         (out,err),proc = server.popen_ssh_command(
43             cmd,
44             host = local.node.hostname,
45             port = None,
46             user = local.node.slicename,
47             agent = None,
48             ident_key = local.node.ident_path,
49             server_key = local.node.server_key
50             )
51         
52         if proc.wait():
53             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
54         
55     
56     def _install_scripts(self):
57         local = self.local()
58         
59         if not local:
60             raise RuntimeError, "Lost reference to peering interfaces before launching"
61         if not local.node:
62             raise RuntimeError, "Unconnected TUN - missing node"
63         
64         # Install the tun_connect script and tunalloc utility
65         sources = [
66             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
67             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
68         ]
69         dest = "%s@%s:%s" % (
70             local.node.slicename, local.node.hostname, 
71             os.path.join(self.home_path,'.'),)
72         (out,err),proc = server.popen_scp(
73             sources,
74             dest,
75             ident_key = local.node.ident_path,
76             server_key = local.node.server_key
77             )
78     
79         if proc.wait():
80             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (source, out,err,)
81
82         cmd = "cd %s && gcc -shared tunalloc.c -o tunalloc.so" % (server.shell_escape(self.home_path),)
83         (out,err),proc = server.popen_ssh_command(
84             cmd,
85             host = local.node.hostname,
86             port = None,
87             user = local.node.slicename,
88             agent = None,
89             ident_key = local.node.ident_path,
90             server_key = local.node.server_key
91             )
92         
93         if proc.wait():
94             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
95         
96     def launch(self, check_proto, listen, extra_args=[]):
97         peer = self.peer()
98         local = self.local()
99         
100         if not peer or not local:
101             raise RuntimeError, "Lost reference to peering interfaces before launching"
102         
103         peer_port = peer.tun_port
104         peer_addr = peer.tun_addr
105         peer_proto= peer.tun_proto
106         
107         local_port = self.port
108         local_cap  = local.capture
109         local_addr = local.address
110         local_mask = local.netprefix
111         local_snat = local.snat
112         local_txq  = local.txqueuelen
113         
114         if check_proto != peer_proto:
115             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
116         
117         if not listen and (not peer_port or not peer_addr):
118             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
119         
120         if listen and (not local_port or not local_addr or not local_mask):
121             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
122         
123         args = ["python", "tun_connect.py", 
124             "-m", str(self.mode),
125             "-p", str(local_port if listen else peer_port),
126             "-A", str(local_addr),
127             "-M", str(local_mask)]
128         
129         if local_snat:
130             args.append("-S")
131         if local_txq:
132             args.extend(("-Q",str(local_txq)))
133         if extra_args:
134             args.extend(map(str,extra_args))
135         if not listen:
136             args.append(str(peer_addr))
137         
138         self._make_home()
139         self._install_scripts()
140         
141         # Start process in a "daemonized" way, using nohup and heavy
142         # stdin/out redirection to avoid connection issues
143         (out,err),proc = rspawn.remote_spawn(
144             " ".join(args),
145             
146             pidfile = './pid',
147             home = self.home_path,
148             stdin = '/dev/null',
149             stdout = 'capture' if local_cap else '/dev/null',
150             stderr = rspawn.STDOUT,
151             sudo = True,
152             
153             host = local.node.hostname,
154             port = None,
155             user = local.node.slicename,
156             agent = None,
157             ident_key = local.node.ident_path,
158             server_key = local.node.server_key
159             )
160         
161         if proc.wait():
162             raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
163
164         self._started = True
165     
166     def async_launch(self, check_proto, listen, extra_args=[]):
167         if not self._launcher:
168             self._launcher = threading.Thread(
169                 target = self.launch,
170                 args = (check_proto, listen, extra_args))
171             self._launcher.start()
172     
173     def async_launch_wait(self):
174         if not self._started:
175             if self._launcher:
176                 self._launcher.join()
177                 if not self._started:
178                     raise RuntimeError, "Failed to launch TUN forwarder"
179             else:
180                 self.launch()
181
182     def checkpid(self):            
183         local = self.local()
184         
185         if not local:
186             raise RuntimeError, "Lost reference to local interface"
187         
188         # Get PID/PPID
189         # NOTE: wait a bit for the pidfile to be created
190         if self._started and not self._pid or not self._ppid:
191             pidtuple = rspawn.remote_check_pid(
192                 os.path.join(self.home_path,'pid'),
193                 host = local.node.hostname,
194                 port = None,
195                 user = local.node.slicename,
196                 agent = None,
197                 ident_key = local.node.ident_path,
198                 server_key = local.node.server_key
199                 )
200             
201             if pidtuple:
202                 self._pid, self._ppid = pidtuple
203     
204     def status(self):
205         local = self.local()
206         
207         if not local:
208             raise RuntimeError, "Lost reference to local interface"
209         
210         self.checkpid()
211         if not self._started:
212             return rspawn.NOT_STARTED
213         elif not self._pid or not self._ppid:
214             return rspawn.NOT_STARTED
215         else:
216             status = rspawn.remote_status(
217                 self._pid, self._ppid,
218                 host = local.node.hostname,
219                 port = None,
220                 user = local.node.slicename,
221                 agent = None,
222                 ident_key = local.node.ident_path
223                 )
224             return status
225     
226     def kill(self):
227         local = self.local()
228         
229         if not local:
230             raise RuntimeError, "Lost reference to local interface"
231         
232         status = self.status()
233         if status == rspawn.RUNNING:
234             # kill by ppid+pid - SIGTERM first, then try SIGKILL
235             rspawn.remote_kill(
236                 self._pid, self._ppid,
237                 host = local.node.hostname,
238                 port = None,
239                 user = local.node.slicename,
240                 agent = None,
241                 ident_key = local.node.ident_path,
242                 server_key = local.node.server_key,
243                 sudo = True
244                 )
245         
246     def sync_trace(self, local_dir, whichtrace):
247         if whichtrace != 'packets':
248             return None
249         
250         local = self.local()
251         
252         if not local:
253             return None
254         
255         local_path = os.path.join(local_dir, 'capture')
256         
257         # create parent local folders
258         proc = subprocess.Popen(
259             ["mkdir", "-p", os.path.dirname(local_path)],
260             stdout = open("/dev/null","w"),
261             stdin = open("/dev/null","r"))
262
263         if proc.wait():
264             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
265         
266         # sync files
267         (out,err),proc = server.popen_scp(
268             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
269                 os.path.join(self.home_path, 'capture')),
270             local_path,
271             port = None,
272             agent = None,
273             ident_key = local.node.ident_path,
274             server_key = local.node.server_key
275             )
276         
277         if proc.wait():
278             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
279         
280         return local_path
281         
282         
283     def prepare(self):
284         """
285         First-phase setup
286         
287         eg: set up listening ports
288         """
289         raise NotImplementedError
290     
291     def setup(self):
292         """
293         Second-phase setup
294         
295         eg: connect to peer
296         """
297         raise NotImplementedError
298     
299     def shutdown(self):
300         """
301         Cleanup
302         """
303         raise NotImplementedError
304         
305
306 class TunProtoUDP(TunProtoBase):
307     def __init__(self, local, peer, home_path, listening):
308         super(TunProtoTCP, self).__init__(local, peer, home_path)
309         self.listening = listening
310     
311     def prepare(self):
312         pass
313     
314     def setup(self):
315         self.launch_async('udp', False, ("-U",))
316     
317     def shutdown(self):
318         self.kill()
319
320 class TunProtoTCP(TunProtoBase):
321     def __init__(self, local, peer, home_path, listening):
322         super(TunProtoTCP, self).__init__(local, peer, home_path)
323         self.listening = listening
324     
325     def prepare(self):
326         if self.listening:
327             self.async_launch('tcp', True)
328     
329     def setup(self):
330         if not self.listening:
331             # make sure our peer is ready
332             peer = self.peer()
333             if peer and peer.peer_proto_impl:
334                 peer.peer_proto_impl.async_launch_wait()
335             
336             self.launch('tcp', False)
337         else:
338             # make sure WE are ready
339             self.async_launch_wait()
340         
341         self.checkpid()
342     
343     def shutdown(self):
344         self.kill()
345
346 PROTO_MAP = {
347     'tcp' : TunProtoTCP,
348     'udp' : TunProtoUDP,
349 }
350
351