X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fudptunnel.py;h=c00459c871170a7582edecf3aea4dc82bdc7188c;hb=7fed767f9f18ee81807950771145969cbb27b8a7;hp=ba96788f888a06d5541720eee9f93bce72271cec;hpb=cdc84ca63a26fa2b56ee6f71d051decde6afc217;p=nepi.git diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index ba96788f..c00459c8 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -18,9 +18,8 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types -from nepi.execution.resource import clsinit_copy, ResourceState, \ - reschedule_delay -from nepi.resources.linux.application import LinuxApplication +from nepi.execution.resource import clsinit_copy, ResourceState +from nepi.resources.linux.tunnel import LinuxTunnel from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -29,11 +28,10 @@ import socket import time @clsinit_copy -class UdpTunnel(LinuxApplication): - _rtype = "UdpTunnel" +class LinuxUdpTunnel(LinuxTunnel): + _rtype = "linux::UdpTunnel" _help = "Constructs a tunnel between two Linux endpoints using a UDP connection " - _backend = "linux" - + _platform = "linux" @classmethod def _register_attributes(cls): @@ -69,15 +67,12 @@ class UdpTunnel(LinuxApplication): cls._register_attribute(bwlimit) def __init__(self, ec, guid): - super(UdpTunnel, self).__init__(ec, guid) + super(LinuxUdpTunnel, self).__init__(ec, guid) self._home = "udp-tunnel-%s" % self.guid - self._pid1 = None - self._ppid1 = None - self._pid2 = None - self._ppid2 = None + self._pids = dict() def log_message(self, msg): - return " guid %d - tunnel %s - %s - %s " % (self.guid, + return " guid %d - udptunnel %s - %s - %s " % (self.guid, self.endpoint1.node.get("hostname"), self.endpoint2.node.get("hostname"), msg) @@ -88,188 +83,65 @@ class UdpTunnel(LinuxApplication): connected = [] for guid in self.connections: rm = self.ec.get_resource(guid) - if hasattr(rm, "udp_connect_command"): + if hasattr(rm, "initiate_udp_connection"): connected.append(rm) return connected - @property - def endpoint1(self): - endpoints = self.get_endpoints() - if endpoints: return endpoints[0] - return None - - @property - def endpoint2(self): - endpoints = self.get_endpoints() - if endpoints and len(endpoints) > 1: return endpoints[1] - return None - - def app_home(self, endpoint): - return os.path.join(endpoint.node.exp_home, self._home) - - def run_home(self, endpoint): - return os.path.join(self.app_home(endpoint), self.ec.run_id) - - def udp_connect(self, endpoint, remote_ip): - # Get udp connect command - local_port_file = os.path.join(self.run_home(endpoint), - "local_port") - remote_port_file = os.path.join(self.run_home(endpoint), - "remote_port") - ret_file = os.path.join(self.run_home(endpoint), - "ret_file") + def initiate_connection(self, endpoint, remote_endpoint): cipher = self.get("cipher") cipher_key = self.get("cipherKey") bwlimit = self.get("bwLimit") txqueuelen = self.get("txQueueLen") - udp_connect_command = endpoint.udp_connect_command( - remote_ip, local_port_file, remote_port_file, - ret_file, cipher, cipher_key, bwlimit, txqueuelen) + connection_app_home = self.app_home(endpoint) + connection_run_home = self.run_home(endpoint) - # upload command to connect.sh script - shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh") - endpoint.node.upload(udp_connect_command, - shfile, - text = True, - overwrite = False) + port = endpoint.initiate_udp_connection( + remote_endpoint, + connection_app_home, + connection_run_home, + cipher, cipher_key, bwlimit, txqueuelen) - # invoke connect script - cmd = "bash %s" % shfile - (out, err), proc = endpoint.node.run(cmd, self.run_home(endpoint)) - - # check if execution errors occurred - msg = " Failed to connect endpoints " - - if proc.poll(): - self.error(msg, out, err) - raise RuntimeError, msg - - # Wait for pid file to be generated - pid, ppid = endpoint.node.wait_pid(self.run_home(endpoint)) - - # If the process is not running, check for error information - # on the remote machine - if not pid or not ppid: - (out, err), proc = endpoint.node.check_errors(self.run_home(endpoint)) - # Out is what was written in the stderr file - if err: - msg = " Failed to start command '%s' " % command - self.error(msg, out, err) - raise RuntimeError, msg + return port - # wait until port is written to file - port = self.wait_local_port(endpoint) - return (port, pid, ppid) + def establish_connection(self, endpoint, remote_endpoint, port): + connection_app_home = self.app_home(endpoint) + connection_run_home = self.run_home(endpoint) - def do_provision(self): - # create run dir for tunnel on each node - self.endpoint1.node.mkdir(self.run_home(self.endpoint1)) - self.endpoint2.node.mkdir(self.run_home(self.endpoint2)) + endpoint.establish_udp_connection(remote_endpoint, + connection_app_home, + connection_run_home, + port) - # Invoke connect script in endpoint 1 - remote_ip1 = socket.gethostbyname(self.endpoint2.node.get("hostname")) - (port1, self._pid1, self._ppid1) = self.udp_connect(self.endpoint1, - remote_ip1) + def verify_connection(self, endpoint, remote_endpoint): + endpoint.verify_connection() - # Invoke connect script in endpoint 2 - remote_ip2 = socket.gethostbyname(self.endpoint1.node.get("hostname")) - (port2, self._pid2, self._ppid2) = self.udp_connect(self.endpoint2, - remote_ip2) + def terminate_connection(self, endpoint, remote_endpoint): + endpoint.terminate_connection() - # upload file with port 2 to endpoint 1 - self.upload_remote_port(self.endpoint1, port2) - - # upload file with port 1 to endpoint 2 - self.upload_remote_port(self.endpoint2, port1) + def check_state_connection(self): + # Make sure the process is still running in background + # No execution errors occurred. Make sure the background + # process with the recorded pid is still running. - # check if connection was successful on both sides - self.wait_result(self.endpoint1) - self.wait_result(self.endpoint2) - - self.info("Provisioning finished") - - self.set_provisioned() + status1 = self.endpoint1.check_status() + status2 = self.endpoint2.check_status() - def do_deploy(self): - if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ - (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): - self.ec.schedule(reschedule_delay, self.deploy) - else: - self.do_discover() - self.do_provision() - - self.set_ready() + if status1 == ProcStatus.FINISHED and \ + status2 == ProcStatus.FINISHED: - def do_start(self): - if self.state == ResourceState.READY: - command = self.get("command") - self.info("Starting command '%s'" % command) - - self.set_started() - else: - msg = " Failed to execute command '%s'" % command - self.error(msg, out, err) - raise RuntimeError, msg - - def do_stop(self): - """ Stops application execution - """ - if self.state == ResourceState.STARTED: - self.info("Stopping tunnel") - - # Only try to kill the process if the pid and ppid - # were retrieved - if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: - (out1, err1), proc1 = self.endpoint1.node.kill(self._pid1, - self._ppid1, sudo = True) - (out2, err2), proc2 = self.endpoint2.node.kill(self._pid2, - self._ppid2, sudo = True) - - if err1 or err2 or proc1.poll() or proc2.poll(): - # check if execution errors occurred - msg = " Failed to STOP tunnel" - self.error(msg, err1, err2) - raise RuntimeError, msg - - self.set_stopped() - - @property - def state(self): - """ Returns the state of the application - """ - if self._state == ResourceState.STARTED: - # In order to avoid overwhelming the remote host and - # the local processor with too many ssh queries, the state is only - # requested every 'state_check_delay' seconds. - state_check_delay = 0.5 - if tdiffsec(tnow(), self._last_state_check) > state_check_delay: - if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: - # Make sure the process is still running in background - # No execution errors occurred. Make sure the background - # process with the recorded pid is still running. - status1 = self.endpoint1.node.status(self._pid1, self._ppid1) - status2 = self.endpoint2.node.status(self._pid2, self._ppid2) + # check if execution errors occurred + (out1, err1), proc1 = self.endpoint1.node.check_errors( + self.run_home(self.endpoint1)) - if status1 == ProcStatus.FINISHED and \ - status2 == ProcStatus.FINISHED: + (out2, err2), proc2 = self.endpoint2.node.check_errors( + self.run_home(self.endpoint2)) - # check if execution errors occurred - (out1, err1), proc1 = self.endpoint1.node.check_errors( - self.run_home(self.endpoint1)) - - (out2, err2), proc2 = self.endpoint2.node.check_errors( - self.run_home(self.endpoint2)) - - if err1 or err2: - msg = "Error occurred in tunnel" - self.error(msg, err1, err2) - self.fail() - else: - self.set_stopped() - - self._last_state_check = tnow() - - return self._state + if err1 or err2: + msg = "Error occurred in tunnel" + self.error(msg, err1, err2) + self.fail() + else: + self.set_stopped() def wait_local_port(self, endpoint): """ Waits until the local_port file for the endpoint is generated, @@ -289,7 +161,7 @@ class UdpTunnel(LinuxApplication): result = None delay = 1.0 - for i in xrange(4): + for i in xrange(20): (out, err), proc = endpoint.node.check_output( self.run_home(endpoint), filename) @@ -306,15 +178,3 @@ class UdpTunnel(LinuxApplication): return result - def upload_remote_port(self, endpoint, port): - # upload remote port number to file - port = "%s\n" % port - endpoint.node.upload(port, - os.path.join(self.run_home(endpoint), "remote_port"), - text = True, - overwrite = False) - - def valid_connection(self, guid): - # TODO: Validate! - return True -