Fix typo
[nepi.git] / src / nepi / util / tunchannel_impl.py
1 import os
2 import sys
3 import random
4 import threading
5 import socket
6 import select
7 import weakref
8 import time
9
10 from tunchannel import tun_fwd
11
12 class TunChannel(object):
13     """
14     Helper box class that implements most of the required boilerplate
15     for tunnelling cross connections.
16     
17     The class implements a threaded forwarder that runs in the
18     testbed controller process. It takes several parameters that
19     can be given by directly setting attributes:
20     
21         tun_port/addr/proto: information about the local endpoint.
22             The addresses here should be externally-reachable,
23             since when listening or when using the UDP protocol,
24             connections to this address/port will be attempted
25             by remote endpoitns.
26         
27         peer_port/addr/proto: information about the remote endpoint.
28             Usually, you set these when the cross connection 
29             initializer/completion functions are invoked (both).
30         
31         tun_key: the agreed upon encryption key.
32         
33         listen: if set to True (and in TCP mode), it marks a
34             listening endpoint. Be certain that any TCP connection
35             is made between a listening and a non-listening
36             endpoint, or it won't work.
37         
38         with_pi: set if the incoming packet stream (see tun_socket)
39             contains PI headers - if so, they will be stripped.
40         
41         ethernet_mode: set if the incoming packet stream is
42             composed of ethernet frames (as opposed of IP packets).
43         
44         udp: set to use UDP datagrams instead of TCP connections.
45         
46         tun_socket: a socket or file object that can be read
47             from and written to. Packets will be read when available,
48             remote packets will be forwarded as writes.
49             A socket should be of type SOCK_SEQPACKET (or SOCK_DGRAM
50             if not possible), a file object should preserve packet
51             boundaries (ie, a pipe or TUN/TAP device file descriptor).
52         
53         trace_target: a file object where trace output will be sent.
54             It cannot be changed after launch.
55             By default, it's sys.stderr
56     """
57     
58     def __init__(self):
59         # Some operational attributes
60         self.listen = False
61         self.ethernet_mode = True
62         self.with_pi = False
63         
64         # These get initialized when the channel is configured
65         # They're part of the TUN standard attribute set
66         self.tun_port = None
67         self.tun_addr = None
68         
69         # These get initialized when the channel is connected to its peer
70         self.peer_proto = None
71         self.peer_addr = None
72         self.peer_port = None
73         
74         # These get initialized when the channel is connected to its iface
75         self.tun_socket = None
76
77         # same as peer proto, but for execute-time standard attribute lookups
78         self.tun_proto = None 
79         
80         # some state
81         self.prepared = False
82         self._terminate = [] # terminate signaller
83         self._exc = [] # exception store, to relay exceptions from the forwarder thread
84         self._connected = threading.Event()
85         self._forwarder_thread = None
86         
87         # trace to stderr
88         self.stderr = sys.stderr
89         
90         # Generate an initial random cryptographic key to use for tunnelling
91         # Upon connection, both endpoints will agree on a common one based on
92         # this one.
93         self.tun_key = ( ''.join(map(chr, [ 
94                     r.getrandbits(8) 
95                     for i in xrange(32) 
96                     for r in (random.SystemRandom(),) ])
97                 ).encode("base64").strip() )        
98         
99
100     def __str__(self):
101         return "%s<%s %s:%s %s %s:%s>" % (
102             self.__class__.__name__,
103             self.tun_proto, 
104             self.tun_addr, self.tun_port,
105             self.peer_proto, 
106             self.peer_addr, self.peer_port,
107         )
108
109     def Prepare(self):
110         if self.tun_proto:
111             udp = self.tun_proto == "udp"
112             if not udp and self.listen and not self._forwarder_thread:
113                 if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
114                     self._launch()
115     
116     def Setup(self):
117         if self.tun_proto:
118             if not self._forwarder_thread:
119                 self._launch()
120     
121     def Cleanup(self):
122         if self._forwarder_thread:
123             self.Kill()
124
125     def Wait(self):
126         if self._forwarder_thread:
127             self._connected.wait()
128             for exc in self._exc:
129                 # Relay exception
130                 eTyp, eVal, eLoc = exc
131                 raise eTyp, eVal, eLoc
132
133     def Kill(self):    
134         if self._forwarder_thread:
135             if not self._terminate:
136                 self._terminate.append(None)
137             self._forwarder_thread.join()
138
139     def _launch(self):
140         # Launch forwarder thread with a weak reference
141         # to self, so that we don't create any strong cycles
142         # and automatic refcounting works as expected
143         self._forwarder_thread = threading.Thread(
144             target = self._forwarder,
145             args = (weakref.ref(self),) )
146         self._forwarder_thread.start()
147
148     @staticmethod
149     def _forwarder(weak_self):
150         try:
151             weak_self().__forwarder(weak_self)
152         except:
153             self = weak_self()
154             
155             # store exception and wake up anyone waiting
156             self._exc.append(sys.exc_info())
157             self._connected.set()
158     
159     @staticmethod
160     def __forwarder(weak_self):
161         # grab strong reference
162         self = weak_self()
163         if not self:
164             return
165         
166         peer_port = self.peer_port
167         peer_addr = self.peer_addr
168         peer_proto= self.peer_proto
169
170         local_port = self.tun_port
171         local_addr = self.tun_addr
172         local_proto = self.tun_proto
173         
174         stderr = self.stderr
175         ether_mode = self.ethernet_mode
176         with_pi = self.with_pi
177         
178         if local_proto != peer_proto:
179             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
180         
181         udp = local_proto == 'udp'
182         listen = self.listen
183
184         if (udp or not listen) and (not peer_port or not peer_addr):
185             raise RuntimeError, "Misconfigured peer for: %s" % (self,)
186
187         if (udp or listen) and (not local_port or not local_addr):
188             raise RuntimeError, "Misconfigured TUN: %s" % (self,)
189         
190         TERMINATE = self._terminate
191         cipher_key = self.tun_key
192         tun = self.tun_socket
193         
194         if not tun:
195             raise RuntimeError, "Unconnected TUN channel %s" % (self,)
196         
197         if udp:
198             # listen on udp port
199             rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
200             for i in xrange(30):
201                 try:
202                     rsock.bind((local_addr,local_port))
203                     break
204                 except socket.error:
205                     # wait a while, retry
206                     time.sleep(1)
207             else:
208                 rsock.bind((local_addr,local_port))
209             rsock.connect((peer_addr,peer_port))
210             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
211         elif listen:
212             # accept tcp connections
213             lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
214             for i in xrange(30):
215                 try:
216                     lsock.bind((local_addr,local_port))
217                     break
218                 except socket.error:
219                     # wait a while, retry
220                     time.sleep(1)
221             else:
222                 lsock.bind((local_addr,local_port))
223             lsock.listen(1)
224             rsock,raddr = lsock.accept()
225             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
226         else:
227             # connect to tcp server
228             rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
229             for i in xrange(30):
230                 try:
231                     rsock.connect((peer_addr,peer_port))
232                     break
233                 except socket.error:
234                     # wait a while, retry
235                     time.sleep(1)
236             else:
237                 rsock.connect((peer_addr,peer_port))
238             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
239         
240         # notify that we're ready
241         self._connected.set()
242         
243         # drop strong reference
244         del self
245         
246         tun_fwd(tun, remote,
247             with_pi = with_pi, 
248             ether_mode = ether_mode, 
249             cipher_key = cipher_key, 
250             udp = udp, 
251             TERMINATE = TERMINATE,
252             stderr = stderr
253         )
254         
255         tun.close()
256         remote.close()
257
258
259 def create_tunchannel(testbed_instance, guid, devnull = []):
260     """
261     TunChannel factory for metadata.
262     By default, silences traceing.
263     
264     You can override the created element's attributes if you will.
265     """
266     if not devnull:
267         # just so it's not open if not needed
268         devnull.append(open("/dev/null","w"))
269     element = TunChannel()
270     element.stderr = devnull[0] # silence tracing
271     testbed_instance._elements[guid] = element
272
273 def preconfigure_tunchannel(testbed_instance, guid):
274     """
275     TunChannel preconfiguration.
276     
277     It initiates the forwarder thread for listening tcp channels.
278     
279     Takes the public address from the operating system, so it should be adequate
280     for most situations when the TunChannel forwarder thread runs in the same
281     process as the testbed controller.
282     """
283     element = testbed_instance._elements[guid]
284     
285     # Find external interface, if any
286     public_addr = os.popen(
287         "/sbin/ifconfig "
288         "| grep $(ip route | grep default | awk '{print $3}' "
289                 "| awk -F. '{print $1\"[.]\"$2}') "
290         "| head -1 | awk '{print $2}' "
291         "| awk -F : '{print $2}'").read().rstrip()
292     element.tun_addr = public_addr
293
294     # Set standard TUN attributes
295     if not element.tun_port and element.tun_addr:
296         element.tun_port = 15000 + int(guid)
297
298     # First-phase setup
299     if element.peer_proto:
300         # cross tun
301         if not element.tun_addr or not element.tun_port:
302             listening = True
303         elif not element.peer_addr or not element.peer_port:
304             listening = True
305         else:
306             # both have addresses...
307             # ...the one with the lesser address listens
308             listening = element.tun_addr < element.peer_addr
309         element.listen = listening
310         element.Prepare()
311
312 def postconfigure_tunchannel(testbed_instance, guid):
313     """
314     TunChannel preconfiguration.
315     
316     Initiates the forwarder thread for connecting tcp channels or 
317     udp channels in general.
318     
319     Should be adequate for most implementations.
320     """
321     element = testbed_instance._elements[guid]
322     
323     # Second-phase setup
324     element.Setup()
325
326
327 def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data,
328         preconfigure_tunchannel = preconfigure_tunchannel):
329     """
330     Cross-connection initialization.
331     Should be adequate for most implementations.
332     
333     For use in metadata, bind the first "proto" argument with the connector type. Eg:
334     
335         conn_init = functools.partial(crossconnect_tunchannel_peer_init, "tcp")
336     
337     If you don't use the stock preconfigure function, specify your own as a keyword argument.
338     """
339     tun = testbed_instance._elements[tun_guid]
340     tun.peer_addr = peer_data.get("tun_addr")
341     tun.peer_proto = peer_data.get("tun_proto") or proto
342     tun.peer_port = peer_data.get("tun_port")
343     tun.tun_key = min(tun.tun_key, peer_data.get("tun_key"))
344     tun.tun_proto = proto
345     
346     preconfigure_tunchannel(testbed_instance, tun_guid)
347
348 def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data,
349         postconfigure_tunchannel = postconfigure_tunchannel):
350     """
351     Cross-connection completion.
352     Should be adequeate for most implementations.
353     
354     For use in metadata, bind the first "proto" argument with the connector type. Eg:
355     
356         conn_init = functools.partial(crossconnect_tunchannel_peer_compl, "tcp")
357     
358     If you don't use the stock postconfigure function, specify your own as a keyword argument.
359     """
360     # refresh (refreshable) attributes for second-phase
361     tun = testbed_instance._elements[tun_guid]
362     tun.peer_addr = peer_data.get("tun_addr")
363     tun.peer_proto = peer_data.get("tun_proto") or proto
364     tun.peer_port = peer_data.get("tun_port")
365     
366     postconfigure_tunchannel(testbed_instance, tun_guid)
367
368     
369
370 def wait_tunchannel(testbed_instance, guid):
371     """
372     Wait for the channel forwarder to be up and running.
373     
374     Useful as a pre-start function to assure proper startup synchronization,
375     be certain to start TunChannels before applications that might require them.
376     """
377     element = testbed_instance.elements[guid]
378     element.Wait()
379