# requested every 'state_check_delay' seconds.
state_check_delay = 0.5
if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
- # check if execution errors occurred
- (out, err), proc = self.node.check_errors(self.run_home)
-
- if err:
- msg = " Failed to execute command '%s'" % self.get("command")
- self.error(msg, out, err)
- self.fail()
-
- elif self.pid and self.ppid:
- # No execution errors occurred. Make sure the background
- # process with the recorded pid is still running.
+ if self.pid and self.ppid:
+ # Make sure the process is still running in background
status = self.node.status(self.pid, self.ppid)
if status == ProcStatus.FINISHED:
- self._state = ResourceState.FINISHED
+ # If the program finished, check if execution
+ # errors occurred
+ (out, err), proc = self.node.check_errors(
+ self.run_home)
+
+ if err:
+ msg = " Failed to execute command '%s'" % \
+ self.get("command")
+ self.error(msg, out, err)
+ self.fail()
+ else:
+ self._state = ResourceState.FINISHED
self._last_state_check = tnow()
super(LinuxNode, self).deploy()
def release(self):
+ # Node needs to wait until all associated RMs are released
+ # to be released
+ rms = self.get_connected()
+ for rm in rms:
+ if rm.state < ResourceState.STOPPED:
+ self.ec.schedule(reschedule_delay, self.release)
+ return
+
tear_down = self.get("tearDown")
if tear_down:
self.execute(tear_down)
from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
reschedule_delay
from nepi.resources.linux.application import LinuxApplication
+from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
import os
class UdpTunnel(LinuxApplication):
_rtype = "UdpTunnel"
+ @classmethod
+ def _register_attributes(cls):
+ cipher = Attribute("cipher",
+ "Cipher to encript communication. "
+ "One of PLAIN, AES, Blowfish, DES, DES3. ",
+ default = None,
+ allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
+ type = Types.Enumerate,
+ flags = Flags.ExecReadOnly)
+
+ cipher_key = Attribute("cipherKey",
+ "Specify a symmetric encryption key with which to protect "
+ "packets across the tunnel. python-crypto must be installed "
+ "on the system." ,
+ flags = Flags.ExecReadOnly)
+
+ txqueuelen = Attribute("txQueueLen",
+ "Specifies the interface's transmission queue length. "
+ "Defaults to 1000. ",
+ type = Types.Integer,
+ flags = Flags.ExecReadOnly)
+
+ bwlimit = Attribute("bwLimit",
+ "Specifies the interface's emulated bandwidth in bytes "
+ "per second.",
+ type = Types.Integer,
+ flags = Flags.ExecReadOnly)
+
+ cls._register_attribute(cipher)
+ cls._register_attribute(cipher_key)
+ cls._register_attribute(txqueuelen)
+ cls._register_attribute(bwlimit)
+
def __init__(self, ec, guid):
super(UdpTunnel, self).__init__(ec, guid)
self._home = "udp-tunnel-%s" % self.guid
"remote_port")
ret_file = os.path.join(self.run_home(endpoint),
"ret_file")
+ cipher = self.get("cipher")
+ cipher_key = self.get("cipherKey")
+ bwlimit = self.get("bwLimit")
+ txqueuelen = self.get("txQueueLen")
udp_connect_command = endpoint.udp_connect_command(
remote_ip, local_port_file, remote_port_file,
- ret_file)
+ ret_file, cipher, cipher_key, bwlimit, txqueuelen)
# upload command to connect.sh script
shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh")
self._state = ResourceState.FAILED
raise RuntimeError, msg
- def stop(self):
- command = self.get('command') or ''
- state = self.state
-
- if state == ResourceState.STARTED:
- self.info("Stopping command '%s'" % command)
-
- command = "bash %s" % os.path.join(self.app_home, "stop.sh")
- (out, err), proc = self.execute_command(command,
- blocking = True)
-
- self._stop_time = tnow()
- self._state = ResourceState.STOPPED
-
+ # XXX: Leaves process unkilled!!
+ # Implement another mechanism to kill the tunnel!
def stop(self):
""" Stops application execution
"""
if err1 or err2 or pro1.poll() or proc2.poll():
# check if execution errors occurred
msg = " Failed to STOP tunnel"
- self.error(msg, out, err)
+ self.error(msg, err1, err2)
self.fail()
stopped = False
# requested every 'state_check_delay' seconds.
state_check_delay = 0.5
if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
- # check if execution errors occurred
- (out1, err1), proc1 = self.endpoint1.node.check_errors(
- self.run_home(self.endpoint1))
-
- (out2, err2), proc2 = self.endpoint2.node.check_errors(
- self.run_home(self.endpoint2))
-
- if err1 or err2:
- msg = " Failed to connect endpoints "
- self.error(msg, err1, err2)
- self.fail()
-
- elif self._pid1 and self._ppid1 and self._pid2 and self._ppid2:
+ if self._pid1 and self._ppid1 and self._pid2 and self._ppid2:
+ # Make sure the process is still running in background
# No execution errors occurred. Make sure the background
# process with the recorded pid is still running.
- status1 = self.node.status(self._pid1, self._ppid1)
- status2 = self.node.status(self._pid2, self._ppid2)
+ status1 = self.endpoint1.node.status(self._pid1, self._ppid1)
+ status2 = self.endpoint2.node.status(self._pid2, self._ppid2)
if status1 == ProcStatus.FINISHED and \
- satus2 == ProcStatus.FINISHED:
- self._state = ResourceState.FINISHED
+ status2 == ProcStatus.FINISHED:
+
+ # check if execution errors occurred
+ (out1, err1), proc1 = self.endpoint1.node.check_errors(
+ self.run_home(self.endpoint1))
+
+ (out2, err2), proc2 = self.endpoint2.node.check_errors(
+ self.run_home(self.endpoint2))
+
+ if err1 or err2:
+ msg = "Error occurred in tunnel"
+ self.error(msg, err1, err2)
+ self.fail()
+ else:
+ self._state = ResourceState.FINISHED
self._last_state_check = tnow()
# TODO: Validate!
return True
- def blacklist(self):
- # TODO!!!!
- self.warn(" Blacklisting malfunctioning node ")
- #import util
- #util.appendBlacklist(self.hostname)
-
def get_options():
usage = ("usage: %prog -t <vif-type> -S <fd-socket-name> "
- "-l <local-port-file> -r <remote-port-file> -H <remote-host> "
- "-R <ret-file> ")
+ "-b <bwlimit> -c <cipher> -k <cipher-key> -q <txqueuelen> "
+ "-l <local-port-file> -r <remote-port-file> -H <remote-host> "
+ "-R <ret-file> ")
parser = OptionParser(usage = usage)
parser.add_option("-S", "--fd-socket-name", dest="fd_socket_name",
help = "Name for the unix socket to request the TAP file descriptor",
default = "tap.sock", type="str")
+
+ parser.add_option("-b", "--bwlimit", dest="bwlimit",
+ help = "Specifies the interface's emulated bandwidth in bytes ",
+ default = None, type="int")
+ parser.add_option("-q", "--txqueuelen", dest="txqueuelen",
+ help = "Specifies the interface's transmission queue length. ",
+ default = 1000, type="int")
+ parser.add_option("-c", "--cipher", dest="cipher",
+ help = "Cipher to encript communication. "
+ "One of PLAIN, AES, Blowfish, DES, DES3. ",
+ default = None, type="str")
+ parser.add_option("-k", "--cipher-key", dest="cipher_key",
+ help = "Specify a symmetric encryption key with which to protect "
+ "packets across the tunnel. python-crypto must be installed "
+ "on the system." ,
+ default = None, type="str")
+
parser.add_option("-l", "--local-port-file", dest="local_port_file",
help = "File where to store the local binded UDP port number ",
default = "local_port_file", type="str")
vif_type = vsys.IFF_TUN
return ( vif_type, options.fd_socket_name, options.local_port_file,
- options.remote_port_file, options.remote_host,
- options.ret_file )
+ options.remote_port_file, options.remote_host, options.ret_file,
+ options.bwlimit, options.cipher, options.cipher_key,
+ options.txqueuelen )
if __name__ == '__main__':
( vif_type, socket_name, local_port_file, remote_port_file,
- remote_host, ret_file ) = get_options()
+ remote_host, ret_file, bwlimit, cipher, cipher_key, txqueuelen
+ ) = get_options()
# Get the file descriptor of the TAP device from the process
# that created it
while not os.path.exists(remote_port_file):
time.sleep(2)
+ remote_port = ''
# Read remote port from file
- f = open(remote_port_file, 'r')
- remote_port = f.read()
- f.close()
+ # Try until something is read...
+ # xxx: There seems to be a weird behavior where
+ # even if the file exists and had the port number,
+ # the read operation returns empty string!
+ # Maybe a raise condition?
+ for i in xrange(10):
+ f = open(remote_port_file, 'r')
+ remote_port = f.read()
+ f.close()
+
+ if remote_port:
+ break
+
+ time.sleep(2)
remote_port = remote_port.strip()
remote_port = int(remote_port)
f.close()
# Establish tunnel
- # TODO: ADD parameters tunqueue, tunkqueue, cipher_key
tunchannel.tun_fwd(tun, remote,
- # Planetlab TAP devices add PI headers
- with_pi = True,
+ with_pi = True, # Planetlab TAP devices add PI headers
ether_mode = (vif_type == vsys.IFF_TAP),
- cipher_key = None,
udp = True,
+ cipher_key = cipher_key,
+ cipher = cipher,
TERMINATE = TERMINATE,
SUSPEND = SUSPEND,
- tunqueue = 1000,
+ tunqueue = txqueuelen,
tunkqueue = 500,
+ bwlimit = bwlimit
)
return if_name
def udp_connect_command(self, remote_ip, local_port_file,
- remote_port_file, ret_file):
+ remote_port_file, ret_file, cipher, cipher_key,
+ bwlimit, txqueuelen):
command = ["sudo -S "]
command.append("PYTHONPATH=$PYTHONPATH:${SRC}")
command.append("python ${SRC}/pl-vif-udp-connect.py")
command.append("-r %s " % remote_port_file)
command.append("-H %s " % remote_ip)
command.append("-R %s " % ret_file)
+ if cipher:
+ command.append("-c %s " % cipher)
+ if cipher_key:
+ command.append("-k %s " % cipher_key)
+ if txqueuelen:
+ command.append("-q %s " % txqueuelen)
+ if bwlimit:
+ command.append("-b %s " % bwlimit)
command = " ".join(command)
command = self.replace_paths(command)
if_name = ec.get(tap2, "deviceName")
self.assertTrue(if_name.startswith("tap"))
+ ec.shutdown()
+
+ @skipIfAnyNotAlive
+ def t_tun_udp_tunnel(self, user, host1, host2):
+
+ ec = ExperimentController(exp_id = "test-tap-udp-tunnel")
+
+ node1 = ec.register_resource("PlanetlabNode")
+ ec.set(node1, "hostname", host1)
+ ec.set(node1, "username", user)
+ ec.set(node1, "cleanHome", True)
+ ec.set(node1, "cleanProcesses", True)
+
+ tun1 = ec.register_resource("PlanetlabTun")
+ ec.set(tun1, "ip4", "192.168.1.1")
+ ec.set(tun1, "pointopoint", "192.168.1.2")
+ ec.set(tun1, "prefix4", 24)
+ ec.register_connection(tun1, node1)
+
+ node2 = ec.register_resource("PlanetlabNode")
+ ec.set(node2, "hostname", host2)
+ ec.set(node2, "username", user)
+ ec.set(node2, "cleanHome", True)
+ ec.set(node2, "cleanProcesses", True)
+
+ tun2 = ec.register_resource("PlanetlabTun")
+ ec.set(tun2, "ip4", "192.168.1.2")
+ ec.set(tun2, "pointopoint", "192.168.1.1")
+ ec.set(tun2, "prefix4", 24)
+ ec.register_connection(tun2, node2)
+
+ udptun = ec.register_resource("UdpTunnel")
+ ec.register_connection(tun1, udptun)
+ ec.register_connection(tun2, udptun)
+
+ app = ec.register_resource("LinuxApplication")
+ cmd = "ping -c3 192.168.1.2"
+ ec.set(app, "command", cmd)
+ ec.register_connection(app, node1)
+
+ ec.deploy()
+
+ ec.wait_finished(app)
+
+ ping = ec.trace(app, 'stdout')
+ expected = """3 packets transmitted, 3 received, 0% packet loss"""
+ self.assertTrue(ping.find(expected) > -1)
+
+ if_name = ec.get(tun1, "deviceName")
+ self.assertTrue(if_name.startswith("tun"))
+
+ if_name = ec.get(tun2, "deviceName")
+ self.assertTrue(if_name.startswith("tun"))
ec.shutdown()
def test_tap_udp_tunnel(self):
self.t_tap_udp_tunnel(self.user, self.host1, self.host2)
+ def test_tun_udp_tunnel(self):
+ self.t_tun_udp_tunnel(self.user, self.host1, self.host2)
+
if __name__ == '__main__':
unittest.main()