server_scp can copy multiple files at once.
[nepi.git] / src / nepi / testbeds / planetlab / tunproto.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import weakref
5 import os
6 import os.path
7 import rspawn
8 import subprocess
9
10 from nepi.util import server
11
12 class TunProtoBase(object):
13     def __init__(self, local, peer, home_path):
14         # Weak references, since ifaces do have a reference to the
15         # tunneling protocol implementation - we don't want strong
16         # circular references.
17         self.peer = weakref.ref(peer)
18         self.local = weakref.ref(local)
19         
20         self.port = 15000
21         self.mode = 'pl-tun'
22         
23         self.home_path = home_path
24         
25         self._started = False
26         self._pid = None
27         self._ppid = None
28
29     def _make_home(self):
30         local = self.local()
31         
32         if not local:
33             raise RuntimeError, "Lost reference to peering interfaces before launching"
34         if not local.node:
35             raise RuntimeError, "Unconnected TUN - missing node"
36         
37         # Make sure all the paths are created where 
38         # they have to be created for deployment
39         cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
40         (out,err),proc = server.popen_ssh_command(
41             cmd,
42             host = local.node.hostname,
43             port = None,
44             user = local.node.slicename,
45             agent = None,
46             ident_key = local.node.ident_path,
47             server_key = local.node.server_key
48             )
49         
50         if proc.wait():
51             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
52         
53     
54     def _install_scripts(self):
55         local = self.local()
56         
57         if not local:
58             raise RuntimeError, "Lost reference to peering interfaces before launching"
59         if not local.node:
60             raise RuntimeError, "Unconnected TUN - missing node"
61         
62         # Install the tun_connect script and tunalloc utility
63         sources = [
64             os.path.join(os.path.dirname(__file__), 'scripts', 'tun_connect.py'),
65             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
66         ]
67         dest = "%s@%s:%s" % (
68             local.node.slicename, local.node.hostname, 
69             os.path.join(self.home_path,'.'),)
70         (out,err),proc = server.popen_scp(
71             sources,
72             dest,
73             ident_key = local.node.ident_path,
74             server_key = local.node.server_key
75             )
76     
77         if proc.wait():
78             raise RuntimeError, "Failed upload TUN connect script %r: %s %s" % (source, out,err,)
79
80         cmd = "cd %s && gcc -shared tunalloc.c -o tunalloc.so" % (server.shell_escape(self.home_path),)
81         (out,err),proc = server.popen_ssh_command(
82             cmd,
83             host = local.node.hostname,
84             port = None,
85             user = local.node.slicename,
86             agent = None,
87             ident_key = local.node.ident_path,
88             server_key = local.node.server_key
89             )
90         
91         if proc.wait():
92             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
93         
94     
95     def launch(self, check_proto, listen, extra_args=[]):
96         peer = self.peer()
97         local = self.local()
98         
99         if not peer or not local:
100             raise RuntimeError, "Lost reference to peering interfaces before launching"
101         
102         peer_port = peer.tun_port
103         peer_addr = peer.tun_addr
104         peer_proto= peer.tun_proto
105         
106         local_port = self.port
107         local_cap  = local.capture
108         local_addr = local.address
109         local_mask = local.netprefix
110         local_snat = local.snat
111         local_txq  = local.txqueuelen
112         
113         if check_proto != peer_proto:
114             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
115         
116         if not listen and (not peer_port or not peer_addr):
117             raise RuntimeError, "Misconfigured peer: %s" % (peer,)
118         
119         if listen and (not local_port or not local_addr or not local_mask):
120             raise RuntimeError, "Misconfigured TUN: %s" % (local,)
121         
122         args = ["python", "tun_connect.py", 
123             "-m", str(self.mode),
124             "-p", str(local_port if listen else peer_port),
125             "-A", str(local_addr),
126             "-M", str(local_mask)]
127         
128         if local_snat:
129             args.append("-S")
130         if local_txq:
131             args.extend(("-Q",str(local_txq)))
132         if extra_args:
133             args.extend(map(str,extra_args))
134         if not listen:
135             args.append(str(peer_addr))
136         
137         self._make_home()
138         self._install_scripts()
139         
140         # Start process in a "daemonized" way, using nohup and heavy
141         # stdin/out redirection to avoid connection issues
142         (out,err),proc = rspawn.remote_spawn(
143             " ".join(args),
144             
145             pidfile = './pid',
146             home = self.home_path,
147             stdin = '/dev/null',
148             stdout = 'capture' if local_cap else '/dev/null',
149             stderr = rspawn.STDOUT,
150             sudo = True,
151             
152             host = local.node.hostname,
153             port = None,
154             user = local.node.slicename,
155             agent = None,
156             ident_key = local.node.ident_path,
157             server_key = local.node.server_key
158             )
159         
160         if proc.wait():
161             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
162
163         self._started = True
164
165     def checkpid(self):            
166         local = self.local()
167         
168         if not local:
169             raise RuntimeError, "Lost reference to local interface"
170         
171         # Get PID/PPID
172         # NOTE: wait a bit for the pidfile to be created
173         if self._started and not self._pid or not self._ppid:
174             pidtuple = rspawn.remote_check_pid(
175                 os.path.join(self.home_path,'pid'),
176                 host = local.node.hostname,
177                 port = None,
178                 user = local.node.slicename,
179                 agent = None,
180                 ident_key = local.node.ident_path,
181                 server_key = local.node.server_key
182                 )
183             
184             if pidtuple:
185                 self._pid, self._ppid = pidtuple
186     
187     def status(self):
188         local = self.local()
189         
190         if not local:
191             raise RuntimeError, "Lost reference to local interface"
192         
193         self.checkpid()
194         if not self._started:
195             return rspawn.NOT_STARTED
196         elif not self._pid or not self._ppid:
197             return rspawn.NOT_STARTED
198         else:
199             status = rspawn.remote_status(
200                 self._pid, self._ppid,
201                 host = local.node.hostname,
202                 port = None,
203                 user = local.node.slicename,
204                 agent = None,
205                 ident_key = local.node.ident_path
206                 )
207             return status
208     
209     def kill(self):
210         local = self.local()
211         
212         if not local:
213             raise RuntimeError, "Lost reference to local interface"
214         
215         status = self.status()
216         if status == rspawn.RUNNING:
217             # kill by ppid+pid - SIGTERM first, then try SIGKILL
218             rspawn.remote_kill(
219                 self._pid, self._ppid,
220                 host = local.node.hostname,
221                 port = None,
222                 user = local.node.slicename,
223                 agent = None,
224                 ident_key = local.node.ident_path,
225                 server_key = local.node.server_key,
226                 sudo = True
227                 )
228         
229     def sync_trace(self, local_dir, whichtrace):
230         if whichtrace != 'packets':
231             return None
232         
233         local = self.local()
234         
235         if not local:
236             return None
237         
238         local_path = os.path.join(local_dir, 'capture')
239         
240         # create parent local folders
241         proc = subprocess.Popen(
242             ["mkdir", "-p", os.path.dirname(local_path)],
243             stdout = open("/dev/null","w"),
244             stdin = open("/dev/null","r"))
245
246         if proc.wait():
247             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
248         
249         # sync files
250         (out,err),proc = server.popen_scp(
251             '%s@%s:%s' % (local.node.slicename, local.node.hostname, 
252                 os.path.join(self.home_path, 'capture')),
253             local_path,
254             port = None,
255             agent = None,
256             ident_key = local.node.ident_path,
257             server_key = local.node.server_key
258             )
259         
260         if proc.wait():
261             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
262         
263         return local_path
264         
265         
266     def prepare(self):
267         """
268         First-phase setup
269         
270         eg: set up listening ports
271         """
272         raise NotImplementedError
273     
274     def setup(self):
275         """
276         Second-phase setup
277         
278         eg: connect to peer
279         """
280         raise NotImplementedError
281     
282     def shutdown(self):
283         """
284         Cleanup
285         """
286         raise NotImplementedError
287         
288
289 class TunProtoUDP(TunProtoBase):
290     def __init__(self, local, peer, home_path, listening):
291         super(TunProtoTCP, self).__init__(local, peer, home_path)
292         self.listening = listening
293     
294     def prepare(self):
295         pass
296     
297     def setup(self):
298         self.launch('udp', False, ("-U",))
299     
300     def shutdown(self):
301         self.kill()
302
303 class TunProtoTCP(TunProtoBase):
304     def __init__(self, local, peer, home_path, listening):
305         super(TunProtoTCP, self).__init__(local, peer, home_path)
306         self.listening = listening
307     
308     def prepare(self):
309         if self.listening:
310             self.launch('tcp', True)
311     
312     def setup(self):
313         if not self.listening:
314             self.launch('tcp', False)
315         
316         self.checkpid()
317     
318     def shutdown(self):
319         self.kill()
320
321 PROTO_MAP = {
322     'tcp' : TunProtoTCP,
323     'udp' : TunProtoUDP,
324 }
325
326