Several fixes:
[nepi.git] / src / nepi / util / tunchannel_impl.py
1 import os
2 import sys
3 import random
4 import threading
5 import socket
6 import select
7 import weakref
8 import time
9
10 from tunchannel import tun_fwd, udp_establish, tcp_establish
11
12 class TunChannel(object):
13     """
14     Helper box class that implements most of the required boilerplate
15     for tunnelling cross connections.
16     
17     The class implements a threaded forwarder that runs in the
18     testbed controller process. It takes several parameters that
19     can be given by directly setting attributes:
20     
21         tun_port/addr/proto/cipher: information about the local endpoint.
22             The addresses here should be externally-reachable,
23             since when listening or when using the UDP protocol,
24             connections to this address/port will be attempted
25             by remote endpoitns.
26         
27         peer_port/addr/proto/cipher: information about the remote endpoint.
28             Usually, you set these when the cross connection 
29             initializer/completion functions are invoked (both).
30         
31         tun_key: the agreed upon encryption key.
32         
33         with_pi: set if the incoming packet stream (see tun_socket)
34             contains PI headers - if so, they will be stripped.
35         
36         ethernet_mode: set if the incoming packet stream is
37             composed of ethernet frames (as opposed of IP packets).
38         
39         tun_socket: a socket or file object that can be read
40             from and written to. Packets will be read when available,
41             remote packets will be forwarded as writes.
42             A socket should be of type SOCK_SEQPACKET (or SOCK_DGRAM
43             if not possible), a file object should preserve packet
44             boundaries (ie, a pipe or TUN/TAP device file descriptor).
45         
46         trace_target: a file object where trace output will be sent.
47             It cannot be changed after launch.
48             By default, it's sys.stderr
49     """
50     
51     def __init__(self):
52         # Some operational attributes
53         self.ethernet_mode = True
54         self.with_pi = False
55         
56         # These get initialized when the channel is configured
57         # They're part of the TUN standard attribute set
58         self.tun_port = None
59         self.tun_addr = None
60         self.tun_cipher = 'AES'
61         
62         # These get initialized when the channel is connected to its peer
63         self.peer_proto = None
64         self.peer_addr = None
65         self.peer_port = None
66         self.peer_cipher = None
67         
68         # These get initialized when the channel is connected to its iface
69         self.tun_socket = None
70
71         # same as peer proto, but for execute-time standard attribute lookups
72         self.tun_proto = None 
73         
74         # some state
75         self.prepared = False
76         self._terminate = [] # terminate signaller
77         self._exc = [] # exception store, to relay exceptions from the forwarder thread
78         self._connected = threading.Event()
79         self._forwarder_thread = None
80        
81         # trace to stderr
82         self.stderr = sys.stderr
83         
84         # Generate an initial random cryptographic key to use for tunnelling
85         # Upon connection, both endpoints will agree on a common one based on
86         # this one.
87         self.tun_key = os.urandom(32).encode("base64").strip()
88         
89
90     def __str__(self):
91         return "%s<%s %s:%s %s %s:%s %s>" % (
92             self.__class__.__name__,
93             self.tun_proto, 
94             self.tun_addr, self.tun_port,
95             self.peer_proto, 
96             self.peer_addr, self.peer_port,
97             self.tun_cipher,
98         )
99
100     def launch(self):
101         # self.tun_proto is only set if the channel is connected
102         # launch has to be a no-op in unconnected channels because
103         # it is called at configuration time, which for cross connections
104         # happens before connection.
105         if self.tun_proto:
106             if not self._forwarder_thread:
107                 self._launch()
108     
109     def cleanup(self):
110         if self._forwarder_thread:
111             self.kill()
112
113     def wait(self):
114         if self._forwarder_thread:
115             self._connected.wait()
116             for exc in self._exc:
117                 # Relay exception
118                 eTyp, eVal, eLoc = exc
119                 raise eTyp, eVal, eLoc
120
121     def kill(self):    
122         if self._forwarder_thread:
123             if not self._terminate:
124                 self._terminate.append(None)
125             self._forwarder_thread.join()
126
127     def _launch(self):
128         # Launch forwarder thread with a weak reference
129         # to self, so that we don't create any strong cycles
130         # and automatic refcounting works as expected
131         self._forwarder_thread = threading.Thread(
132             target = self._forwarder,
133             args = (weakref.ref(self),) )
134         self._forwarder_thread.start()
135
136     @staticmethod
137     def _forwarder(weak_self):
138         try:
139             weak_self().__forwarder(weak_self)
140         except:
141             self = weak_self()
142             
143             # store exception and wake up anyone waiting
144             self._exc.append(sys.exc_info())
145             self._connected.set()
146     
147     @staticmethod
148     def __forwarder(weak_self):
149         # grab strong reference
150         self = weak_self()
151         if not self:
152             return
153         
154         peer_port = self.peer_port
155         peer_addr = self.peer_addr
156         peer_proto= self.peer_proto
157         peer_cipher=self.peer_cipher
158
159         local_port = self.tun_port
160         local_addr = self.tun_addr
161         local_proto = self.tun_proto
162         local_cipher= self.tun_cipher
163         
164         stderr = self.stderr
165         ether_mode = self.ethernet_mode
166         with_pi = self.with_pi
167         
168         if local_proto != peer_proto:
169             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
170
171         if local_cipher != peer_cipher:
172             raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
173         
174         if not peer_port or not peer_addr:
175             raise RuntimeError, "Misconfigured peer for: %s" % (self,)
176
177         if not local_port or not local_addr:
178             raise RuntimeError, "Misconfigured TUN: %s" % (self,)
179         
180         TERMINATE = self._terminate
181         cipher_key = self.tun_key
182         tun = self.tun_socket
183         udp = local_proto == 'udp'
184
185         if not tun:
186             raise RuntimeError, "Unconnected TUN channel %s" % (self,)
187
188         if local_proto == 'udp':
189             rsock = udp_establish(TERMINATE, local_addr, local_port, 
190                     peer_addr, peer_port)
191             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
192         elif local_proto == 'tcp':
193             rsock = tcp_establish(TERMINATE, local_addr, local_port,
194                     peer_addr, peer_port)
195             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
196         else:
197             raise RuntimeError, "Bad protocol for %s: %r" % (self,local_proto)
198
199         # notify that we're ready
200         self._connected.set()
201         
202         # drop strong reference
203         del self
204         
205         print >>sys.stderr, "Connected"
206         tun_fwd(tun, remote,
207             with_pi = with_pi, 
208             ether_mode = ether_mode, 
209             cipher_key = cipher_key, 
210             udp = udp, 
211             TERMINATE = TERMINATE,
212             stderr = stderr,
213             cipher = local_cipher
214         )
215         
216         tun.close()
217         remote.close()
218
219
220 def create_tunchannel(testbed_instance, guid, devnull = []):
221     """
222     TunChannel factory for metadata.
223     By default, silences traceing.
224     
225     You can override the created element's attributes if you will.
226     """
227     if not devnull:
228         # just so it's not open if not needed
229         devnull.append(open("/dev/null","w"))
230     element = TunChannel()
231     element.stderr = devnull[0] # silence tracing
232     testbed_instance._elements[guid] = element
233
234 def preconfigure_tunchannel(testbed_instance, guid):
235     """
236     TunChannel preconfiguration.
237     
238     It initiates the forwarder thread for listening tcp channels.
239     
240     Takes the public address from the operating system, so it should be adequate
241     for most situations when the TunChannel forwarder thread runs in the same
242     process as the testbed controller.
243     """
244     element = testbed_instance._elements[guid]
245     
246     # Find external interface, if any
247     public_addr = os.popen(
248         "/sbin/ifconfig "
249         "| grep $(ip route | grep default | awk '{print $3}' "
250                 "| awk -F. '{print $1\"[.]\"$2}') "
251         "| head -1 | awk '{print $2}' "
252         "| awk -F : '{print $2}'").read().rstrip()
253     element.tun_addr = public_addr
254
255     # Set standard TUN attributes
256     if not element.tun_port and element.tun_addr:
257         element.tun_port = 15000 + int(guid)
258
259 def postconfigure_tunchannel(testbed_instance, guid):
260     """
261     TunChannel preconfiguration.
262     
263     Initiates the forwarder thread for connecting tcp channels or 
264     udp channels in general.
265     
266     Should be adequate for most implementations.
267     """
268     element = testbed_instance._elements[guid]
269    
270     element.launch()
271
272 def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data,
273         preconfigure_tunchannel = preconfigure_tunchannel):
274     """
275     Cross-connection initialization.
276     Should be adequate for most implementations.
277     
278     For use in metadata, bind the first "proto" argument with the connector type. Eg:
279     
280         conn_init = functools.partial(crossconnect_tunchannel_peer_init, "tcp")
281     
282     If you don't use the stock preconfigure function, specify your own as a keyword argument.
283     """
284     tun = testbed_instance._elements[tun_guid]
285     tun.peer_addr = peer_data.get("tun_addr")
286     tun.peer_proto = peer_data.get("tun_proto") or proto
287     tun.peer_port = peer_data.get("tun_port")
288     tun.peer_cipher = peer_data.get("tun_cipher")
289     tun.tun_key = min(tun.tun_key, peer_data.get("tun_key"))
290     tun.tun_proto = proto
291   
292     preconfigure_tunchannel(testbed_instance, tun_guid)
293
294 def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data,
295         postconfigure_tunchannel = postconfigure_tunchannel):
296     """
297     Cross-connection completion.
298     Should be adequeate for most implementations.
299     
300     For use in metadata, bind the first "proto" argument with the connector type. Eg:
301     
302         conn_init = functools.partial(crossconnect_tunchannel_peer_compl, "tcp")
303     
304     If you don't use the stock postconfigure function, specify your own as a keyword argument.
305     """
306     # refresh (refreshable) attributes for second-phase
307     tun = testbed_instance._elements[tun_guid]
308     tun.peer_addr = peer_data.get("tun_addr")
309     tun.peer_proto = peer_data.get("tun_proto") or proto
310     tun.peer_port = peer_data.get("tun_port")
311     tun.peer_cipher = peer_data.get("tun_cipher")
312    
313     postconfigure_tunchannel(testbed_instance, tun_guid)
314
315 def prestart_tunchannel(testbed_instance, guid):
316     """
317     Wait for the channel forwarder to be up and running.
318     
319     Useful as a pre-start function to assure proper startup synchronization,
320     be certain to start TunChannels before applications that might require them.
321     """
322     element = testbed_instance.elements[guid]
323     element.wait()
324