Synchronization fix for cross connections: wait for tunnels to be up before starting...
[nepi.git] / src / nepi / testbeds / planetlab / interfaces.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import nepi.util.ipaddr2 as ipaddr2
6 import nepi.util.server as server
7 import plcapi
8 import subprocess
9 import os
10 import os.path
11 import random
12
13 import tunproto
14
15 class NodeIface(object):
16     def __init__(self, api=None):
17         if not api:
18             api = plcapi.PLCAPI()
19         self._api = api
20         
21         # Attributes
22         self.primary = True
23
24         # These get initialized at configuration time
25         self.address = None
26         self.lladdr = None
27         self.netprefix = None
28         self.netmask = None
29         self.broadcast = True
30         self._interface_id = None
31
32         # These get initialized when the iface is connected to its node
33         self.node = None
34
35         # These get initialized when the iface is connected to the internet
36         self.has_internet = False
37
38     def __str__(self):
39         return "%s<ip:%s/%s up mac:%s>" % (
40             self.__class__.__name__,
41             self.address, self.netmask,
42             self.lladdr,
43         )
44
45     def add_address(self, address, netprefix, broadcast):
46         raise RuntimeError, "Cannot add explicit addresses to public interface"
47     
48     def pick_iface(self, siblings):
49         """
50         Picks an interface using the PLCAPI to query information about the node.
51         
52         Needs an assigned node.
53         
54         Params:
55             siblings: other NodeIface elements attached to the same node
56         """
57         
58         if self.node is None or self.node._node_id is None:
59             raise RuntimeError, "Cannot pick interface without an assigned node"
60         
61         avail = self._api.GetInterfaces(
62             node_id=self.node._node_id, 
63             is_primary=self.primary,
64             fields=('interface_id','mac','netmask','ip') )
65         
66         used = set([sibling._interface_id for sibling in siblings
67                     if sibling._interface_id is not None])
68         
69         for candidate in avail:
70             candidate_id = candidate['interface_id']
71             if candidate_id not in used:
72                 # pick it!
73                 self._interface_id = candidate_id
74                 self.address = candidate['ip']
75                 self.lladdr = candidate['mac']
76                 self.netprefix = candidate['netmask']
77                 self.netmask = ipaddr2.ipv4_dot2mask(self.netprefix) if self.netprefix else None
78                 return
79         else:
80             raise RuntimeError, "Cannot configure interface: cannot find suitable interface in PlanetLab node"
81
82     def validate(self):
83         if not self.has_internet:
84             raise RuntimeError, "All external interface devices must be connected to the Internet"
85     
86
87 class _CrossIface(object):
88     def __init__(self, proto, addr, port):
89         self.tun_proto = proto
90         self.tun_addr = addr
91         self.tun_port = port
92
93 class TunIface(object):
94     _PROTO_MAP = tunproto.TUN_PROTO_MAP
95     _KIND = 'TUN'
96
97     def __init__(self, api=None):
98         if not api:
99             api = plcapi.PLCAPI()
100         self._api = api
101         
102         # Attributes
103         self.address = None
104         self.netprefix = None
105         self.netmask = None
106         
107         self.up = None
108         self.device_name = None
109         self.mtu = None
110         self.snat = False
111         self.txqueuelen = None
112         
113         # Enabled traces
114         self.capture = False
115
116         # These get initialized when the iface is connected to its node
117         self.node = None
118         
119         # These get initialized when the iface is configured
120         self.external_iface = None
121         
122         # These get initialized when the iface is configured
123         # They're part of the TUN standard attribute set
124         self.tun_port = None
125         self.tun_addr = None
126         
127         # These get initialized when the iface is connected to its peer
128         self.peer_iface = None
129         self.peer_proto = None
130         self.peer_proto_impl = None
131
132         # same as peer proto, but for execute-time standard attribute lookups
133         self.tun_proto = None 
134         
135         
136         # Generate an initial random cryptographic key to use for tunnelling
137         # Upon connection, both endpoints will agree on a common one based on
138         # this one.
139         self.tun_key = ( ''.join(map(chr, [ 
140                     r.getrandbits(8) 
141                     for i in xrange(32) 
142                     for r in (random.SystemRandom(),) ])
143                 ).encode("base64").strip() )        
144         
145
146     def __str__(self):
147         return "%s<ip:%s/%s %s%s>" % (
148             self.__class__.__name__,
149             self.address, self.netprefix,
150             " up" if self.up else " down",
151             " snat" if self.snat else "",
152         )
153
154     def add_address(self, address, netprefix, broadcast):
155         if (self.address or self.netprefix or self.netmask) is not None:
156             raise RuntimeError, "Cannot add more than one address to %s interfaces" % (self._KIND,)
157         if broadcast:
158             raise ValueError, "%s interfaces cannot broadcast in PlanetLab" % (self._KIND,)
159         
160         self.address = address
161         self.netprefix = netprefix
162         self.netmask = ipaddr2.ipv4_mask2dot(netprefix)
163     
164     def validate(self):
165         if not self.node:
166             raise RuntimeError, "Unconnected %s iface - missing node" % (self._KIND,)
167         if self.peer_iface and self.peer_proto not in self._PROTO_MAP:
168             raise RuntimeError, "Unsupported tunnelling protocol: %s" % (self.peer_proto,)
169         if not self.address or not self.netprefix or not self.netmask:
170             raise RuntimeError, "Misconfigured %s iface - missing address" % (self._KIND,)
171     
172     def _impl_instance(self, home_path, listening):
173         impl = self._PROTO_MAP[self.peer_proto](
174             self, self.peer_iface, home_path, self.tun_key, listening)
175         impl.port = self.tun_port
176         return impl
177     
178     def prepare(self, home_path, listening):
179         if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
180             # Ad-hoc peer_iface
181             self.peer_iface = _CrossIface(
182                 self.peer_proto,
183                 self.peer_addr,
184                 self.peer_port)
185         if self.peer_iface:
186             if not self.peer_proto_impl:
187                 self.peer_proto_impl = self._impl_instance(home_path, listening)
188             self.peer_proto_impl.prepare()
189     
190     def setup(self):
191         if self.peer_proto_impl:
192             self.peer_proto_impl.setup()
193     
194     def cleanup(self):
195         if self.peer_proto_impl:
196             self.peer_proto_impl.shutdown()
197             self.peer_proto_impl = None
198
199     def async_launch_wait(self):
200         if self.peer_proto_impl:
201             self.peer_proto_impl.async_launch_wait()
202
203     def sync_trace(self, local_dir, whichtrace):
204         if self.peer_proto_impl:
205             return self.peer_proto_impl.sync_trace(local_dir, whichtrace)
206         else:
207             return None
208
209 class TapIface(TunIface):
210     _PROTO_MAP = tunproto.TAP_PROTO_MAP
211     _KIND = 'TAP'
212
213 # Yep, it does nothing - yet
214 class Internet(object):
215     def __init__(self, api=None):
216         if not api:
217             api = plcapi.PLCAPI()
218         self._api = api
219
220 class NetPipe(object):
221     def __init__(self, api=None):
222         if not api:
223             api = plcapi.PLCAPI()
224         self._api = api
225
226         # Attributes
227         self.mode = None
228         self.addrList = None
229         self.portList = None
230         
231         self.plrIn = None
232         self.bwIn = None
233         self.delayIn = None
234
235         self.plrOut = None
236         self.bwOut = None
237         self.delayOut = None
238         
239         # These get initialized when the pipe is connected to its node
240         self.node = None
241         self.configured = False
242     
243     def validate(self):
244         if not self.mode:
245             raise RuntimeError, "Undefined NetPipe mode"
246         if not self.portList:
247             raise RuntimeError, "Undefined NetPipe port list - must always define the scope"
248         if not (self.plrIn or self.bwIn or self.delayIn):
249             raise RuntimeError, "Undefined NetPipe inbound characteristics"
250         if not (self.plrOut or self.bwOut or self.delayOut):
251             raise RuntimeError, "Undefined NetPipe outbound characteristics"
252         if not self.node:
253             raise RuntimeError, "Unconnected NetPipe"
254     
255     def _add_pipedef(self, bw, plr, delay, options):
256         if delay:
257             options.extend(("delay","%dms" % (delay,)))
258         if bw:
259             options.extend(("bw","%.8fMbit/s" % (bw,)))
260         if plr:
261             options.extend(("plr","%.8f" % (plr,)))
262     
263     def _get_ruledef(self):
264         scope = "%s%s%s" % (
265             self.portList,
266             "@" if self.addrList else "",
267             self.addrList or "",
268         )
269         
270         options = []
271         if self.bwIn or self.plrIn or self.delayIn:
272             options.append("IN")
273             self._add_pipedef(self.bwIn, self.plrIn, self.delayIn, options)
274         if self.bwOut or self.plrOut or self.delayOut:
275             options.append("OUT")
276             self._add_pipedef(self.bwOut, self.plrOut, self.delayOut, options)
277         options = ' '.join(options)
278         
279         return (scope,options)
280
281     def configure(self):
282         # set up rule
283         scope, options = self._get_ruledef()
284         command = "sudo -S netconfig config %s %s %s" % (self.mode, scope, options)
285         
286         (out,err),proc = server.popen_ssh_command(
287             command,
288             host = self.node.hostname,
289             port = None,
290             user = self.node.slicename,
291             agent = None,
292             ident_key = self.node.ident_path,
293             server_key = self.node.server_key
294             )
295     
296         if proc.wait():
297             raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
298         
299         # we have to clean up afterwards
300         self.configured = True
301     
302     def refresh(self):
303         if self.configured:
304             # refresh rule
305             scope, options = self._get_ruledef()
306             command = "sudo -S netconfig refresh %s %s %s" % (self.mode, scope, options)
307             
308             (out,err),proc = server.popen_ssh_command(
309                 command,
310                 host = self.node.hostname,
311                 port = None,
312                 user = self.node.slicename,
313                 agent = None,
314                 ident_key = self.node.ident_path,
315                 server_key = self.node.server_key
316                 )
317         
318             if proc.wait():
319                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
320     
321     def cleanup(self):
322         if self.configured:
323             # remove rule
324             scope, options = self._get_ruledef()
325             command = "sudo -S netconfig delete %s %s" % (self.mode, scope)
326             
327             (out,err),proc = server.popen_ssh_command(
328                 command,
329                 host = self.node.hostname,
330                 port = None,
331                 user = self.node.slicename,
332                 agent = None,
333                 ident_key = self.node.ident_path,
334                 server_key = self.node.server_key
335                 )
336         
337             if proc.wait():
338                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
339             
340             self.configured = False
341     
342     def sync_trace(self, local_dir, whichtrace):
343         if whichtrace != 'netpipeStats':
344             raise ValueError, "Unsupported trace %s" % (whichtrace,)
345         
346         local_path = os.path.join(local_dir, "netpipe_stats_%s" % (self.mode,))
347         
348         # create parent local folders
349         proc = subprocess.Popen(
350             ["mkdir", "-p", os.path.dirname(local_path)],
351             stdout = open("/dev/null","w"),
352             stdin = open("/dev/null","r"))
353
354         if proc.wait():
355             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
356         
357         (out,err),proc = server.popen_ssh_command(
358             "echo 'Rules:' ; sudo -S netconfig show rules ; echo 'Pipes:' ; sudo -S netconfig show pipes",
359             host = self.node.hostname,
360             port = None,
361             user = self.node.slicename,
362             agent = None,
363             ident_key = self.node.ident_path,
364             server_key = self.node.server_key
365             )
366         
367         if proc.wait():
368             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
369         
370         # dump results to file
371         f = open(local_path, "wb")
372         f.write(err or "")
373         f.write(out or "")
374         f.close()
375         
376         return local_path
377