X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=cc94daa88e744fee0c6644be132a1a0c3b58ae31;hb=35af1f1f8393dc9663b2a8be8a4c2b1d78f03bc1;hp=67fbbbcb3df89d12e66b2ef6535f0df21bf06db1;hpb=450b5dd0a993f63eb2ec34bbc656c558572eb44c;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 67fbbbcb..cc94daa8 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -19,7 +19,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay, failtrap + ResourceState, reschedule_delay from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -188,6 +188,12 @@ class LinuxNode(ResourceManager): "releasing the resource", flags = Flags.ExecReadOnly) + gateway_user = Attribute("gatewayUser", "Gateway account username", + flags = Flags.ExecReadOnly) + + gateway = Attribute("gateway", "Hostname of the gateway machine", + flags = Flags.ExecReadOnly) + cls._register_attribute(hostname) cls._register_attribute(username) cls._register_attribute(port) @@ -198,13 +204,15 @@ class LinuxNode(ResourceManager): cls._register_attribute(clean_experiment) cls._register_attribute(clean_processes) cls._register_attribute(tear_down) + cls._register_attribute(gateway_user) + cls._register_attribute(gateway) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None # home directory at Linux host self._home_dir = "" - + # lock to prevent concurrent applications on the same node, # to execute commands at the same time. There are potential # concurrency issues when using SSH to a same host from @@ -298,24 +306,16 @@ class LinuxNode(ResourceManager): # To work arround this, repeat the operation N times or # until the result is not empty string out = "" - retrydelay = 1.0 - for i in xrange(2): - try: - (out, err), proc = self.execute("cat /etc/issue", - retry = 5, - with_lock = True, - blocking = True) - - if out.strip() != "": - return out - except: - trace = traceback.format_exc() - msg = "Error detecting OS: %s " % trace - self.error(msg, out, err) - return False - - time.sleep(min(30.0, retrydelay)) - retrydelay *= 1.5 + try: + (out, err), proc = self.execute("cat /etc/issue", + with_lock = True, + blocking = True) + except: + trace = traceback.format_exc() + msg = "Error detecting OS: %s " % trace + self.error(msg, out, err) + + return out @property def use_deb(self): @@ -330,8 +330,7 @@ class LinuxNode(ResourceManager): def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] - @failtrap - def provision(self): + def do_provision(self): # check if host is alive if not self.is_alive(): msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") @@ -358,72 +357,76 @@ class LinuxNode(ResourceManager): # Create experiment node home directory self.mkdir(self.node_home) - super(LinuxNode, self).provision() + super(LinuxNode, self).do_provision() - @failtrap - def deploy(self): + def do_deploy(self): if self.state == ResourceState.NEW: self.info("Deploying node") - self.discover() - self.provision() + self.do_discover() + self.do_provision() # Node needs to wait until all associated interfaces are # ready before it can finalize deployment from nepi.resources.linux.interface import LinuxInterface - ifaces = self.get_connected(LinuxInterface.rtype()) + ifaces = self.get_connected(LinuxInterface.get_rtype()) for iface in ifaces: if iface.state < ResourceState.READY: self.ec.schedule(reschedule_delay, self.deploy) return - super(LinuxNode, self).deploy() + super(LinuxNode, self).do_deploy() - def release(self): - try: - rms = self.get_connected() - for rm in rms: - # Node needs to wait until all associated RMs are released - # before it can be released - if rm.state < ResourceState.STOPPED: - self.ec.schedule(reschedule_delay, self.release) - return - - tear_down = self.get("tearDown") - if tear_down: - self.execute(tear_down) + def do_release(self): + rms = self.get_connected() + for rm in rms: + # Node needs to wait until all associated RMs are released + # before it can be released + if rm.state != ResourceState.RELEASED: + self.ec.schedule(reschedule_delay, self.release) + return - self.clean_processes() - except: - import traceback - err = traceback.format_exc() - self.error(err) + tear_down = self.get("tearDown") + if tear_down: + self.execute(tear_down) + + self.clean_processes() - super(LinuxNode, self).release() + super(LinuxNode, self).do_release() def valid_connection(self, guid): # TODO: Validate! return True - def clean_processes(self, killer = False): + def clean_processes(self): self.info("Cleaning up processes") - - if killer: - # Hardcore kill - cmd = ("sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; ") - else: - # Be gentler... + + if self.get("username") != 'root': cmd = ("sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall -u %s || /bin/true ; " % self.get("username") + + "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " + "sudo -S killall -u %s || /bin/true ; " % self.get("username")) + else: + if self.state >= ResourceState.READY: + import pickle + pids = pickle.load(open("/tmp/save.proc", "rb")) + pids_temp = dict() + ps_aux = "ps aux |awk '{print $2,$11}'" + (out, err), proc = self.execute(ps_aux) + for line in out.strip().split("\n"): + parts = line.strip().split(" ") + pids_temp[parts[0]] = parts[1] + kill_pids = set(pids_temp.items()) - set(pids.items()) + kill_pids = ' '.join(dict(kill_pids).keys()) + + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " + + "kill %s || /bin/true ; " % kill_pids) + else: + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ") out = err = "" - (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) - + (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) + def clean_home(self): """ Cleans all NEPI related folders in the Linux host """ @@ -468,7 +471,7 @@ class LinuxNode(ResourceManager): if self.localhost: (out, err), proc = execfuncs.lexec(command, - user = user, + user = self.get("username"), # still problem with localhost sudo = sudo, stdin = stdin, env = env) @@ -480,6 +483,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, stdin = stdin, @@ -502,6 +507,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, stdin = stdin, @@ -555,6 +562,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, identity = self.get("identity"), server_key = self.get("serverKey"), @@ -573,6 +582,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, identity = self.get("identity"), server_key = self.get("serverKey") @@ -590,6 +601,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, identity = self.get("identity"), server_key = self.get("serverKey") @@ -612,6 +625,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, identity = self.get("identity"), @@ -630,6 +645,8 @@ class LinuxNode(ResourceManager): (out, err), proc = sshfuncs.rcopy( src, dst, port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), identity = self.get("identity"), server_key = self.get("serverKey"), recursive = True, @@ -640,11 +657,17 @@ class LinuxNode(ResourceManager): def upload(self, src, dst, text = False, overwrite = True): """ Copy content to destination - src content to copy. Can be a local file, directory or a list of files + src string with the content to copy. Can be: + - plain text + - a string with the path to a local file + - a string with a colon-separeted list of local files + - a string with a local directory - dst destination path on the remote host (remote is always self.host) + dst string with destination path on the remote host (remote is + always self.host) - text src is text input, it must be stored into a temp file before uploading + text src is text input, it must be stored into a temp file before + uploading """ # If source is a string input f = None @@ -657,11 +680,14 @@ class LinuxNode(ResourceManager): src = f.name # If dst files should not be overwritten, check that the files do not - # exits already + # exits already + if isinstance(src, str): + src = map(str.strip, src.split(";")) + if overwrite == False: src = self.filter_existing_files(src, dst) if not src: - return ("", ""), None + return ("", ""), None if not self.localhost: # Build destination as @: @@ -970,35 +996,25 @@ class LinuxNode(ResourceManager): return True out = err = "" + msg = "Unresponsive host. Wrong answer. " + # The underlying SSH layer will sometimes return an empty # output (even if the command was executed without errors). # To work arround this, repeat the operation N times or # until the result is not empty string - retrydelay = 1.0 - for i in xrange(2): - try: - (out, err), proc = self.execute("echo 'ALIVE'", - retry = 5, - blocking = True, - with_lock = True) - - if out.find("ALIVE") > -1: - return True - except: - trace = traceback.format_exc() - msg = "Unresponsive host. Error reaching host: %s " % trace - self.error(msg, out, err) - return False + try: + (out, err), proc = self.execute("echo 'ALIVE'", + blocking = True, + with_lock = True) + + if out.find("ALIVE") > -1: + return True + except: + trace = traceback.format_exc() + msg = "Unresponsive host. Error reaching host: %s " % trace - time.sleep(min(30.0, retrydelay)) - retrydelay *= 1.5 - - if out.find("ALIVE") > -1: - return True - else: - msg = "Unresponsive host. Wrong answer. " - self.error(msg, out, err) - return False + self.error(msg, out, err) + return False def find_home(self): """ Retrieves host home directory @@ -1007,37 +1023,29 @@ class LinuxNode(ResourceManager): # output (even if the command was executed without errors). # To work arround this, repeat the operation N times or # until the result is not empty string - retrydelay = 1.0 - for i in xrange(2): - try: - (out, err), proc = self.execute("echo ${HOME}", - retry = 5, - blocking = True, - with_lock = True) - - if out.strip() != "": - self._home_dir = out.strip() - break - except: - trace = traceback.format_exc() - msg = "Impossible to retrieve HOME directory" % trace - self.error(msg, out, err) - return False - - time.sleep(min(30.0, retrydelay)) - retrydelay *= 1.5 + msg = "Impossible to retrieve HOME directory" + try: + (out, err), proc = self.execute("echo ${HOME}", + blocking = True, + with_lock = True) + + if out.strip() != "": + self._home_dir = out.strip() + except: + trace = traceback.format_exc() + msg = "Impossible to retrieve HOME directory %s" % trace if not self._home_dir: - msg = "Impossible to retrieve HOME directory" - self.error(msg, out, err) + self.error(msg) raise RuntimeError, msg def filter_existing_files(self, src, dst): """ Removes files that already exist in the Linux host from src list """ # construct a dictionary with { dst: src } - dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ), - src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src}) + dests = dict(map( + lambda s: (os.path.join(dst, os.path.basename(s)), s ), s)) \ + if len(src) > 1 else dict({dst: src[0]}) command = [] for d in dests.keys(): @@ -1052,7 +1060,7 @@ class LinuxNode(ResourceManager): del dests[d] if not dests: - return "" + return [] - return " ".join(dests.values()) + return dests.values()