Multicast forwarding KINDA working
[nepi.git] / src / nepi / testbeds / planetlab / interfaces.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import nepi.util.ipaddr2 as ipaddr2
6 import nepi.util.server as server
7 import plcapi
8 import subprocess
9 import os
10 import os.path
11 import random
12 import ipaddr
13 import functools
14
15 import tunproto
16
17 class NodeIface(object):
18     def __init__(self, api=None):
19         if not api:
20             api = plcapi.PLCAPI()
21         self._api = api
22         
23         # Attributes
24         self.primary = True
25
26         # These get initialized at configuration time
27         self.address = None
28         self.lladdr = None
29         self.netprefix = None
30         self.netmask = None
31         self.broadcast = True
32         self._interface_id = None
33
34         # These get initialized when the iface is connected to its node
35         self.node = None
36
37         # These get initialized when the iface is connected to the internet
38         self.has_internet = False
39
40     def __str__(self):
41         return "%s<ip:%s/%s up mac:%s>" % (
42             self.__class__.__name__,
43             self.address, self.netmask,
44             self.lladdr,
45         )
46     
47     __repr__ = __str__
48
49     def add_address(self, address, netprefix, broadcast):
50         raise RuntimeError, "Cannot add explicit addresses to public interface"
51     
52     def pick_iface(self, siblings):
53         """
54         Picks an interface using the PLCAPI to query information about the node.
55         
56         Needs an assigned node.
57         
58         Params:
59             siblings: other NodeIface elements attached to the same node
60         """
61         
62         if self.node is None or self.node._node_id is None:
63             raise RuntimeError, "Cannot pick interface without an assigned node"
64         
65         avail = self._api.GetInterfaces(
66             node_id=self.node._node_id, 
67             is_primary=self.primary,
68             fields=('interface_id','mac','netmask','ip') )
69         
70         used = set([sibling._interface_id for sibling in siblings
71                     if sibling._interface_id is not None])
72         
73         for candidate in avail:
74             candidate_id = candidate['interface_id']
75             if candidate_id not in used:
76                 # pick it!
77                 self._interface_id = candidate_id
78                 self.address = candidate['ip']
79                 self.lladdr = candidate['mac']
80                 self.netprefix = candidate['netmask']
81                 self.netmask = ipaddr2.ipv4_dot2mask(self.netprefix) if self.netprefix else None
82                 return
83         else:
84             raise RuntimeError, "Cannot configure interface: cannot find suitable interface in PlanetLab node"
85
86     def validate(self):
87         if not self.has_internet:
88             raise RuntimeError, "All external interface devices must be connected to the Internet"
89     
90
91 class _CrossIface(object):
92     def __init__(self, proto, addr, port, cipher):
93         self.tun_proto = proto
94         self.tun_addr = addr
95         self.tun_port = port
96         self.tun_cipher = cipher
97         
98         # Cannot access cross peers
99         self.peer_proto_impl = None
100     
101     def __str__(self):
102         return "%s%r" % (
103             self.__class__.__name__,
104             ( self.tun_proto,
105               self.tun_addr,
106               self.tun_port,
107               self.tun_cipher ) 
108         )
109     
110     __repr__ = __str__
111
112 class TunIface(object):
113     _PROTO_MAP = tunproto.TUN_PROTO_MAP
114     _KIND = 'TUN'
115
116     def __init__(self, api=None):
117         if not api:
118             api = plcapi.PLCAPI()
119         self._api = api
120         
121         # Attributes
122         self.address = None
123         self.netprefix = None
124         self.netmask = None
125         
126         self.up = None
127         self.mtu = None
128         self.snat = False
129         self.txqueuelen = None
130         self.pointopoint = None
131         self.multicast = False
132         self.bwlimit = None
133         
134         # Enabled traces
135         self.capture = False
136
137         # These get initialized when the iface is connected to its node
138         self.node = None
139         
140         # These get initialized when the iface is connected to any filter
141         self.filter_module = None
142         self.multicast_forwarder = None
143         
144         # These get initialized when the iface is configured
145         self.external_iface = None
146         
147         # These get initialized when the iface is configured
148         # They're part of the TUN standard attribute set
149         self.tun_port = None
150         self.tun_addr = None
151         self.tun_cipher = "AES"
152         
153         # These get initialized when the iface is connected to its peer
154         self.peer_iface = None
155         self.peer_proto = None
156         self.peer_addr = None
157         self.peer_port = None
158         self.peer_proto_impl = None
159         self._delay_recover = False
160
161         # same as peer proto, but for execute-time standard attribute lookups
162         self.tun_proto = None 
163         
164         
165         # Generate an initial random cryptographic key to use for tunnelling
166         # Upon connection, both endpoints will agree on a common one based on
167         # this one.
168         self.tun_key = ( ''.join(map(chr, [ 
169                     r.getrandbits(8) 
170                     for i in xrange(32) 
171                     for r in (random.SystemRandom(),) ])
172                 ).encode("base64").strip() )        
173         
174
175     def __str__(self):
176         return "%s<ip:%s/%s %s%s%s>" % (
177             self.__class__.__name__,
178             self.address, self.netprefix,
179             " up" if self.up else " down",
180             " snat" if self.snat else "",
181             (" p2p %s" % (self.pointopoint,)) if self.pointopoint else "",
182         )
183     
184     __repr__ = __str__
185     
186     @property
187     def if_name(self):
188         if self.peer_proto_impl:
189             return self.peer_proto_impl.if_name
190
191     def routes_here(self, route):
192         """
193         Returns True if the route should be attached to this interface
194         (ie, it references a gateway in this interface's network segment)
195         """
196         if self.address and self.netprefix:
197             addr, prefix = self.address, self.netprefix
198             pointopoint = self.pointopoint
199             if not pointopoint:
200                 pointopoint = self.peer_iface.address
201             
202             if pointopoint:
203                 prefix = 32
204                 
205             dest, destprefix, nexthop, metric = route
206             
207             myNet = ipaddr.IPNetwork("%s/%d" % (addr, prefix))
208             gwIp = ipaddr.IPNetwork(nexthop)
209             
210             if pointopoint:
211                 peerIp = ipaddr.IPNetwork(pointopoint)
212                 
213                 if gwIp == peerIp:
214                     return True
215             else:
216                 if gwIp in myNet:
217                     return True
218         return False
219     
220     def add_address(self, address, netprefix, broadcast):
221         if (self.address or self.netprefix or self.netmask) is not None:
222             raise RuntimeError, "Cannot add more than one address to %s interfaces" % (self._KIND,)
223         if broadcast:
224             raise ValueError, "%s interfaces cannot broadcast in PlanetLab (%s)" % (self._KIND,broadcast)
225         
226         self.address = address
227         self.netprefix = netprefix
228         self.netmask = ipaddr2.ipv4_mask2dot(netprefix)
229     
230     def validate(self):
231         if not self.node:
232             raise RuntimeError, "Unconnected %s iface - missing node" % (self._KIND,)
233         if self.peer_iface and self.peer_proto not in self._PROTO_MAP:
234             raise RuntimeError, "Unsupported tunnelling protocol: %s" % (self.peer_proto,)
235         if not self.address or not self.netprefix or not self.netmask:
236             raise RuntimeError, "Misconfigured %s iface - missing address" % (self._KIND,)
237         if self.filter_module and self.peer_proto not in ('udp','tcp',None):
238             raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (self,)
239         if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None):
240             raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,)
241     
242     def _impl_instance(self, home_path, listening):
243         impl = self._PROTO_MAP[self.peer_proto](
244             self, self.peer_iface, home_path, self.tun_key, listening)
245         impl.port = self.tun_port
246         impl.cross_slice = not self.peer_iface or isinstance(self.peer_iface, _CrossIface)
247         return impl
248     
249     def recover(self):
250         if self.peer_proto:
251             self.peer_proto_impl = self._impl_instance(
252                 self._home_path,
253                 False) # no way to know, no need to know
254             self.peer_proto_impl.recover()
255         else:
256             self._delay_recover = True
257     
258     def prepare(self, home_path, listening):
259         if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
260             # Ad-hoc peer_iface
261             self.peer_iface = _CrossIface(
262                 self.peer_proto,
263                 self.peer_addr,
264                 self.peer_port,
265                 self.peer_cipher)
266         if self.peer_iface:
267             if not self.peer_proto_impl:
268                 self.peer_proto_impl = self._impl_instance(home_path, listening)
269             if self._delay_recover:
270                 self.peer_proto_impl.recover()
271             else:
272                 self.peer_proto_impl.prepare()
273     
274     def setup(self):
275         if self.peer_proto_impl:
276             self.peer_proto_impl.setup()
277     
278     def cleanup(self):
279         if self.peer_proto_impl:
280             self.peer_proto_impl.shutdown()
281
282     def destroy(self):
283         if self.peer_proto_impl:
284             self.peer_proto_impl.destroy()
285             self.peer_proto_impl = None
286
287     def async_launch_wait(self):
288         if self.peer_proto_impl:
289             self.peer_proto_impl.async_launch_wait()
290
291     def sync_trace(self, local_dir, whichtrace, tracemap = None):
292         if self.peer_proto_impl:
293             return self.peer_proto_impl.sync_trace(local_dir, whichtrace,
294                     tracemap)
295         else:
296             return None
297
298     def remote_trace_path(self, whichtrace, tracemap = None):
299         if self.peer_proto_impl:
300             return self.peer_proto_impl.remote_trace_path(whichtrace, tracemap)
301         else:
302             return None
303
304     def remote_trace_name(self, whichtrace):
305         return whichtrace
306
307 class TapIface(TunIface):
308     _PROTO_MAP = tunproto.TAP_PROTO_MAP
309     _KIND = 'TAP'
310
311 # Yep, it does nothing - yet
312 class Internet(object):
313     def __init__(self, api=None):
314         if not api:
315             api = plcapi.PLCAPI()
316         self._api = api
317
318 class NetPipe(object):
319     def __init__(self, api=None):
320         if not api:
321             api = plcapi.PLCAPI()
322         self._api = api
323
324         # Attributes
325         self.mode = None
326         self.addrList = None
327         self.portList = None
328         
329         self.plrIn = None
330         self.bwIn = None
331         self.delayIn = None
332
333         self.plrOut = None
334         self.bwOut = None
335         self.delayOut = None
336         
337         # These get initialized when the pipe is connected to its node
338         self.node = None
339         self.configured = False
340     
341     def validate(self):
342         if not self.mode:
343             raise RuntimeError, "Undefined NetPipe mode"
344         if not self.portList:
345             raise RuntimeError, "Undefined NetPipe port list - must always define the scope"
346         if not (self.plrIn or self.bwIn or self.delayIn):
347             raise RuntimeError, "Undefined NetPipe inbound characteristics"
348         if not (self.plrOut or self.bwOut or self.delayOut):
349             raise RuntimeError, "Undefined NetPipe outbound characteristics"
350         if not self.node:
351             raise RuntimeError, "Unconnected NetPipe"
352     
353     def _add_pipedef(self, bw, plr, delay, options):
354         if delay:
355             options.extend(("delay","%dms" % (delay,)))
356         if bw:
357             options.extend(("bw","%.8fMbit/s" % (bw,)))
358         if plr:
359             options.extend(("plr","%.8f" % (plr,)))
360     
361     def _get_ruledef(self):
362         scope = "%s%s%s" % (
363             self.portList,
364             "@" if self.addrList else "",
365             self.addrList or "",
366         )
367         
368         options = []
369         if self.bwIn or self.plrIn or self.delayIn:
370             options.append("IN")
371             self._add_pipedef(self.bwIn, self.plrIn, self.delayIn, options)
372         if self.bwOut or self.plrOut or self.delayOut:
373             options.append("OUT")
374             self._add_pipedef(self.bwOut, self.plrOut, self.delayOut, options)
375         options = ' '.join(options)
376         
377         return (scope,options)
378     
379     def recover(self):
380         # Rules are safe on their nodes
381         self.configured = True
382
383     def configure(self):
384         # set up rule
385         scope, options = self._get_ruledef()
386         command = "sudo -S netconfig config %s %s %s" % (self.mode, scope, options)
387         
388         (out,err),proc = server.popen_ssh_command(
389             command,
390             host = self.node.hostname,
391             port = None,
392             user = self.node.slicename,
393             agent = None,
394             ident_key = self.node.ident_path,
395             server_key = self.node.server_key
396             )
397     
398         if proc.wait():
399             raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
400         
401         # we have to clean up afterwards
402         self.configured = True
403     
404     def refresh(self):
405         if self.configured:
406             # refresh rule
407             scope, options = self._get_ruledef()
408             command = "sudo -S netconfig refresh %s %s %s" % (self.mode, scope, options)
409             
410             (out,err),proc = server.popen_ssh_command(
411                 command,
412                 host = self.node.hostname,
413                 port = None,
414                 user = self.node.slicename,
415                 agent = None,
416                 ident_key = self.node.ident_path,
417                 server_key = self.node.server_key
418                 )
419         
420             if proc.wait():
421                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
422     
423     def cleanup(self):
424         if self.configured:
425             # remove rule
426             scope, options = self._get_ruledef()
427             command = "sudo -S netconfig delete %s %s" % (self.mode, scope)
428             
429             (out,err),proc = server.popen_ssh_command(
430                 command,
431                 host = self.node.hostname,
432                 port = None,
433                 user = self.node.slicename,
434                 agent = None,
435                 ident_key = self.node.ident_path,
436                 server_key = self.node.server_key
437                 )
438         
439             if proc.wait():
440                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
441             
442             self.configured = False
443     
444     def sync_trace(self, local_dir, whichtrace):
445         if whichtrace != 'netpipeStats':
446             raise ValueError, "Unsupported trace %s" % (whichtrace,)
447         
448         local_path = os.path.join(local_dir, "netpipe_stats_%s" % (self.mode,))
449         
450         # create parent local folders
451         proc = subprocess.Popen(
452             ["mkdir", "-p", os.path.dirname(local_path)],
453             stdout = open("/dev/null","w"),
454             stdin = open("/dev/null","r"))
455
456         if proc.wait():
457             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
458         
459         (out,err),proc = server.popen_ssh_command(
460             "echo 'Rules:' ; sudo -S netconfig show rules ; echo 'Pipes:' ; sudo -S netconfig show pipes",
461             host = self.node.hostname,
462             port = None,
463             user = self.node.slicename,
464             agent = None,
465             ident_key = self.node.ident_path,
466             server_key = self.node.server_key
467             )
468         
469         if proc.wait():
470             raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
471         
472         # dump results to file
473         f = open(local_path, "wb")
474         f.write(err or "")
475         f.write(out or "")
476         f.close()
477         
478         return local_path
479     
480 class TunFilter(object):
481     _TRACEMAP = {
482         # tracename : (remotename, localname)
483     }
484     
485     def __init__(self, api=None):
486         if not api:
487             api = plcapi.PLCAPI()
488         self._api = api
489         
490         # Attributes
491         self.module = None
492         self.args = None
493
494         # These get initialised when the filter is connected
495         self.peer_guid = None
496         self.peer_proto = None
497         self.iface_guid = None
498         self.peer = None
499         self.iface = None
500     
501     def _get(what, self):
502         wref = self.iface
503         if wref:
504             wref = wref()
505         if wref:
506             return getattr(wref, what)
507         else:
508             return None
509
510     def _set(what, self, val):
511         wref = self.iface
512         if wref:
513             wref = wref()
514         if wref:
515             setattr(wref, what, val)
516     
517     tun_proto = property(
518         functools.partial(_get, 'tun_proto'),
519         functools.partial(_set, 'tun_proto') )
520     tun_addr = property(
521         functools.partial(_get, 'tun_addr'),
522         functools.partial(_set, 'tun_addr') )
523     tun_port = property(
524         functools.partial(_get, 'tun_port'),
525         functools.partial(_set, 'tun_port') )
526     tun_key = property(
527         functools.partial(_get, 'tun_key'),
528         functools.partial(_set, 'tun_key') )
529     tun_cipher = property(
530         functools.partial(_get, 'tun_cipher'),
531         functools.partial(_set, 'tun_cipher') )
532     
533     del _get
534     del _set
535
536     def remote_trace_path(self, whichtrace):
537         iface = self.iface()
538         if iface is not None:
539             return iface.remote_trace_path(whichtrace, self._TRACEMAP)
540         return None
541
542     def remote_trace_name(self, whichtrace):
543         iface = self.iface()
544         if iface is not None:
545             return iface.remote_trace_name(whichtrace, self._TRACEMAP)
546         return None
547
548     def sync_trace(self, local_dir, whichtrace):
549         iface = self.iface()
550         if iface is not None:
551             return iface.sync_trace(local_dir, whichtrace, self._TRACEMAP)
552         return None
553
554 class ClassQueueFilter(TunFilter):
555     _TRACEMAP = {
556         # tracename : (remotename, localname)
557         'dropped_stats' : ('dropped_stats', 'dropped_stats')
558     }
559     
560     def __init__(self, api=None):
561         super(ClassQueueFilter, self).__init__(api)
562         # Attributes
563         self.module = "classqueue.py"
564
565 class ToSQueueFilter(TunFilter):
566     def __init__(self, api=None):
567         super(ToSQueueFilter, self).__init__(api)
568         # Attributes
569         self.module = "tosqueue.py"
570