24937db24f58e3cbc202ff9e909a21c59e8574b8
[nepi.git] / src / nepi / testbeds / planetlab / interfaces.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import nepi.util.ipaddr2 as ipaddr2
5 import nepi.util.server as server
6 import plcapi
7 import subprocess
8 import os
9 import os.path
10 import random
11 import ipaddr
12 import functools
13
14 import tunproto
15
16 class NodeIface(object):
17     def __init__(self, api=None):
18         if not api:
19             api = plcapi.PLCAPI()
20         self._api = api
21         
22         # Attributes
23         self.primary = True
24
25         # These get initialized at configuration time
26         self.address = None
27         self.lladdr = None
28         self.netprefix = None
29         self.netmask = None
30         self.broadcast = True
31         self._interface_id = None
32
33         # These get initialized when the iface is connected to its node
34         self.node = None
35
36         # These get initialized when the iface is connected to the internet
37         self.has_internet = False
38
39     def __str__(self):
40         return "%s<ip:%s/%s up mac:%s>" % (
41             self.__class__.__name__,
42             self.address, self.netmask,
43             self.lladdr,
44         )
45     
46     __repr__ = __str__
47
48     def add_address(self, address, netprefix, broadcast):
49         raise RuntimeError, "Cannot add explicit addresses to public interface"
50     
51     def pick_iface(self, siblings):
52         """
53         Picks an interface using the PLCAPI to query information about the node.
54         
55         Needs an assigned node.
56         
57         Params:
58             siblings: other NodeIface elements attached to the same node
59         """
60         
61         if self.node is None or self.node._node_id is None:
62             raise RuntimeError, "Cannot pick interface without an assigned node"
63       
64         # HACK: SFA doesnt give the node_id!!
65         if not isinstance(self.node._node_id, int):
66             node_data = self._api.GetNodes(filters={'hostname':self.node.hostname}, fields=('node_id',))[0]
67             node_id = node_data['node_id']
68         else:
69             node_id = self.node._node_id
70
71         avail = self._api.GetInterfaces(
72             node_id=node_id, 
73             is_primary=self.primary,
74             fields=('interface_id','mac','netmask','ip') )
75         
76         used = set([sibling._interface_id for sibling in siblings
77                     if sibling._interface_id is not None])
78         
79         for candidate in avail:
80             candidate_id = candidate['interface_id']
81             if candidate_id not in used:
82                 # pick it!
83                 self._interface_id = candidate_id
84                 self.address = candidate['ip']
85                 self.lladdr = candidate['mac']
86                 self.netprefix = candidate['netmask']
87                 self.netmask = ipaddr2.ipv4_dot2mask(self.netprefix) if self.netprefix else None
88                 return
89         else:
90             raise RuntimeError, "Cannot configure interface: cannot find suitable interface in PlanetLab node"
91
92     def validate(self):
93         if not self.has_internet:
94             raise RuntimeError, "All external interface devices must be connected to the Internet"
95     
96
97 class _CrossIface(object):
98     def __init__(self, proto, addr, port, cipher):
99         self.tun_proto = proto
100         self.tun_addr = addr
101         self.tun_port = port
102         self.tun_cipher = cipher
103
104         # Attributes
105         self.address = None
106         self.netprefix = None
107         self.netmask = None
108  
109         # Cannot access cross peers
110         self.peer_proto_impl = None
111     
112     def __str__(self):
113         return "%s%r" % (
114             self.__class__.__name__,
115             ( self.tun_proto,
116               self.tun_addr,
117               self.tun_port,
118               self.tun_cipher ) 
119         )
120     
121     __repr__ = __str__
122
123 class TunIface(object):
124     _PROTO_MAP = tunproto.TUN_PROTO_MAP
125     _KIND = 'TUN'
126
127     def __init__(self, api=None):
128         if not api:
129             api = plcapi.PLCAPI()
130         self._api = api
131         
132         # Attributes
133         self.address = None
134         self.netprefix = None
135         self.netmask = None
136         
137         self.up = None
138         self.mtu = None
139         self.snat = False
140         self.txqueuelen = 1000
141         self.pointopoint = None
142         self.multicast = False
143         self.bwlimit = None
144         
145         # Enabled traces
146         self.capture = False
147
148         # These get initialized when the iface is connected to its node
149         self.node = None
150         
151         # These get initialized when the iface is connected to any filter
152         self.filter_module = None
153         self.multicast_forwarder = None
154         
155         # These get initialized when the iface is configured
156         self.external_iface = None
157         
158         # These get initialized when the iface is configured
159         # They're part of the TUN standard attribute set
160         self.tun_port = None
161         self.tun_addr = None
162         self.tun_cipher = "AES"
163         
164         # These get initialized when the iface is connected to its peer
165         self.peer_iface = None
166         self.peer_proto = None
167         self.peer_addr = None
168         self.peer_port = None
169         self.peer_proto_impl = None
170         self._delay_recover = False
171
172         # same as peer proto, but for execute-time standard attribute lookups
173         self.tun_proto = None 
174         
175         
176         # Generate an initial random cryptographic key to use for tunnelling
177         # Upon connection, both endpoints will agree on a common one based on
178         # this one.
179         self.tun_key = ( ''.join(map(chr, [ 
180                     r.getrandbits(8) 
181                     for i in xrange(32) 
182                     for r in (random.SystemRandom(),) ])
183                 ).encode("base64").strip() )        
184         
185
186     def __str__(self):
187         return "%s<ip:%s/%s %s%s%s>" % (
188             self.__class__.__name__,
189             self.address, self.netprefix,
190             " up" if self.up else " down",
191             " snat" if self.snat else "",
192             (" p2p %s" % (self.pointopoint,)) if self.pointopoint else "",
193         )
194     
195     __repr__ = __str__
196     
197     @property
198     def if_name(self):
199         if self.peer_proto_impl:
200             return self.peer_proto_impl.if_name
201
202     def if_up(self):
203         if self.peer_proto_impl:
204             return self.peer_proto_impl.if_up()
205
206     def if_down(self):
207         if self.peer_proto_impl:
208             return self.peer_proto_impl.if_down()
209
210     def routes_here(self, route):
211         """
212         Returns True if the route should be attached to this interface
213         (ie, it references a gateway in this interface's network segment)
214         """
215         if self.address and self.netprefix:
216             addr, prefix = self.address, self.netprefix
217             pointopoint = self.pointopoint
218             if not pointopoint and self.peer_iface:
219                 pointopoint = self.peer_iface.address
220             
221             if pointopoint:
222                 prefix = 32
223                 
224             dest, destprefix, nexthop, metric = route
225             
226             myNet = ipaddr.IPv4Network("%s/%d" % (addr, prefix))
227             gwIp = ipaddr.IPv4Network(nexthop)
228             
229             if pointopoint:
230                 peerIp = ipaddr.IPv4Network(pointopoint)
231                 
232                 if gwIp == peerIp:
233                     return True
234             else:
235                 if gwIp in myNet:
236                     return True
237         return False
238     
239     def add_address(self, address, netprefix, broadcast):
240         if (self.address or self.netprefix or self.netmask) is not None:
241             raise RuntimeError, "Cannot add more than one address to %s interfaces" % (self._KIND,)
242         if broadcast:
243             raise ValueError, "%s interfaces cannot broadcast in PlanetLab (%s)" % (self._KIND,broadcast)
244         
245         self.address = address
246         self.netprefix = netprefix
247         self.netmask = ipaddr2.ipv4_mask2dot(netprefix)
248     
249     def validate(self):
250         if not self.node:
251             raise RuntimeError, "Unconnected %s iface - missing node" % (self._KIND,)
252         if self.peer_iface and self.peer_proto not in self._PROTO_MAP:
253             raise RuntimeError, "Unsupported tunnelling protocol: %s" % (self.peer_proto,)
254         if not self.address or not self.netprefix or not self.netmask:
255             raise RuntimeError, "Misconfigured %s iface - missing address" % (self._KIND,)
256         if self.filter_module and self.peer_proto not in ('udp','tcp',None):
257             raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (self,)
258         if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None):
259             raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,)
260     
261     def _impl_instance(self, home_path):
262         impl = self._PROTO_MAP[self.peer_proto](
263             self, self.peer_iface, home_path, self.tun_key)
264         impl.port = self.tun_port
265         impl.cross_slice = not self.peer_iface or isinstance(self.peer_iface, _CrossIface)
266         return impl
267     
268     def recover(self):
269         if self.peer_proto:
270             self.peer_proto_impl = self._impl_instance(
271                 self._home_path)
272             self.peer_proto_impl.recover()
273         else:
274             self._delay_recover = True
275     
276     def prepare(self, home_path):
277         if not self.peer_iface and (self.peer_proto and self.peer_addr):
278             # Ad-hoc peer_iface
279             self.peer_iface = _CrossIface(
280                 self.peer_proto,
281                 self.peer_addr,
282                 self.peer_port,
283                 self.peer_cipher)
284         if self.peer_iface:
285             if not self.peer_proto_impl:
286                 self.peer_proto_impl = self._impl_instance(home_path)
287             if self._delay_recover:
288                 self.peer_proto_impl.recover()
289     
290     def launch(self):
291         if self.peer_proto_impl:
292             self.peer_proto_impl.launch()
293     
294     def cleanup(self):
295         if self.peer_proto_impl:
296             self.peer_proto_impl.shutdown()
297
298     def destroy(self):
299         if self.peer_proto_impl:
300             self.peer_proto_impl.destroy()
301             self.peer_proto_impl = None
302
303     def wait(self):
304         if self.peer_proto_impl:
305             self.peer_proto_impl.wait()
306
307     def sync_trace(self, local_dir, whichtrace, tracemap = None):
308         if self.peer_proto_impl:
309             return self.peer_proto_impl.sync_trace(local_dir, whichtrace,
310                     tracemap)
311         else:
312             return None
313
314     def remote_trace_path(self, whichtrace, tracemap = None):
315         if self.peer_proto_impl:
316             return self.peer_proto_impl.remote_trace_path(whichtrace, tracemap)
317         else:
318             return None
319
320     def remote_trace_name(self, whichtrace):
321         return whichtrace
322
323 class TapIface(TunIface):
324     _PROTO_MAP = tunproto.TAP_PROTO_MAP
325     _KIND = 'TAP'
326
327 # Yep, it does nothing - yet
328 class Internet(object):
329     def __init__(self, api=None):
330         if not api:
331             api = plcapi.PLCAPI()
332         self._api = api
333
334 class NetPipe(object):
335     def __init__(self, api=None):
336         if not api:
337             api = plcapi.PLCAPI()
338         self._api = api
339
340         # Attributes
341         self.mode = None
342         self.addrList = None
343         self.portList = None
344         
345         self.plrIn = None
346         self.bwIn = None
347         self.delayIn = None
348
349         self.plrOut = None
350         self.bwOut = None
351         self.delayOut = None
352         
353         # These get initialized when the pipe is connected to its node
354         self.node = None
355         self.configured = False
356     
357     def validate(self):
358         if not self.mode:
359             raise RuntimeError, "Undefined NetPipe mode"
360         if not self.portList:
361             raise RuntimeError, "Undefined NetPipe port list - must always define the scope"
362         if not (self.plrIn or self.bwIn or self.delayIn):
363             raise RuntimeError, "Undefined NetPipe inbound characteristics"
364         if not (self.plrOut or self.bwOut or self.delayOut):
365             raise RuntimeError, "Undefined NetPipe outbound characteristics"
366         if not self.node:
367             raise RuntimeError, "Unconnected NetPipe"
368     
369     def _add_pipedef(self, bw, plr, delay, options):
370         if delay:
371             options.extend(("delay","%dms" % (delay,)))
372         if bw:
373             options.extend(("bw","%.8fMbit/s" % (bw,)))
374         if plr:
375             options.extend(("plr","%.8f" % (plr,)))
376     
377     def _get_ruledef(self):
378         scope = "%s%s%s" % (
379             self.portList,
380             "@" if self.addrList else "",
381             self.addrList or "",
382         )
383         
384         options = []
385         if self.bwIn or self.plrIn or self.delayIn:
386             options.append("IN")
387             self._add_pipedef(self.bwIn, self.plrIn, self.delayIn, options)
388         if self.bwOut or self.plrOut or self.delayOut:
389             options.append("OUT")
390             self._add_pipedef(self.bwOut, self.plrOut, self.delayOut, options)
391         options = ' '.join(options)
392         
393         return (scope,options)
394     
395     def recover(self):
396         # Rules are safe on their nodes
397         self.configured = True
398
399     def configure(self):
400         # set up rule
401         scope, options = self._get_ruledef()
402         command = "sudo -S netconfig config %s %s %s" % (self.mode, scope, options)
403         
404         (out,err),proc = server.popen_ssh_command(
405             command,
406             host = self.node.hostname,
407             port = None,
408             user = self.node.slicename,
409             agent = None,
410             ident_key = self.node.ident_path,
411             server_key = self.node.server_key
412             )
413     
414         if proc.wait():
415             raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
416         
417         # we have to clean up afterwards
418         self.configured = True
419     
420     def refresh(self):
421         if self.configured:
422             # refresh rule
423             scope, options = self._get_ruledef()
424             command = "sudo -S netconfig refresh %s %s %s" % (self.mode, scope, options)
425             
426             (out,err),proc = server.popen_ssh_command(
427                 command,
428                 host = self.node.hostname,
429                 port = None,
430                 user = self.node.slicename,
431                 agent = None,
432                 ident_key = self.node.ident_path,
433                 server_key = self.node.server_key
434                 )
435         
436             if proc.wait():
437                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
438     
439     def cleanup(self):
440         if self.configured:
441             # remove rule
442             scope, options = self._get_ruledef()
443             command = "sudo -S netconfig delete %s %s" % (self.mode, scope)
444             
445             (out,err),proc = server.popen_ssh_command(
446                 command,
447                 host = self.node.hostname,
448                 port = None,
449                 user = self.node.slicename,
450                 agent = None,
451                 ident_key = self.node.ident_path,
452                 server_key = self.node.server_key
453                 )
454         
455             if proc.wait():
456                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
457             
458             self.configured = False
459     
460     def sync_trace(self, local_dir, whichtrace):
461         if whichtrace != 'netpipeStats':
462             raise ValueError, "Unsupported trace %s" % (whichtrace,)
463         
464         local_path = os.path.join(local_dir, "netpipe_stats_%s" % (self.mode,))
465         
466         # create parent local folders
467         proc = subprocess.Popen(
468             ["mkdir", "-p", os.path.dirname(local_path)],
469             stdout = open("/dev/null","w"),
470             stdin = open("/dev/null","r"))
471
472         if proc.wait():
473             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
474         
475         (out,err),proc = server.popen_ssh_command(
476             "echo 'Rules:' ; sudo -S netconfig show rules ; echo 'Pipes:' ; sudo -S netconfig show pipes",
477             host = self.node.hostname,
478             port = None,
479             user = self.node.slicename,
480             agent = None,
481             ident_key = self.node.ident_path,
482             server_key = self.node.server_key
483             )
484         
485         if proc.wait():
486             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
487         
488         # dump results to file
489         f = open(local_path, "wb")
490         f.write(err or "")
491         f.write(out or "")
492         f.close()
493         
494         return local_path
495     
496 class TunFilter(object):
497     _TRACEMAP = {
498         # tracename : (remotename, localname)
499     }
500     
501     def __init__(self, api=None):
502         if not api:
503             api = plcapi.PLCAPI()
504         self._api = api
505         
506         # Attributes
507         self.module = None
508         self.args = None
509
510         # These get initialised when the filter is connected
511         self.peer_guid = None
512         self.peer_proto = None
513         self.iface_guid = None
514         self.peer = None
515         self.iface = None
516     
517     def _get(what, self):
518         wref = self.iface
519         if wref:
520             wref = wref()
521         if wref:
522             return getattr(wref, what)
523         else:
524             return None
525
526     def _set(what, self, val):
527         wref = self.iface
528         if wref:
529             wref = wref()
530         if wref:
531             setattr(wref, what, val)
532     
533     tun_proto = property(
534         functools.partial(_get, 'tun_proto'),
535         functools.partial(_set, 'tun_proto') )
536     tun_addr = property(
537         functools.partial(_get, 'tun_addr'),
538         functools.partial(_set, 'tun_addr') )
539     tun_port = property(
540         functools.partial(_get, 'tun_port'),
541         functools.partial(_set, 'tun_port') )
542     tun_key = property(
543         functools.partial(_get, 'tun_key'),
544         functools.partial(_set, 'tun_key') )
545     tun_cipher = property(
546         functools.partial(_get, 'tun_cipher'),
547         functools.partial(_set, 'tun_cipher') )
548     
549     del _get
550     del _set
551
552     def remote_trace_path(self, whichtrace):
553         iface = self.iface()
554         if iface is not None:
555             return iface.remote_trace_path(whichtrace, self._TRACEMAP)
556         return None
557
558     def remote_trace_name(self, whichtrace):
559         iface = self.iface()
560         if iface is not None:
561             return iface.remote_trace_name(whichtrace, self._TRACEMAP)
562         return None
563
564     def sync_trace(self, local_dir, whichtrace):
565         iface = self.iface()
566         if iface is not None:
567             return iface.sync_trace(local_dir, whichtrace, self._TRACEMAP)
568         return None
569
570 class ClassQueueFilter(TunFilter):
571     _TRACEMAP = {
572         # tracename : (remotename, localname)
573         'dropped_stats' : ('dropped_stats', 'dropped_stats')
574     }
575     
576     def __init__(self, api=None):
577         super(ClassQueueFilter, self).__init__(api)
578         # Attributes
579         self.module = "classqueue.py"
580
581 class LoggingClassQueueFilter(ClassQueueFilter):
582     _TRACEMAP = ClassQueueFilter._TRACEMAP.copy()
583     _TRACEMAP.update({
584         # tracename : (remotename, localname)
585         'queue_stats_f'   : ('queue_stats_f', 'queue_stats_f'),
586         'queue_stats_b'   : ('queue_stats_b', 'queue_stats_b'),
587     })
588     
589     def __init__(self, api=None):
590         super(LoggingClassQueueFilter, self).__init__(api)
591         # Attributes
592         self.module = "loggingclassqueue.py classqueue.py"
593         
594     def _args_get(self):
595         # Inject outpath
596         args = dict(filter(lambda x:len(x)>1, map(lambda x:x.split('=',1),(self._args or "").split(','))))
597         args["outpath"] = "queue_stats"
598         return ",".join(map("=".join, args.iteritems()))
599     def _args_set(self, value):
600         self._args = value
601     args = property(_args_get, _args_set)
602
603 class ToSQueueFilter(TunFilter):
604     def __init__(self, api=None):
605         super(ToSQueueFilter, self).__init__(api)
606         # Attributes
607         self.module = "tosqueue.py"
608