X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fudptunnel.py;h=5b0eac7c5eab3c3492089766cb2fbc0a04870604;hb=4ec5c9e5454b68a3dab82a5073aee50231706706;hp=01f6898ba2613629b86d55751b1db0a75fb4048a;hpb=88c5b86c62dbaef9e3b2a73bf08b6c21075e12a3;p=nepi.git diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index 01f6898b..5b0eac7c 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ reschedule_delay -from nepi.resources.linux.application import LinuxApplication +from nepi.resources.linux.tunnel import LinuxTunnel from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -29,8 +29,8 @@ import socket import time @clsinit_copy -class UdpTunnel(LinuxApplication): - _rtype = "UdpTunnel" +class LinuxUdpTunnel(LinuxTunnel): + _rtype = "LinuxUdpTunnel" _help = "Constructs a tunnel between two Linux endpoints using a UDP connection " _backend = "linux" @@ -68,62 +68,28 @@ class UdpTunnel(LinuxApplication): cls._register_attribute(bwlimit) def __init__(self, ec, guid): - super(UdpTunnel, self).__init__(ec, guid) + super(LinuxUdpTunnel, self).__init__(ec, guid) self._home = "udp-tunnel-%s" % self.guid - self._pid1 = None - self._ppid1 = None - self._pid2 = None - self._ppid2 = None + self._pids = dict() def log_message(self, msg): - return " guid %d - tunnel %s - %s - %s " % (self.guid, + return " guid %d - udptunnel %s - %s - %s " % (self.guid, self.endpoint1.node.get("hostname"), self.endpoint2.node.get("hostname"), msg) - def get_endpoints(self): - """ Returns the list of RM that are endpoints to the tunnel - """ - connected = [] - for guid in self.connections: - rm = self.ec.get_resource(guid) - if hasattr(rm, "udp_connect_command"): - connected.append(rm) - return connected - - @property - def endpoint1(self): - endpoints = self.get_endpoints() - if endpoints: return endpoints[0] - return None - - @property - def endpoint2(self): - endpoints = self.get_endpoints() - if endpoints and len(endpoints) > 1: return endpoints[1] - return None - - def app_home(self, endpoint): - return os.path.join(endpoint.node.exp_home, self._home) - - def run_home(self, endpoint): - return os.path.join(self.app_home(endpoint), self.ec.run_id) - - def udp_connect(self, endpoint, remote_ip): - # Get udp connect command - local_port_file = os.path.join(self.run_home(endpoint), - "local_port") - remote_port_file = os.path.join(self.run_home(endpoint), - "remote_port") - ret_file = os.path.join(self.run_home(endpoint), - "ret_file") + def initiate_connection(self, endpoint, remote_endpoint): cipher = self.get("cipher") cipher_key = self.get("cipherKey") bwlimit = self.get("bwLimit") txqueuelen = self.get("txQueueLen") + + # Return the command to execute to initiate the connection to the + # other endpoint + connection_run_home = self.run_home(endpoint) udp_connect_command = endpoint.udp_connect_command( - remote_ip, local_port_file, remote_port_file, - ret_file, cipher, cipher_key, bwlimit, txqueuelen) + remote_endpoint, connection_run_home, + cipher, cipher_key, bwlimit, txqueuelen) # upload command to connect.sh script shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh") @@ -158,117 +124,56 @@ class UdpTunnel(LinuxApplication): # wait until port is written to file port = self.wait_local_port(endpoint) - return (port, pid, ppid) - def do_provision(self): - # create run dir for tunnel on each node - self.endpoint1.node.mkdir(self.run_home(self.endpoint1)) - self.endpoint2.node.mkdir(self.run_home(self.endpoint2)) + self._pids[endpoint] = (pid, ppid) - # Invoke connect script in endpoint 1 - remote_ip1 = socket.gethostbyname(self.endpoint2.node.get("hostname")) - (port1, self._pid1, self._ppid1) = self.udp_connect(self.endpoint1, - remote_ip1) + return port - # Invoke connect script in endpoint 2 - remote_ip2 = socket.gethostbyname(self.endpoint1.node.get("hostname")) - (port2, self._pid2, self._ppid2) = self.udp_connect(self.endpoint2, - remote_ip2) + def establish_connection(self, endpoint, remote_endpoint, port): + self.upload_remote_port(endpoint, port) - # upload file with port 2 to endpoint 1 - self.upload_remote_port(self.endpoint1, port2) - - # upload file with port 1 to endpoint 2 - self.upload_remote_port(self.endpoint2, port1) + def verify_connection(self, endpoint, remote_endpoint): + self.wait_result(endpoint) - # check if connection was successful on both sides - self.wait_result(self.endpoint1) - self.wait_result(self.endpoint2) - - self.info("Provisioning finished") - - self.set_provisioned() + def terminate_connection(self, endpoint, remote_endpoint): + pid, ppid = self._pids[endpoint] - def do_deploy(self): - if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ - (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): - self.ec.schedule(reschedule_delay, self.deploy) - else: - self.do_discover() - self.do_provision() - - self.set_ready() - - def do_start(self): - if self.state == ResourceState.READY: - command = self.get("command") - self.info("Starting command '%s'" % command) - - self.set_started() - else: - msg = " Failed to execute command '%s'" % command - self.error(msg, out, err) - raise RuntimeError, msg + if pid and ppid: + (out, err), proc = endpoint.node.kill(pid, ppid, + sudo = True) - def do_stop(self): - """ Stops application execution - """ - if self.state == ResourceState.STARTED: - self.info("Stopping tunnel") - - # Only try to kill the process if the pid and ppid - # were retrieved - if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: - (out1, err1), proc1 = self.endpoint1.node.kill(self._pid1, - self._ppid1, sudo = True) - (out2, err2), proc2 = self.endpoint2.node.kill(self._pid2, - self._ppid2, sudo = True) - - if (proc1.poll() and err1) or (proc2.poll() and err2): - # check if execution errors occurred - msg = " Failed to STOP tunnel" - self.error(msg, err1, err2) - raise RuntimeError, msg - - self.set_stopped() - - @property - def state(self): - """ Returns the state of the application - """ - if self._state == ResourceState.STARTED: - # In order to avoid overwhelming the remote host and - # the local processor with too many ssh queries, the state is only - # requested every 'state_check_delay' seconds. - state_check_delay = 0.5 - if tdiffsec(tnow(), self._last_state_check) > state_check_delay: - if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: - # Make sure the process is still running in background - # No execution errors occurred. Make sure the background - # process with the recorded pid is still running. - status1 = self.endpoint1.node.status(self._pid1, self._ppid1) - status2 = self.endpoint2.node.status(self._pid2, self._ppid2) - - if status1 == ProcStatus.FINISHED and \ - status2 == ProcStatus.FINISHED: - - # check if execution errors occurred - (out1, err1), proc1 = self.endpoint1.node.check_errors( - self.run_home(self.endpoint1)) - - (out2, err2), proc2 = self.endpoint2.node.check_errors( - self.run_home(self.endpoint2)) - - if err1 or err2: - msg = "Error occurred in tunnel" - self.error(msg, err1, err2) - self.fail() - else: - self.set_stopped() - - self._last_state_check = tnow() - - return self._state + # check if execution errors occurred + if proc.poll() and err: + msg = " Failed to STOP tunnel" + self.error(msg, out, err) + raise RuntimeError, msg + + def check_state_connection(self): + # Make sure the process is still running in background + # No execution errors occurred. Make sure the background + # process with the recorded pid is still running. + pid1, ppid1 = self._pids[self.endpoint1] + pid2, ppid2 = self._pids[self.endpoint2] + + status1 = self.endpoint1.node.status(pid1, ppid1) + status2 = self.endpoint2.node.status(pid2, ppid2) + + if status1 == ProcStatus.FINISHED and \ + status2 == ProcStatus.FINISHED: + + # check if execution errors occurred + (out1, err1), proc1 = self.endpoint1.node.check_errors( + self.run_home(self.endpoint1)) + + (out2, err2), proc2 = self.endpoint2.node.check_errors( + self.run_home(self.endpoint2)) + + if err1 or err2: + msg = "Error occurred in tunnel" + self.error(msg, err1, err2) + self.fail() + else: + self.set_stopped() def wait_local_port(self, endpoint): """ Waits until the local_port file for the endpoint is generated, @@ -313,7 +218,4 @@ class UdpTunnel(LinuxApplication): text = True, overwrite = False) - def valid_connection(self, guid): - # TODO: Validate! - return True