# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy, ResourceState, \
- reschedule_delay
-from nepi.resources.linux.application import LinuxApplication
+from nepi.execution.resource import clsinit_copy, ResourceState
+from nepi.resources.linux.tunnel import LinuxTunnel
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
import time
@clsinit_copy
-class UdpTunnel(LinuxApplication):
- _rtype = "UdpTunnel"
+class LinuxUdpTunnel(LinuxTunnel):
+ _rtype = "linux::UdpTunnel"
_help = "Constructs a tunnel between two Linux endpoints using a UDP connection "
_backend = "linux"
-
@classmethod
def _register_attributes(cls):
cipher = Attribute("cipher",
cls._register_attribute(bwlimit)
def __init__(self, ec, guid):
- super(UdpTunnel, self).__init__(ec, guid)
+ super(LinuxUdpTunnel, self).__init__(ec, guid)
self._home = "udp-tunnel-%s" % self.guid
- self._pid1 = None
- self._ppid1 = None
- self._pid2 = None
- self._ppid2 = None
+ self._pids = dict()
def log_message(self, msg):
- return " guid %d - tunnel %s - %s - %s " % (self.guid,
+ return " guid %d - udptunnel %s - %s - %s " % (self.guid,
self.endpoint1.node.get("hostname"),
self.endpoint2.node.get("hostname"),
msg)
connected = []
for guid in self.connections:
rm = self.ec.get_resource(guid)
- if hasattr(rm, "udp_connect_command"):
+ if hasattr(rm, "initiate_udp_connection"):
connected.append(rm)
return connected
- @property
- def endpoint1(self):
- endpoints = self.get_endpoints()
- if endpoints: return endpoints[0]
- return None
-
- @property
- def endpoint2(self):
- endpoints = self.get_endpoints()
- if endpoints and len(endpoints) > 1: return endpoints[1]
- return None
-
- def app_home(self, endpoint):
- return os.path.join(endpoint.node.exp_home, self._home)
-
- def run_home(self, endpoint):
- return os.path.join(self.app_home(endpoint), self.ec.run_id)
-
- def udp_connect(self, endpoint, remote_ip):
- # Get udp connect command
- local_port_file = os.path.join(self.run_home(endpoint),
- "local_port")
- remote_port_file = os.path.join(self.run_home(endpoint),
- "remote_port")
- ret_file = os.path.join(self.run_home(endpoint),
- "ret_file")
+ def initiate_connection(self, endpoint, remote_endpoint):
cipher = self.get("cipher")
cipher_key = self.get("cipherKey")
bwlimit = self.get("bwLimit")
txqueuelen = self.get("txQueueLen")
- udp_connect_command = endpoint.udp_connect_command(
- remote_ip, local_port_file, remote_port_file,
- ret_file, cipher, cipher_key, bwlimit, txqueuelen)
-
- # upload command to connect.sh script
- shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh")
- endpoint.node.upload(udp_connect_command,
- shfile,
- text = True,
- overwrite = False)
-
- # invoke connect script
- cmd = "bash %s" % shfile
- (out, err), proc = endpoint.node.run(cmd, self.run_home(endpoint))
-
- # check if execution errors occurred
- msg = " Failed to connect endpoints "
-
- if proc.poll():
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- # Wait for pid file to be generated
- pid, ppid = endpoint.node.wait_pid(self.run_home(endpoint))
-
- # If the process is not running, check for error information
- # on the remote machine
- if not pid or not ppid:
- (out, err), proc = endpoint.node.check_errors(self.run_home(endpoint))
- # Out is what was written in the stderr file
- if err:
- msg = " Failed to start command '%s' " % command
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- # wait until port is written to file
- port = self.wait_local_port(endpoint)
- return (port, pid, ppid)
-
- def do_provision(self):
- # create run dir for tunnel on each node
- self.endpoint1.node.mkdir(self.run_home(self.endpoint1))
- self.endpoint2.node.mkdir(self.run_home(self.endpoint2))
-
- # Invoke connect script in endpoint 1
- remote_ip1 = socket.gethostbyname(self.endpoint2.node.get("hostname"))
- (port1, self._pid1, self._ppid1) = self.udp_connect(self.endpoint1,
- remote_ip1)
-
- # Invoke connect script in endpoint 2
- remote_ip2 = socket.gethostbyname(self.endpoint1.node.get("hostname"))
- (port2, self._pid2, self._ppid2) = self.udp_connect(self.endpoint2,
- remote_ip2)
+ connection_app_home = self.app_home(endpoint)
+ connection_run_home = self.run_home(endpoint)
- # upload file with port 2 to endpoint 1
- self.upload_remote_port(self.endpoint1, port2)
-
- # upload file with port 1 to endpoint 2
- self.upload_remote_port(self.endpoint2, port1)
+ port = endpoint.initiate_udp_connection(
+ remote_endpoint,
+ connection_app_home,
+ connection_run_home,
+ cipher, cipher_key, bwlimit, txqueuelen)
- # check if connection was successful on both sides
- self.wait_result(self.endpoint1)
- self.wait_result(self.endpoint2)
-
- self.info("Provisioning finished")
-
- self.set_provisioned()
+ return port
- def do_deploy(self):
- if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
- (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
- self.ec.schedule(reschedule_delay, self.deploy)
- else:
- self.do_discover()
- self.do_provision()
-
- self.debug("----- READY ---- ")
- self.set_ready()
+ def establish_connection(self, endpoint, remote_endpoint, port):
+ endpoint.establish_udp_connection(remote_endpoint, port)
- def do_start(self):
- if self.state == ResourceState.READY:
- command = self.get("command")
- self.info("Starting command '%s'" % command)
-
- self.set_started()
- else:
- msg = " Failed to execute command '%s'" % command
- self.error(msg, out, err)
- raise RuntimeError, msg
+ def verify_connection(self, endpoint, remote_endpoint):
+ endpoint.verify_connection()
- def do_stop(self):
- """ Stops application execution
- """
- if self.state == ResourceState.STARTED:
- self.info("Stopping tunnel")
-
- # Only try to kill the process if the pid and ppid
- # were retrieved
- if self._pid1 and self._ppid1 and self._pid2 and self._ppid2:
- (out1, err1), proc1 = self.endpoint1.node.kill(self._pid1,
- self._ppid1, sudo = True)
- (out2, err2), proc2 = self.endpoint2.node.kill(self._pid2,
- self._ppid2, sudo = True)
+ def terminate_connection(self, endpoint, remote_endpoint):
+ endpoint.terminate_connection()
- if err1 or err2 or proc1.poll() or proc2.poll():
- # check if execution errors occurred
- msg = " Failed to STOP tunnel"
- self.error(msg, err1, err2)
- raise RuntimeError, msg
+ def check_state_connection(self):
+ # Make sure the process is still running in background
+ # No execution errors occurred. Make sure the background
+ # process with the recorded pid is still running.
- self.set_stopped()
+ status1 = self.endpoint1.check_status()
+ status2 = self.endpoint2.check_status()
- @property
- def state(self):
- """ Returns the state of the application
- """
- if self._state == ResourceState.STARTED:
- # In order to avoid overwhelming the remote host and
- # the local processor with too many ssh queries, the state is only
- # requested every 'state_check_delay' seconds.
- state_check_delay = 0.5
- if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
- if self._pid1 and self._ppid1 and self._pid2 and self._ppid2:
- # Make sure the process is still running in background
- # No execution errors occurred. Make sure the background
- # process with the recorded pid is still running.
- status1 = self.endpoint1.node.status(self._pid1, self._ppid1)
- status2 = self.endpoint2.node.status(self._pid2, self._ppid2)
-
- if status1 == ProcStatus.FINISHED and \
- status2 == ProcStatus.FINISHED:
-
- # check if execution errors occurred
- (out1, err1), proc1 = self.endpoint1.node.check_errors(
- self.run_home(self.endpoint1))
-
- (out2, err2), proc2 = self.endpoint2.node.check_errors(
- self.run_home(self.endpoint2))
+ if status1 == ProcStatus.FINISHED and \
+ status2 == ProcStatus.FINISHED:
- if err1 or err2:
- msg = "Error occurred in tunnel"
- self.error(msg, err1, err2)
- self.fail()
- else:
- self.set_stopped()
+ # check if execution errors occurred
+ (out1, err1), proc1 = self.endpoint1.node.check_errors(
+ self.run_home(self.endpoint1))
- self._last_state_check = tnow()
+ (out2, err2), proc2 = self.endpoint2.node.check_errors(
+ self.run_home(self.endpoint2))
- return self._state
+ if err1 or err2:
+ msg = "Error occurred in tunnel"
+ self.error(msg, err1, err2)
+ self.fail()
+ else:
+ self.set_stopped()
def wait_local_port(self, endpoint):
""" Waits until the local_port file for the endpoint is generated,
result = None
delay = 1.0
- for i in xrange(4):
+ for i in xrange(20):
(out, err), proc = endpoint.node.check_output(
self.run_home(endpoint), filename)
return result
- def upload_remote_port(self, endpoint, port):
- # upload remote port number to file
- port = "%s\n" % port
- endpoint.node.upload(port,
- os.path.join(self.run_home(endpoint), "remote_port"),
- text = True,
- overwrite = False)
-
- def valid_connection(self, guid):
- # TODO: Validate!
- return True
-