Fix metadata breakage from recent commit
[nepi.git] / src / nepi / util / tunchannel_impl.py
1 import os
2 import sys
3 import random
4 import threading
5 import socket
6 import select
7 import weakref
8 import time
9
10 from tunchannel import tun_fwd, udp_establish, tcp_establish
11
12 class TunChannel(object):
13     """
14     Helper box class that implements most of the required boilerplate
15     for tunnelling cross connections.
16     
17     The class implements a threaded forwarder that runs in the
18     testbed controller process. It takes several parameters that
19     can be given by directly setting attributes:
20     
21         tun_port/addr/proto/cipher: information about the local endpoint.
22             The addresses here should be externally-reachable,
23             since when listening or when using the UDP protocol,
24             connections to this address/port will be attempted
25             by remote endpoitns.
26         
27         peer_port/addr/proto/cipher: information about the remote endpoint.
28             Usually, you set these when the cross connection 
29             initializer/completion functions are invoked (both).
30         
31         tun_key: the agreed upon encryption key.
32         
33         with_pi: set if the incoming packet stream (see tun_socket)
34             contains PI headers - if so, they will be stripped.
35         
36         ethernet_mode: set if the incoming packet stream is
37             composed of ethernet frames (as opposed of IP packets).
38         
39         tun_socket: a socket or file object that can be read
40             from and written to. Packets will be read when available,
41             remote packets will be forwarded as writes.
42             A socket should be of type SOCK_SEQPACKET (or SOCK_DGRAM
43             if not possible), a file object should preserve packet
44             boundaries (ie, a pipe or TUN/TAP device file descriptor).
45         
46         trace_target: a file object where trace output will be sent.
47             It cannot be changed after launch.
48             By default, it's sys.stderr
49     """
50     
51     def __init__(self):
52         # Some operational attributes
53         self.ethernet_mode = True
54         self.with_pi = False
55         
56         # These get initialized when the channel is configured
57         # They're part of the TUN standard attribute set
58         self.tun_port = None
59         self.tun_addr = None
60         self.tun_cipher = 'AES'
61         
62         # These get initialized when the channel is connected to its peer
63         self.peer_proto = None
64         self.peer_addr = None
65         self.peer_port = None
66         self.peer_cipher = None
67         
68         # These get initialized when the channel is connected to its iface
69         self.tun_socket = None
70
71         # same as peer proto, but for execute-time standard attribute lookups
72         self.tun_proto = None 
73         
74         # some state
75         self.prepared = False
76         self._terminate = [] # terminate signaller
77         self._exc = [] # exception store, to relay exceptions from the forwarder thread
78         self._connected = threading.Event()
79         self._forwarder_thread = None
80        
81         # trace to stderr
82         self.stderr = sys.stderr
83         
84         # Generate an initial random cryptographic key to use for tunnelling
85         # Upon connection, both endpoints will agree on a common one based on
86         # this one.
87         self.tun_key = ( ''.join(map(chr, [ 
88                     r.getrandbits(8) 
89                     for i in xrange(32) 
90                     for r in (random.SystemRandom(),) ])
91                 ).encode("base64").strip() )        
92         
93
94     def __str__(self):
95         return "%s<%s %s:%s %s %s:%s %s>" % (
96             self.__class__.__name__,
97             self.tun_proto, 
98             self.tun_addr, self.tun_port,
99             self.peer_proto, 
100             self.peer_addr, self.peer_port,
101             self.tun_cipher,
102         )
103
104     def launch(self):
105         # self.tun_proto is only set if the channel is connected
106         # launch has to be a no-op in unconnected channels because
107         # it is called at configuration time, which for cross connections
108         # happens before connection.
109         if self.tun_proto:
110             if not self._forwarder_thread:
111                 self._launch()
112     
113     def cleanup(self):
114         if self._forwarder_thread:
115             self.kill()
116
117     def wait(self):
118         if self._forwarder_thread:
119             self._connected.wait()
120             for exc in self._exc:
121                 # Relay exception
122                 eTyp, eVal, eLoc = exc
123                 raise eTyp, eVal, eLoc
124
125     def kill(self):    
126         if self._forwarder_thread:
127             if not self._terminate:
128                 self._terminate.append(None)
129             self._forwarder_thread.join()
130
131     def _launch(self):
132         # Launch forwarder thread with a weak reference
133         # to self, so that we don't create any strong cycles
134         # and automatic refcounting works as expected
135         self._forwarder_thread = threading.Thread(
136             target = self._forwarder,
137             args = (weakref.ref(self),) )
138         self._forwarder_thread.start()
139
140     @staticmethod
141     def _forwarder(weak_self):
142         try:
143             weak_self().__forwarder(weak_self)
144         except:
145             self = weak_self()
146             
147             # store exception and wake up anyone waiting
148             self._exc.append(sys.exc_info())
149             self._connected.set()
150     
151     @staticmethod
152     def __forwarder(weak_self):
153         # grab strong reference
154         self = weak_self()
155         if not self:
156             return
157         
158         peer_port = self.peer_port
159         peer_addr = self.peer_addr
160         peer_proto= self.peer_proto
161         peer_cipher=self.peer_cipher
162
163         local_port = self.tun_port
164         local_addr = self.tun_addr
165         local_proto = self.tun_proto
166         local_cipher= self.tun_cipher
167         
168         stderr = self.stderr
169         ether_mode = self.ethernet_mode
170         with_pi = self.with_pi
171         
172         if local_proto != peer_proto:
173             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
174
175         if local_cipher != peer_cipher:
176             raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
177         
178         if not peer_port or not peer_addr:
179             raise RuntimeError, "Misconfigured peer for: %s" % (self,)
180
181         if not local_port or not local_addr:
182             raise RuntimeError, "Misconfigured TUN: %s" % (self,)
183         
184         TERMINATE = self._terminate
185         cipher_key = self.tun_key
186         tun = self.tun_socket
187         udp = local_proto == 'udp'
188
189         if not tun:
190             raise RuntimeError, "Unconnected TUN channel %s" % (self,)
191
192         if local_proto == 'udp':
193             rsock = udp_establish(TERMINATE, local_addr, local_port, 
194                     peer_addr, peer_port)
195             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
196         elif local_proto == 'tcp':
197             rsock = tcp_establish(TERMINATE, local_addr, local_port,
198                     peer_addr, peer_port)
199             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
200         else:
201             raise RuntimeError, "Bad protocol for %s: %r" % (self,local_proto)
202
203         # notify that we're ready
204         self._connected.set()
205         
206         # drop strong reference
207         del self
208         
209         tun_fwd(tun, remote,
210             with_pi = with_pi, 
211             ether_mode = ether_mode, 
212             cipher_key = cipher_key, 
213             udp = udp, 
214             TERMINATE = TERMINATE,
215             stderr = stderr,
216             cipher = local_cipher
217         )
218         
219         tun.close()
220         remote.close()
221
222
223 def create_tunchannel(testbed_instance, guid, devnull = []):
224     """
225     TunChannel factory for metadata.
226     By default, silences traceing.
227     
228     You can override the created element's attributes if you will.
229     """
230     if not devnull:
231         # just so it's not open if not needed
232         devnull.append(open("/dev/null","w"))
233     element = TunChannel()
234     element.stderr = devnull[0] # silence tracing
235     testbed_instance._elements[guid] = element
236
237 def preconfigure_tunchannel(testbed_instance, guid):
238     """
239     TunChannel preconfiguration.
240     
241     It initiates the forwarder thread for listening tcp channels.
242     
243     Takes the public address from the operating system, so it should be adequate
244     for most situations when the TunChannel forwarder thread runs in the same
245     process as the testbed controller.
246     """
247     element = testbed_instance._elements[guid]
248     
249     # Find external interface, if any
250     public_addr = os.popen(
251         "/sbin/ifconfig "
252         "| grep $(ip route | grep default | awk '{print $3}' "
253                 "| awk -F. '{print $1\"[.]\"$2}') "
254         "| head -1 | awk '{print $2}' "
255         "| awk -F : '{print $2}'").read().rstrip()
256     element.tun_addr = public_addr
257
258     # Set standard TUN attributes
259     if not element.tun_port and element.tun_addr:
260         element.tun_port = 15000 + int(guid)
261
262 def postconfigure_tunchannel(testbed_instance, guid):
263     """
264     TunChannel preconfiguration.
265     
266     Initiates the forwarder thread for connecting tcp channels or 
267     udp channels in general.
268     
269     Should be adequate for most implementations.
270     """
271     element = testbed_instance._elements[guid]
272    
273     element.launch()
274
275 def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data,
276         preconfigure_tunchannel = preconfigure_tunchannel):
277     """
278     Cross-connection initialization.
279     Should be adequate for most implementations.
280     
281     For use in metadata, bind the first "proto" argument with the connector type. Eg:
282     
283         conn_init = functools.partial(crossconnect_tunchannel_peer_init, "tcp")
284     
285     If you don't use the stock preconfigure function, specify your own as a keyword argument.
286     """
287     tun = testbed_instance._elements[tun_guid]
288     tun.peer_addr = peer_data.get("tun_addr")
289     tun.peer_proto = peer_data.get("tun_proto") or proto
290     tun.peer_port = peer_data.get("tun_port")
291     tun.peer_cipher = peer_data.get("tun_cipher")
292     tun.tun_key = min(tun.tun_key, peer_data.get("tun_key"))
293     tun.tun_proto = proto
294   
295     preconfigure_tunchannel(testbed_instance, tun_guid)
296
297 def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data,
298         postconfigure_tunchannel = postconfigure_tunchannel):
299     """
300     Cross-connection completion.
301     Should be adequeate for most implementations.
302     
303     For use in metadata, bind the first "proto" argument with the connector type. Eg:
304     
305         conn_init = functools.partial(crossconnect_tunchannel_peer_compl, "tcp")
306     
307     If you don't use the stock postconfigure function, specify your own as a keyword argument.
308     """
309     # refresh (refreshable) attributes for second-phase
310     tun = testbed_instance._elements[tun_guid]
311     tun.peer_addr = peer_data.get("tun_addr")
312     tun.peer_proto = peer_data.get("tun_proto") or proto
313     tun.peer_port = peer_data.get("tun_port")
314     tun.peer_cipher = peer_data.get("tun_cipher")
315    
316     postconfigure_tunchannel(testbed_instance, tun_guid)
317
318 def prestart_tunchannel(testbed_instance, guid):
319     """
320     Wait for the channel forwarder to be up and running.
321     
322     Useful as a pre-start function to assure proper startup synchronization,
323     be certain to start TunChannels before applications that might require them.
324     """
325     element = testbed_instance.elements[guid]
326     element.wait()
327