# These get initialized when the iface is connected to any filter
self.filter_module = None
+ self.multicast_forwarder = None
# These get initialized when the iface is configured
self.external_iface = None
if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None):
raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,)
- def _impl_instance(self, home_path, listening):
+ def _impl_instance(self, home_path):
impl = self._PROTO_MAP[self.peer_proto](
- self, self.peer_iface, home_path, self.tun_key, listening)
+ self, self.peer_iface, home_path, self.tun_key)
impl.port = self.tun_port
+ impl.cross_slice = not self.peer_iface or isinstance(self.peer_iface, _CrossIface)
return impl
def recover(self):
else:
self._delay_recover = True
- def prepare(self, home_path, listening):
- if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
+ def prepare(self, home_path):
+ if not self.peer_iface and (self.peer_proto and self.peer_addr and self.peer_port):
# Ad-hoc peer_iface
self.peer_iface = _CrossIface(
self.peer_proto,
self.peer_cipher)
if self.peer_iface:
if not self.peer_proto_impl:
- self.peer_proto_impl = self._impl_instance(home_path, listening)
+ self.peer_proto_impl = self._impl_instance(home_path)
if self._delay_recover:
self.peer_proto_impl.recover()
- else:
- self.peer_proto_impl.prepare()
- def setup(self):
+ def launch(self):
if self.peer_proto_impl:
- self.peer_proto_impl.setup()
+ self.peer_proto_impl.launch()
def cleanup(self):
if self.peer_proto_impl:
self.peer_proto_impl.destroy()
self.peer_proto_impl = None
- def async_launch_wait(self):
+ def wait(self):
if self.peer_proto_impl:
- self.peer_proto_impl.async_launch_wait()
+ self.peer_proto_impl.wait()
def sync_trace(self, local_dir, whichtrace, tracemap = None):
if self.peer_proto_impl:
TUNFILTER = "TunFilter"
CLASSQUEUEFILTER = "ClassQueueFilter"
TOSQUEUEFILTER = "TosQueueFilter"
+MULTICASTFORWARDER = "MulticastForwarder"
+MULTICASTANNOUNCER = "MulticastAnnouncer"
+MULTICASTROUTER = "MulticastRouter"
TUNFILTERS = (TUNFILTER, CLASSQUEUEFILTER, TOSQUEUEFILTER)
TAPFILTERS = (TUNFILTER, )
crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
-def connect_dep(testbed_instance, node_guid, app_guid):
- node = testbed_instance._elements[node_guid]
- app = testbed_instance._elements[app_guid]
+def connect_dep(testbed_instance, node_guid, app_guid, node=None, app=None):
+ node = node or testbed_instance._elements[node_guid]
+ app = app or testbed_instance._elements[app_guid]
app.node = node
if app.depends:
if app.rpmFusion:
node.rpmFusion = True
+def connect_forwarder(testbed_instance, node_guid, fwd_guid):
+ node = testbed_instance._elements[node_guid]
+ fwd = testbed_instance._elements[fwd_guid]
+ node.multicast_forwarder = fwd
+
+ if fwd.router:
+ connect_dep(testbed_instance, node_guid, None, app=fwd.router)
+
+ connect_dep(testbed_instance, node_guid, fwd_guid)
+
+def connect_router(testbed_instance, fwd_guid, router_guid):
+ fwd = testbed_instance._elements[fwd_guid]
+ router = testbed_instance._elements[router_guid]
+ fwd.router = router
+
+ if fwd.node:
+ connect_dep(testbed_instance, None, router_guid, node=fwd.node)
+
def connect_node_netpipe(testbed_instance, node_guid, netpipe_guid):
node = testbed_instance._elements[node_guid]
netpipe = testbed_instance._elements[netpipe_guid]
testbed_instance.elements[guid] = element
+def create_multicast_forwarder(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_multicast_forwarder(parameters)
+
+ # Just inject configuration stuff
+ element.home_path = "nepi-mcfwd-%s" % (guid,)
+
+ testbed_instance.elements[guid] = element
+
+def create_multicast_announcer(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_multicast_announcer(parameters)
+
+ # Just inject configuration stuff
+ element.home_path = "nepi-mcann-%s" % (guid,)
+
+ testbed_instance.elements[guid] = element
+
+def create_multicast_router(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_multicast_router(parameters)
+
+ # Just inject configuration stuff
+ element.home_path = "nepi-mcrt-%s" % (guid,)
+
+ testbed_instance.elements[guid] = element
+
def create_internet(testbed_instance, guid):
parameters = testbed_instance._get_parameters(guid)
element = testbed_instance._make_internet(parameters)
element.validate()
# First-phase setup
- if element.peer_proto:
- if element.peer_iface and isinstance(element.peer_iface, testbed_instance._interfaces.TunIface):
- # intra tun
- listening = id(element) < id(element.peer_iface)
- else:
- # cross tun
- if not element.tun_addr or not element.tun_port:
- listening = True
- elif not element.peer_addr or not element.peer_port:
- listening = True
- else:
- # both have addresses...
- # ...the one with the lesser address listens
- listening = element.tun_addr < element.peer_addr
- element.prepare(
- 'tun-%s' % (guid,),
- listening)
+ element.prepare('tun-%s' % (guid,))
def postconfigure_tuniface(testbed_instance, guid):
element = testbed_instance._elements[guid]
# Second-phase setup
- element.setup()
+ element.launch()
- def wait_tuniface(testbed_instance, guid):
+ def prestart_tuniface(testbed_instance, guid):
element = testbed_instance._elements[guid]
# Second-phase setup
- element.async_launch_wait()
-
+ element.wait()
def configure_node(testbed_instance, guid):
node = testbed_instance._elements[guid]
# Install stuff
dep.async_setup()
+def configure_announcer(testbed_instance, guid):
+ # Link ifaces
+ fwd = testbed_instance._elements[guid]
+ fwd.ifaces = [ dev
+ for node_guid in testbed_instance.get_connected(guid, "node", "apps")
+ for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
+ for dev in ( testbed_instance._elements.get(dev_guid) ,)
+ if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
+ and dev.multicast ]
+
+ # Install stuff
+ configure_dependency(testbed_instance, guid)
+
+def configure_forwarder(testbed_instance, guid):
+ configure_announcer(testbed_instance, guid)
+
+ # Link ifaces to forwarder
+ fwd = testbed_instance._elements[guid]
+ for iface in fwd.ifaces:
+ iface.multicast_forwarder = '/var/run/mcastfwd'
+
+def configure_router(testbed_instance, guid):
+ # Link ifaces
+ rt = testbed_instance._elements[guid]
+ rt.nonifaces = [ dev
+ for fwd_guid in testbed_instance.get_connected(guid, "fwd", "router")
+ for node_guid in testbed_instance.get_connected(fwd_guid, "node", "apps")
+ for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
+ for dev in ( testbed_instance._elements.get(dev_guid) ,)
+ if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
+ and not dev.multicast ]
+
+ # Install stuff
+ configure_dependency(testbed_instance, guid)
+
def configure_netpipe(testbed_instance, guid):
netpipe = testbed_instance._elements[guid]
"max": 1,
"min": 1
}),
+ "router": dict({
+ "help": "Connector to a routing daemon",
+ "name": "router",
+ "max": 1,
+ "min": 1
+ }),
+ "fwd": dict({
+ "help": "Forwarder this routing daemon communicates with",
+ "name": "fwd",
+ "max": 1,
+ "min": 1
+ }),
"pipes": dict({
"help": "Connector to a NetPipe",
"name": "pipes",
}),
dict({
"from": (TESTBED_ID, NODE, "apps"),
- "to": (TESTBED_ID, APPLICATION, "node"),
+ "to": (TESTBED_ID, (APPLICATION, DEPENDENCY, NEPIDEPENDENCY, NS3DEPENDENCY, MULTICASTANNOUNCER), "node"),
"init_code": connect_dep,
"can_cross": False
}),
dict({
- "from": (TESTBED_ID, NODE, "deps"),
- "to": (TESTBED_ID, DEPENDENCY, "node"),
- "init_code": connect_dep,
- "can_cross": False
- }),
- dict({
- "from": (TESTBED_ID, NODE, "deps"),
- "to": (TESTBED_ID, NEPIDEPENDENCY, "node"),
- "init_code": connect_dep,
- "can_cross": False
- }),
- dict({
- "from": (TESTBED_ID, NODE, "deps"),
- "to": (TESTBED_ID, NS3DEPENDENCY, "node"),
- "init_code": connect_dep,
+ "from": (TESTBED_ID, NODE, "apps"),
+ "to": (TESTBED_ID, MULTICASTFORWARDER, "node"),
+ "init_code": connect_forwarder,
"can_cross": False
}),
dict({
- "from": (TESTBED_ID, NODE, "pipes"),
- "to": (TESTBED_ID, NETPIPE, "node"),
- "init_code": connect_node_netpipe,
+ "from": (TESTBED_ID, MULTICASTFORWARDER, "router"),
+ "to": (TESTBED_ID, MULTICASTROUTER, "fwd"),
+ "init_code": connect_router,
"can_cross": False
}),
dict({
"flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
"validation_function": validation.is_string
}),
+ "routing_algorithm": dict({
+ "name": "algorithm",
+ "help": "Routing algorithm.",
+ "value": "dvmrp",
+ "type": Attribute.ENUM,
+ "allowed": ["dvmrp"],
+ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+ "validation_function": validation.is_enum,
+ }),
})
traces = dict({
}),
})
-create_order = [ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+create_order = [
+ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER,
+ MULTICASTANNOUNCER, MULTICASTFORWARDER, MULTICASTROUTER,
+ TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE,
+ NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
-configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
+configure_order = [
+ INTERNET, Parallel(NODE),
+ NODEIFACE,
+ Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER),
+ Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE,
+ Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
# Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
-start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
+start_order = [ INTERNET,
+ NODEIFACE,
+ Parallel(TAPIFACE), Parallel(TUNIFACE),
+ Parallel(NODE), NETPIPE,
+ Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER),
+ Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
# cleanup order
-shutdown_order = [ Parallel(APPLICATION), Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), NODEIFACE, Parallel(NODE) ]
+shutdown_order = [
+ Parallel(APPLICATION),
+ Parallel(MULTICASTROUTER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTANNOUNCER),
+ Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE),
+ Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY),
+ NODEIFACE, Parallel(NODE) ]
factories_info = dict({
NODE: dict({
"create_function": create_tuniface,
"preconfigure_function": preconfigure_tuniface,
"configure_function": postconfigure_tuniface,
- "prestart_function": wait_tuniface,
+ "prestart_function": prestart_tuniface,
"box_attributes": [
"up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
"txqueuelen",
"create_function": create_tapiface,
"preconfigure_function": preconfigure_tuniface,
"configure_function": postconfigure_tuniface,
- "prestart_function": wait_tuniface,
+ "prestart_function": prestart_tuniface,
"box_attributes": [
"up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
"txqueuelen",
"connector_types": ["node"],
"traces": ["buildlog"],
}),
+ MULTICASTFORWARDER: dict({
+ "help": "This application installs a userspace packet forwarder "
+ "that, when connected to a node, filters all packets "
+ "flowing through multicast-capable virtual interfaces "
+ "and applies custom-specified routing policies.",
+ "category": FC.CATEGORY_APPLICATIONS,
+ "create_function": create_multicast_forwarder,
+ "preconfigure_function": configure_forwarder,
+ "start_function": start_application,
+ "status_function": status_application,
+ "stop_function": stop_application,
+ "box_attributes": [ ],
+ "connector_types": ["node","router"],
+ "traces": ["buildlog","stderr"],
+ }),
+ MULTICASTANNOUNCER: dict({
+ "help": "This application installs a userspace daemon that "
+ "monitors multicast membership and announces it on all "
+ "multicast-capable interfaces.\n"
+ "This does not usually happen automatically on PlanetLab slivers.",
+ "category": FC.CATEGORY_APPLICATIONS,
+ "create_function": create_multicast_announcer,
+ "preconfigure_function": configure_announcer,
+ "start_function": start_application,
+ "status_function": status_application,
+ "stop_function": stop_application,
+ "box_attributes": [ ],
+ "connector_types": ["node"],
+ "traces": ["buildlog","stderr"],
+ }),
+ MULTICASTROUTER: dict({
+ "help": "This application installs a userspace daemon that "
+ "monitors multicast membership and announces it on all "
+ "multicast-capable interfaces.\n"
+ "This does not usually happen automatically on PlanetLab slivers.",
+ "category": FC.CATEGORY_APPLICATIONS,
+ "create_function": create_multicast_router,
+ "preconfigure_function": configure_router,
+ "start_function": start_application,
+ "status_function": status_application,
+ "stop_function": stop_application,
+ "box_attributes": ["routing_algorithm"],
+ "connector_types": ["fwd"],
+ "traces": ["buildlog","stdout","stderr"],
+ }),
INTERNET: dict({
"help": "Internet routing",
"category": FC.CATEGORY_CHANNELS,
import traceback
import tunchannel
-import ipaddr2
+
+try:
+ import iovec
+ HAS_IOVEC = True
+except:
+ HAS_IOVEC = False
tun_name = 'tun0'
tun_path = '/dev/net/tun'
hostaddr = socket.gethostbyname(socket.gethostname())
- usage = "usage: %prog [options] <remote-endpoint>"
+ usage = "usage: %prog [options]"
parser = optparse.OptionParser(usage=usage)
default = "/dev/net/tun",
help = "TUN/TAP device file path or file descriptor number")
parser.add_option(
- "-p", "--port", dest="port", metavar="PORT", type="int",
+ "-p", "--peer-port", dest="peer_port", metavar="PEER_PORT", type="int",
default = 15000,
- help = "Peering TCP port to connect or listen to.")
+ help = "Remote TCP/UDP port to connect to.")
parser.add_option(
"--pass-fd", dest="pass_fd", metavar="UNIX_SOCKET",
default = None,
"If given, all other connectivity options are ignored, tun_connect will "
"simply wait to be killed after passing the file descriptor, and it will be "
"the receiver's responsability to handle the tunneling.")
-
parser.add_option(
"-m", "--mode", dest="mode", metavar="MODE",
default = "none",
"by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), "
"and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have "
"to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).")
+ parser.add_option(
+ "-t", "--protocol", dest="protocol", metavar="PROTOCOL",
+ default = None,
+ help =
+ "Set protocol. One of tcp, udp, fd, gre. In any mode except none, a TUN/TAP will be created.")
parser.add_option(
"-A", "--vif-address", dest="vif_addr", metavar="VIF_ADDRESS",
default = None,
help =
"See mode. This specifies the VIF_MASK, "
"a number indicating the network type (ie: 24 for a C-class network).")
+ parser.add_option(
+ "-P", "--port", dest="port", type="int", metavar="PORT",
+ default = None,
+ help =
+ "This specifies the LOCAL_PORT. This will be the local bind port for UDP/TCP.")
parser.add_option(
"-S", "--vif-snat", dest="vif_snat",
action = "store_true",
default = False,
help = "See mode. This specifies whether SNAT will be enabled for the virtual interface. " )
parser.add_option(
- "-P", "--vif-pointopoint", dest="vif_pointopoint", metavar="DST_ADDR",
+ "-Z", "--vif-pointopoint", dest="vif_pointopoint", metavar="DST_ADDR",
default = None,
help =
"See mode. This specifies the remote endpoint's virtual address, "
help =
"This specifies the interface's emulated bandwidth in bytes per second." )
parser.add_option(
- "-u", "--udp", dest="udp", metavar="PORT", type="int",
+ "-a", "--peer-address", dest="peer_addr", metavar="PEER_ADDRESS",
default = None,
help =
- "Bind to the specified UDP port locally, and send UDP datagrams to the "
- "remote endpoint, creating a tunnel through UDP rather than TCP." )
+ "This specifies the PEER_ADDRESS, "
+ "the IP address of the remote interface.")
parser.add_option(
"-k", "--key", dest="cipher_key", metavar="KEY",
default = None,
"Specify a symmetric encryption key with which to protect packets across "
"the tunnel. python-crypto must be installed on the system." )
parser.add_option(
- "-K", "--gre-key", dest="gre_key", metavar="KEY", type="int",
- default = None,
+ "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string",
+ default = "true",
help =
"Specify a demultiplexing 32-bit numeric key for GRE." )
parser.add_option(
help = "If specified, packets won't be logged to standard output, "
"but dumped to a pcap-formatted trace in the specified file. " )
parser.add_option(
- "--multicast", dest="multicast",
- action = "store_true",
- default = False,
- help = "If specified, multicast packets will be forwarded and IGMP "
- "join/leave packets will be generated. Routing information "
- "must be sent to the mroute unix socket, in a format identical "
- "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL "
- "code and 32-bit data length." )
+ "--multicast-forwarder", dest="multicast_fwd",
+ default = None,
+ help = "If specified, multicast packets will be forwarded to "
+ "the specified unix-domain socket. If the device uses ethernet "
+ "frames, ethernet headers will be stripped and IP packets "
+ "will be forwarded, prefixed with the interface's address." )
parser.add_option(
"--filter", dest="filter_module", metavar="PATH",
default = None,
help = "If specified, packets won't be logged to standard output, "
"but dumped to a pcap-formatted trace in the specified file. " )
- (options, remaining_args) = parser.parse_args(sys.argv[1:])
+ (options,args) = parser.parse_args(sys.argv[1:])
options.cipher = {
'aes' : 'AES',
IFREQ_SZ = 0x00000028
FIONREAD = 0x0000541b
-class MulticastThread(threading.Thread):
- def __init__(self, *p, **kw):
- super(MulticastThread, self).__init__(*p, **kw)
- self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
- self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
- socket.inet_aton(options.vif_addr) )
- self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
- self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
- self._stop = False
- self.setDaemon(True)
-
- def run(self):
- devnull = open('/dev/null','r+b')
- maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
- cur_maddr = set()
- lastfullrefresh = time.time()
- while not self._stop:
- # Get current subscriptions
- proc = subprocess.Popen(['ip','maddr','show',tun_name],
- stdout = subprocess.PIPE,
- stderr = subprocess.STDOUT,
- stdin = devnull)
- new_maddr = set()
- for line in proc.stdout:
- match = maddr_re.match(line)
- if match:
- new_maddr.add(match.group(1))
- proc.wait()
-
- # Every now and then, send a full report
- now = time.time()
- report_new = new_maddr
- if (now - lastfullrefresh) <= 30.0:
- report_new = report_new - cur_maddr
- else:
- lastfullrefresh = now
-
- # Report subscriptions
- for grp in report_new:
- igmpp = ipaddr2.ipigmp(
- options.vif_addr, '224.0.0.2', 1, 0x16, 0, grp,
- noipcksum=True)
- self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
-
- # Notify group leave
- for grp in cur_maddr - new_maddr:
- igmpp = ipaddr2.ipigmp(
- options.vif_addr, '224.0.0.2', 1, 0x17, 0, grp,
- noipcksum=True)
- self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
-
- cur_maddr = new_maddr
-
- time.sleep(1)
-
- def stop(self):
- self._stop = True
- self.join(5)
-
class HostLock(object):
# This class is used as a lock to prevent concurrency issues with more
# than one instance of netns running in the same machine. Both in
if options.vif_txqueuelen is not None:
stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
if options.mode.startswith('pl-gre'):
- stdin.write("gre=%d\n" % (options.gre_key,))
+ stdin.write("gre=%s\n" % (options.gre_key,))
- stdin.write("remote=%s\n" % (remaining_args[0],))
+ stdin.write("remote=%s\n" % (options.peer_addr,))
stdin.close()
t.join()
with_pi = options.mode.startswith('pl-'),
ether_mode = tun_name.startswith('tap'),
cipher_key = options.cipher_key,
- udp = options.udp,
+ udp = options.protocol == 'udp',
TERMINATE = TERMINATE,
stderr = None,
reconnect = reconnect,
filter_close = None
queueclass = None
+# install multicast forwarding hook
+if options.multicast_fwd:
+ print >>sys.stderr, "Connecting to mcast filter"
+ mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ tunchannel.nonblock(mcfwd_sock.fileno())
+
# be careful to roll back stuff on exceptions
tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
try:
tcpdump = None
reconnect = None
mcastthread = None
+
+ # install multicast forwarding hook
+ if options.multicast_fwd:
+ print >>sys.stderr, "Installing mcast filter"
+
+ if HAS_IOVEC:
+ writev = iovec.writev
+ else:
+ os_write = os.write
+ map_ = map
+ str_ = str
+ def writev(fileno, *stuff):
+ os_write(''.join(map_(str_,stuff)))
+
+ def accept_packet(packet, direction,
+ _up_accept=accept_packet,
+ sock=mcfwd_sock,
+ sockno=mcfwd_sock.fileno(),
+ etherProto=tunchannel.etherProto,
+ etherStrip=tunchannel.etherStrip,
+ etherMode=tun_name.startswith('tap'),
+ multicast_fwd = options.multicast_fwd,
+ vif_addr = socket.inet_aton(options.vif_addr),
+ connected = [], writev=writev,
+ len=len, ord=ord):
+ if _up_accept:
+ rv = _up_accept(packet, direction)
+ if not rv:
+ return rv
+
+ if direction == 1:
+ # Incoming... what?
+ if etherMode:
+ if etherProto(packet)=='\x08\x00':
+ fwd = etherStrip(packet)
+ else:
+ fwd = None
+ else:
+ fwd = packet
+ if fwd is not None and len(fwd) >= 20:
+ if (ord(fwd[16]) & 0xf0) == 0xe0:
+ # Forward it
+ if not connected:
+ try:
+ sock.connect(multicast_fwd)
+ connected.append(None)
+ except:
+ traceback.print_exc(file=sys.stderr)
+ if connected:
+ try:
+ writev(sockno, vif_addr,fwd)
+ except:
+ traceback.print_exc(file=sys.stderr)
+ return 1
+
- if options.pass_fd:
+ if options.protocol == 'fd':
if accept_packet or filter_init:
raise NotImplementedError, "--pass-fd and --filter are not compatible"
while not TERM:
time.sleep(1)
remote = None
- elif options.mode.startswith('pl-gre'):
+ elif options.protocol == "gre":
if accept_packet or filter_init:
raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
TERM = TERMINATE
while not TERM:
time.sleep(1)
- remote = remaining_args[0]
- elif options.udp:
+ remote = options.peer_addr
+ elif options.protocol == "udp":
# connect to remote endpoint
- if remaining_args and not remaining_args[0].startswith('-'):
- print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
- print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
- rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
- retrydelay = 1.0
- for i in xrange(30):
- if TERMINATE:
- raise OSError, "Killed"
- try:
- rsock.bind((hostaddr,options.udp))
- break
- except socket.error:
- # wait a while, retry
- print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
- time.sleep(min(30.0,retrydelay))
- retrydelay *= 1.1
- else:
- rsock.bind((hostaddr,options.udp))
- rsock.connect((remaining_args[0],options.port))
+ if options.peer_addr and options.peer_port:
+ rsock = tunchannel.udp_establish(TERMINATE, hostaddr, options.port,
+ options.peer_addr, options.peer_port)
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
else:
print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
raise AssertionError, "Error: need a remote endpoint in UDP mode"
-
- # Wait for other peer
- tunchannel.udp_handshake(TERMINATE, rsock)
-
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
- else:
+ elif options.protocol == "tcp":
# connect to remote endpoint
- if remaining_args and not remaining_args[0].startswith('-'):
- print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
- rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- retrydelay = 1.0
- for i in xrange(30):
- if TERMINATE:
- raise OSError, "Killed"
- try:
- rsock.connect((remaining_args[0],options.port))
- break
- except socket.error:
- # wait a while, retry
- print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
- time.sleep(min(30.0,retrydelay))
- retrydelay *= 1.1
- else:
- rsock.connect((remaining_args[0],options.port))
+ if options.peer_addr and options.peer_port:
+ rsock = tunchannel.tcp_establish(TERMINATE, hostaddr, options.port,
+ options.peer_addr, options.peer_port)
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
else:
- print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
- lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- retrydelay = 1.0
- for i in xrange(30):
- if TERMINATE:
- raise OSError, "Killed"
- try:
- lsock.bind((hostaddr,options.port))
- break
- except socket.error:
- # wait a while, retry
- print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
- time.sleep(min(30.0,retrydelay))
- retrydelay *= 1.1
- else:
- lsock.bind((hostaddr,options.port))
- lsock.listen(1)
- rsock,raddr = lsock.accept()
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+ print >>sys.stderr, "Error: need a remote endpoint in TCP mode"
+ raise AssertionError, "Error: need a remote endpoint in TCP mode"
+ else:
+ msg = "Error: Invalid protocol %s" % options.protocol
+ print >>sys.stderr, msg
+ raise AssertionError, msg
if filter_init:
filter_local, filter_remote = filter_init()
# or perhaps there is no os.nice support in the system
pass
- if options.multicast:
- # Start multicast forwarding daemon
- mcastthread = MulticastThread()
- mcastthread.start()
-
if not filter_init:
tun_fwd(tun, remote,
reconnect = reconnect,
self.port = 15000
self.mode = 'pl-tun'
self.key = key
+ self.cross_slice = False
self.home_path = home_path
-
- self._launcher = None
+
self._started = False
- self._started_listening = False
- self._starting = False
+
self._pid = None
self._ppid = None
self._if_name = None
if proc.wait():
raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
-
def _install_scripts(self):
local = self.local()
if proc.wait():
raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
- def launch(self, check_proto, listen, extra_args=[]):
- if self._starting:
- raise AssertionError, "Double start"
-
- self._starting = True
-
+ def launch(self, check_proto):
peer = self.peer()
local = self.local()
peer_port = peer.tun_port
peer_addr = peer.tun_addr
- peer_proto= peer.tun_proto
- peer_cipher=peer.tun_cipher
+ peer_proto = peer.tun_proto
+ peer_cipher = peer.tun_cipher
local_port = self.port
local_cap = local.capture
local_snat = local.snat
local_txq = local.txqueuelen
local_p2p = local.pointopoint
- local_cipher = local.tun_cipher
- local_mcast = local.multicast
- local_bwlim = local.bwlimit
+ local_cipher=local.tun_cipher
+ local_mcast= local.multicast
+ local_bwlim= local.bwlimit
+ local_mcastfwd = local.multicast_forwarder
if not local_p2p and hasattr(peer, 'address'):
local_p2p = peer.address
if local_cipher != peer_cipher:
raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
- if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
- raise RuntimeError, "Misconfigured peer: %s" % (peer,)
-
- if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
- raise RuntimeError, "Misconfigured TUN: %s" % (local,)
-
if check_proto == 'gre' and local_cipher.lower() != 'plain':
raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
args = ["python", "tun_connect.py",
"-m", str(self.mode),
+ "-t", str(check_proto),
"-A", str(local_addr),
"-M", str(local_mask),
- "-C", str(local_cipher)]
+ "-C", str(local_cipher),
+ ]
if check_proto == 'fd':
passfd_arg = str(peer_addr)
"--pass-fd", passfd_arg
])
elif check_proto == 'gre':
- "-K", str(min(local_port, peer_port)),
+ if self.cross_slice:
+ args.extend([
+ "-K", str(self.key.strip('='))
+ ])
++
+ args.extend([
+ "-a", str(peer_addr),
+ ])
+ # both udp and tcp
else:
args.extend([
- "-p", str(local_port if listen else peer_port),
+ "-P", str(local_port),
+ "-p", str(peer_port),
+ "-a", str(peer_addr),
"-k", str(self.key)
])
if local_snat:
args.append("-S")
if local_p2p:
- args.extend(("-P",str(local_p2p)))
+ args.extend(("-Z",str(local_p2p)))
if local_txq:
args.extend(("-Q",str(local_txq)))
if not local_cap:
args.append("-N")
elif local_cap == 'pcap':
args.extend(('-c','pcap'))
- if local_mcast:
- args.append("--multicast")
if local_bwlim:
args.extend(("-b",str(local_bwlim*1024)))
- if extra_args:
- args.extend(map(str,extra_args))
- if not listen and check_proto != 'fd':
- args.append(str(peer_addr))
if filter_module:
args.extend(("--filter", filter_module))
if filter_args:
args.extend(("--filter-args", filter_args))
+ if local_mcast and local_mcastfwd:
+ args.extend(("--multicast-forwarder", local_mcastfwd))
self._logger.info("Starting %s", self)
if proc.wait():
raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
-
+
self._started = True
def recover(self):
# Tunnel should be still running in its node
# Just check its pidfile and we're done
self._started = True
- self._started_listening = True
self.checkpid()
- def _launch_and_wait(self, *p, **kw):
- try:
- self.__launch_and_wait(*p, **kw)
- except:
- if self._launcher:
- import sys
- self._launcher._exc.append(sys.exc_info())
- else:
- raise
-
- def __launch_and_wait(self, *p, **kw):
+ def wait(self):
local = self.local()
- self.launch(*p, **kw)
-
- # Wait for the process to be started
- while self.status() == rspawn.NOT_STARTED:
- time.sleep(1.0)
-
# Wait for the connection to be established
retrytime = 2.0
for spin in xrange(30):
# Connected?
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
- "cd %(home)s ; grep -c Connected capture" % dict(
+ "cd %(home)s ; grep -a -c Connected capture" % dict(
home = server.shell_escape(self.home_path)),
host = local.node.hostname,
port = None,
# At least listening?
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
- "cd %(home)s ; grep -c Listening capture" % dict(
+ "cd %(home)s ; grep -a -c Listening capture" % dict(
home = server.shell_escape(self.home_path)),
host = local.node.hostname,
port = None,
)
proc.wait()
- if out.strip() == '1':
- self._started_listening = True
-
time.sleep(min(30.0, retrytime))
retrytime *= 1.1
else:
# Inspect the trace to check the assigned iface
local = self.local()
if local:
- cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
+ cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict(
home = server.shell_escape(self.home_path))
for spin in xrange(30):
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
return True
return False
- def async_launch(self, check_proto, listen, extra_args=[]):
- if not self._started and not self._launcher:
- self._launcher = threading.Thread(
- target = self._launch_and_wait,
- args = (check_proto, listen, extra_args))
- self._launcher._exc = []
- self._launcher.start()
-
- def async_launch_wait(self):
- if self._launcher:
- self._launcher.join()
-
- if self._launcher._exc:
- exctyp,exval,exctrace = self._launcher._exc[0]
- raise exctyp,exval,exctrace
- elif not self._started:
- raise RuntimeError, "Failed to launch TUN forwarder"
- elif not self._started:
- self.launch()
-
- def async_launch_wait_listening(self):
- if self._launcher:
- for x in xrange(180):
- if self._launcher._exc:
- exctyp,exval,exctrace = self._launcher._exc[0]
- raise exctyp,exval,exctrace
- elif self._started and self._started_listening:
- break
- time.sleep(1)
- elif not self._started:
- self.launch()
-
def checkpid(self):
local = self.local()
def remote_trace_path(self, whichtrace, tracemap = None):
tracemap = self._TRACEMAP if not tracemap else tracemap
-
if whichtrace not in tracemap:
return None
return local_path
-
- def prepare(self):
- """
- First-phase setup
-
- eg: set up listening ports
- """
- raise NotImplementedError
-
- def setup(self):
- """
- Second-phase setup
-
- eg: connect to peer
- """
- raise NotImplementedError
-
def shutdown(self):
- """
- Cleanup
- """
- raise NotImplementedError
+ self.kill()
def destroy(self):
- """
- Second-phase cleanup
- """
- pass
-
+ self.waitkill()
class TunProtoUDP(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoUDP, self).__init__(local, peer, home_path, key)
- self.listening = listening
-
- def prepare(self):
- pass
- def setup(self):
- self.async_launch('udp', False, ("-u",str(self.port)))
-
- def shutdown(self):
- self.kill()
-
- def destroy(self):
- self.waitkill()
-
- def launch(self, check_proto='udp', listen=False, extra_args=None):
- if extra_args is None:
- extra_args = ("-u",str(self.port))
- super(TunProtoUDP, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoUDP, self).launch('udp')
class TunProtoFD(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoFD, self).__init__(local, peer, home_path, key)
- self.listening = listening
-
- def prepare(self):
- pass
- def setup(self):
- self.async_launch('fd', False)
-
- def shutdown(self):
- self.kill()
-
- def destroy(self):
- self.waitkill()
-
- def launch(self, check_proto='fd', listen=False, extra_args=[]):
- super(TunProtoFD, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoFD, self).launch('fd')
class TunProtoGRE(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoGRE, self).__init__(local, peer, home_path, key)
- self.listening = listening
self.mode = 'pl-gre-ip'
-
- def prepare(self):
- pass
-
- def setup(self):
- self.async_launch('gre', False)
-
- def shutdown(self):
- self.kill()
-
- def destroy(self):
- self.waitkill()
- def launch(self, check_proto='gre', listen=False, extra_args=[]):
- super(TunProtoGRE, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoGRE, self).launch('gre')
class TunProtoTCP(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoTCP, self).__init__(local, peer, home_path, key)
- self.listening = listening
-
- def prepare(self):
- if self.listening:
- self.async_launch('tcp', True)
-
- def setup(self):
- if not self.listening:
- # make sure our peer is ready
- peer = self.peer()
- if peer and peer.peer_proto_impl:
- peer.peer_proto_impl.async_launch_wait_listening()
-
- if not self._started:
- self.async_launch('tcp', False)
-
- self.checkpid()
- def shutdown(self):
- self.kill()
-
- def destroy(self):
- self.waitkill()
-
- def launch(self, check_proto='tcp', listen=None, extra_args=[]):
- if listen is None:
- listen = self.listening
- super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoTCP, self).launch('tcp')
class TapProtoUDP(TunProtoUDP):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoUDP, self).__init__(local, peer, home_path, key)
self.mode = 'pl-tap'
class TapProtoTCP(TunProtoTCP):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoTCP, self).__init__(local, peer, home_path, key)
self.mode = 'pl-tap'
class TapProtoFD(TunProtoFD):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoFD, self).__init__(local, peer, home_path, key)
self.mode = 'pl-tap'
class TapProtoGRE(TunProtoGRE):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoGRE, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoGRE, self).__init__(local, peer, home_path, key)
self.mode = 'pl-gre-eth'
-
-
TUN_PROTO_MAP = {
'tcp' : TunProtoTCP,
'udp' : TunProtoUDP,
'gre' : TapProtoGRE,
}
-
import threading
import errno
import fcntl
+ import random
import traceback
import functools
import collections
rv = buf.popleft()
return rv
-def etherStrip(buf):
+def etherStrip(buf, buffer=buffer, len=len):
if len(buf) < 14:
return ""
if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
# tagged ethernet frame
- return buf[18:]
+ return buffer(buf, 18)
elif buf[12:14] == '\x08\x00':
# untagged ethernet frame
- return buf[14:]
+ return buffer(buf, 14)
else:
return ""
retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
crypto_mode = False
crypter = None
+
try:
if cipher_key and cipher:
import Crypto.Cipher
if rread is None:
def rread(remote, maxlen, os_read=os.read):
return os_read(remote_fd, maxlen)
-
+
rnonblock = nonblock(remote)
tnonblock = nonblock(tun)
remoteok = True
+
while not TERMINATE:
wset = []
if packetReady(bkbuf):
if e.args[0] == errno.EINTR:
# just retry
continue
+ else:
+ raise
# check for errors
if errs:
try:
for x in xrange(maxbatch):
packet = rread(remote,2000)
+
#rr += 1
if crypto_mode:
#print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
+ def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+ retrydelay = 1.0
+ for i in xrange(30):
+ # TERMINATE is a array. An item can be added to TERMINATE, from
+ # outside this function to force termination of the loop
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ rsock.bind((local_addr, local_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ rsock.bind((local_addr, local_port))
+ print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
+ print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
+ rsock.connect((peer_addr, peer_port))
+ return rsock
def udp_handshake(TERMINATE, rsock):
endme = False
endme = True
keepalive_thread.join()
+ def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
+ peer_port)
+ udp_handshake(TERMINATE, rsock)
+ return rsock
+
+ def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
+ sock = None
+ retrydelay = 1.0
+ # The peer has a firewall that prevents a response to the connect, we
+ # will be forever blocked in the connect, so we put a reasonable timeout.
+ rsock.settimeout(10)
+ # We wait for
+ for i in xrange(30):
+ if stop:
+ break
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ rsock.connect((peer_addr, peer_port))
+ sock = rsock
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ rsock.connect((peer_addr, peer_port))
+ sock = rsock
+ if sock:
+ print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
+ sock.settimeout(0)
+ return sock
+
+ def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
+ sock = None
+ retrydelay = 1.0
+ # We try to bind to the local virtual interface.
+ # It might not exist yet so we wait in a loop.
+ for i in xrange(30):
+ if stop:
+ break
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ lsock.bind((local_addr, local_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ lsock.bind((local_addr, local_port))
+
+ print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
+ # Now we wait until the other side connects.
+ # The other side might not be ready yet, so we also wait in a loop for timeouts.
+ timeout = 1
+ lsock.listen(1)
+ for i in xrange(30):
+ if TERMINATE:
+ raise OSError, "Killed"
+ rlist, wlist, xlist = select.select([lsock], [], [], timeout)
+ if stop:
+ break
+ if lsock in rlist:
+ sock,raddr = lsock.accept()
+ print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
+ break
+ timeout += 5
+ return sock
+
+ def tcp_handshake(rsock, listen, hand):
+ # we are going to use a barrier algorithm to decide wich side listen.
+ # each side will "roll a dice" and send the resulting value to the other
+ # side.
+ win = False
+ rsock.settimeout(10)
+ try:
+ rsock.send(hand)
+ peer_hand = rsock.recv(1)
+ print >>sys.stderr, "tcp_handshake: hand %s, peer_hand %s" % (hand, peer_hand)
+ if hand < peer_hand:
+ if listen:
+ win = True
+ elif hand > peer_hand:
+ if not listen:
+ win = True
+ except socket.timeout:
+ pass
+ rsock.settimeout(0)
+ return win
+
+ def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ def listen(stop, hand, lsock, lresult):
+ win = False
+ rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
+ if rsock:
+ win = tcp_handshake(rsock, True, hand)
+ stop.append(True)
+ lresult.append((win, rsock))
+
+ def connect(stop, hand, rsock, rresult):
+ win = False
+ rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
+ if rsock:
+ win = tcp_handshake(rsock, False, hand)
+ stop.append(True)
+ rresult.append((win, rsock))
+
+ end = False
+ sock = None
+ while not end:
+ if TERMINATE:
+ raise OSError, "Killed"
+ hand = str(random.randint(1, 6))
+ stop = []
+ lresult = []
+ rresult = []
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
+ connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
+ connect_thread.start()
+ listen_thread.start()
+ connect_thread.join()
+ listen_thread.join()
+ (lwin, lrsock) = lresult[0]
+ (rwin, rrsock) = rresult[0]
+ if not lrsock or not rrsock:
+ if not lrsock:
+ sock = rrsock
+ if not rrsock:
+ sock = lrsock
+ end = True
+ # both socket are connected
+ else:
+ if lwin:
+ sock = lrsock
+ end = True
+ elif rwin:
+ sock = rrsock
+ end = True
+
+ if not sock:
+ raise OSError, "Error: tcp_establish could not establish connection."
+ return sock
+
+