X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=edfa0cae43732cc574be16ffe3bdf2b252cfcf43;hb=68adac66099b08e3daae7a84b29af0f7c69ee955;hp=1163f6d91bbd336e0f13bc23dcc80bf708bf2a40;hpb=4801a11c067fb00eeb430e5d866e186f6a37e88f;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 1163f6d9..edfa0cae 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -17,9 +17,9 @@ # # Author: Alina Quereilhac -from nepi.execution.attribute import Attribute, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - reschedule_delay +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -57,7 +57,7 @@ class OSType: UBUNTU = "ubuntu" DEBIAN = "debian" -@clsinit +@clsinit_copy class LinuxNode(ResourceManager): """ .. class:: Class Args : @@ -149,38 +149,44 @@ class LinuxNode(ResourceManager): @classmethod def _register_attributes(cls): hostname = Attribute("hostname", "Hostname of the machine", - flags = Flags.ExecReadOnly) + flags = Flags.Design) username = Attribute("username", "Local account username", flags = Flags.Credential) - port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly) + port = Attribute("port", "SSH port", flags = Flags.Design) home = Attribute("home", "Experiment home directory to store all experiment related files", - flags = Flags.ExecReadOnly) + flags = Flags.Design) identity = Attribute("identity", "SSH identity file", flags = Flags.Credential) server_key = Attribute("serverKey", "Server public key", - flags = Flags.ExecReadOnly) + flags = Flags.Design) clean_home = Attribute("cleanHome", "Remove all nepi files and directories " " from node home folder before starting experiment", - flags = Flags.ExecReadOnly) + type = Types.Bool, + default = False, + flags = Flags.Design) clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " " from a previous same experiment, before the new experiment starts", - flags = Flags.ExecReadOnly) + type = Types.Bool, + default = False, + flags = Flags.Design) clean_processes = Attribute("cleanProcesses", "Kill all running processes before starting experiment", - flags = Flags.ExecReadOnly) + type = Types.Bool, + default = False, + flags = Flags.Design) tear_down = Attribute("tearDown", "Bash script to be executed before " + \ "releasing the resource", - flags = Flags.ExecReadOnly) + flags = Flags.Design) cls._register_attribute(hostname) cls._register_attribute(username) @@ -273,6 +279,8 @@ class LinuxNode(ResourceManager): self._os = OSType.FEDORA_12 elif out.find("Fedora release 14") == 0: self._os = OSType.FEDORA_14 + elif out.find("Fedora release") == 0: + self._os = OSType.FEDORA elif out.find("Debian") == 0: self._os = OSType.DEBIAN elif out.find("Ubuntu") ==0: @@ -290,25 +298,16 @@ class LinuxNode(ResourceManager): # To work arround this, repeat the operation N times or # until the result is not empty string out = "" - retrydelay = 1.0 - for i in xrange(2): - try: - (out, err), proc = self.execute("cat /etc/issue", - retry = 5, - with_lock = True, - blocking = True) - - if out.strip() != "": - return out - except: - trace = traceback.format_exc() - msg = "Error detecting OS: %s " % trace - self.error(msg, out, err) - return False - - time.sleep(min(30.0, retrydelay)) - retrydelay *= 1.5 - + try: + (out, err), proc = self.execute("cat /etc/issue", + with_lock = True, + blocking = True) + except: + trace = traceback.format_exc() + msg = "Error detecting OS: %s " % trace + self.error(msg, out, err) + + return out @property def use_deb(self): @@ -323,11 +322,9 @@ class LinuxNode(ResourceManager): def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] - def provision(self): + def do_provision(self): # check if host is alive if not self.is_alive(): - self.fail() - msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") self.error(msg) raise RuntimeError, msg @@ -352,34 +349,31 @@ class LinuxNode(ResourceManager): # Create experiment node home directory self.mkdir(self.node_home) - super(LinuxNode, self).provision() + super(LinuxNode, self).do_provision() - def deploy(self): + def do_deploy(self): if self.state == ResourceState.NEW: - try: - self.discover() - self.provision() - except: - self.fail() - raise + self.info("Deploying node") + self.do_discover() + self.do_provision() # Node needs to wait until all associated interfaces are # ready before it can finalize deployment from nepi.resources.linux.interface import LinuxInterface - ifaces = self.get_connected(LinuxInterface.rtype()) + ifaces = self.get_connected(LinuxInterface.get_rtype()) for iface in ifaces: if iface.state < ResourceState.READY: self.ec.schedule(reschedule_delay, self.deploy) return - super(LinuxNode, self).deploy() + super(LinuxNode, self).do_deploy() - def release(self): - # Node needs to wait until all associated RMs are released - # to be released + def do_release(self): rms = self.get_connected() for rm in rms: - if rm.state < ResourceState.STOPPED: + # Node needs to wait until all associated RMs are released + # before it can be released + if rm.state != ResourceState.RELEASED: self.ec.schedule(reschedule_delay, self.release) return @@ -389,32 +383,22 @@ class LinuxNode(ResourceManager): self.clean_processes() - super(LinuxNode, self).release() + super(LinuxNode, self).do_release() def valid_connection(self, guid): # TODO: Validate! return True - def clean_processes(self, killer = False): + def clean_processes(self): self.info("Cleaning up processes") - if killer: - # Hardcore kill - cmd = ("sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; ") - else: - # Be gentler... - cmd = ("sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall -u %s || /bin/true ; " % self.get("username") + - "sudo -S killall -u %s || /bin/true ; " % self.get("username")) + cmd = ("sudo -S killall tcpdump || /bin/true ; " + + "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " + + "sudo -S killall -u %s || /bin/true ; " % self.get("username")) out = err = "" - (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) - + (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) + def clean_home(self): """ Cleans all NEPI related folders in the Linux host """ @@ -438,15 +422,44 @@ class LinuxNode(ResourceManager): return self.execute(cmd, with_lock = True) + def socat(self, local_socket_name, + remote_socket_name, + sudo = False, + identity = None, + server_key = None, + env = None, + tty = False, + connect_timeout = 30, + retry = 3, + strict_host_checking = True): + """ Connectes a local and a remote UNIX socket through SSH using socat """ + + if self.localhost: + return (None, None), None + else: + return sshfuncs.socat( + local_socket_name, + remote_socket_name, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + retry = retry, + connect_timeout = connect_timeout, + strict_host_checking = strict_host_checking + ) + def execute(self, command, sudo = False, - stdin = None, env = None, tty = False, forward_x11 = False, - timeout = None, retry = 3, - err_on_timeout = True, connect_timeout = 30, strict_host_checking = False, persistent = True, @@ -459,12 +472,15 @@ class LinuxNode(ResourceManager): if self.localhost: (out, err), proc = execfuncs.lexec(command, - user = user, + user = self.get("username"), # still problem with localhost sudo = sudo, - stdin = stdin, env = env) else: if with_lock: + # If the execute command is blocking, we don't want to keep + # the node lock. This lock is used to avoid race conditions + # when creating the ControlMaster sockets. A more elegant + # solution is needed. with self._node_lock: (out, err), proc = sshfuncs.rexec( command, @@ -473,15 +489,12 @@ class LinuxNode(ResourceManager): port = self.get("port"), agent = True, sudo = sudo, - stdin = stdin, identity = self.get("identity"), server_key = self.get("serverKey"), env = env, tty = tty, forward_x11 = forward_x11, - timeout = timeout, retry = retry, - err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, blocking = blocking, @@ -495,15 +508,12 @@ class LinuxNode(ResourceManager): port = self.get("port"), agent = True, sudo = sudo, - stdin = stdin, identity = self.get("identity"), server_key = self.get("serverKey"), env = env, tty = tty, forward_x11 = forward_x11, - timeout = timeout, retry = retry, - err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, blocking = blocking, @@ -617,18 +627,16 @@ class LinuxNode(ResourceManager): recursive = True, strict_host_checking = False) else: - with self._node_lock: - (out, err), proc = sshfuncs.rcopy( - src, dst, - port = self.get("port"), - identity = self.get("identity"), - server_key = self.get("serverKey"), - recursive = True, - strict_host_checking = False) + (out, err), proc = sshfuncs.rcopy( + src, dst, + port = self.get("port"), + identity = self.get("identity"), + server_key = self.get("serverKey"), + recursive = True, + strict_host_checking = False) return (out, err), proc - def upload(self, src, dst, text = False, overwrite = True): """ Copy content to destination @@ -962,35 +970,25 @@ class LinuxNode(ResourceManager): return True out = err = "" + msg = "Unresponsive host. Wrong answer. " + # The underlying SSH layer will sometimes return an empty # output (even if the command was executed without errors). # To work arround this, repeat the operation N times or # until the result is not empty string - retrydelay = 1.0 - for i in xrange(2): - try: - (out, err), proc = self.execute("echo 'ALIVE'", - retry = 5, - blocking = True, - with_lock = True) - - if out.find("ALIVE") > -1: - return True - except: - trace = traceback.format_exc() - msg = "Unresponsive host. Error reaching host: %s " % trace - self.error(msg, out, err) - return False + try: + (out, err), proc = self.execute("echo 'ALIVE'", + blocking = True, + with_lock = True) + + if out.find("ALIVE") > -1: + return True + except: + trace = traceback.format_exc() + msg = "Unresponsive host. Error reaching host: %s " % trace - time.sleep(min(30.0, retrydelay)) - retrydelay *= 1.5 - - if out.find("ALIVE") > -1: - return True - else: - msg = "Unresponsive host. Wrong answer. " - self.error(msg, out, err) - return False + self.error(msg, out, err) + return False def find_home(self): """ Retrieves host home directory @@ -999,29 +997,20 @@ class LinuxNode(ResourceManager): # output (even if the command was executed without errors). # To work arround this, repeat the operation N times or # until the result is not empty string - retrydelay = 1.0 - for i in xrange(2): - try: - (out, err), proc = self.execute("echo ${HOME}", - retry = 5, - blocking = True, - with_lock = True) - - if out.strip() != "": - self._home_dir = out.strip() - break - except: - trace = traceback.format_exc() - msg = "Impossible to retrieve HOME directory" % trace - self.error(msg, out, err) - return False - - time.sleep(min(30.0, retrydelay)) - retrydelay *= 1.5 + msg = "Impossible to retrieve HOME directory" + try: + (out, err), proc = self.execute("echo ${HOME}", + blocking = True, + with_lock = True) + + if out.strip() != "": + self._home_dir = out.strip() + except: + trace = traceback.format_exc() + msg = "Impossible to retrieve HOME directory %s" % trace if not self._home_dir: - msg = "Impossible to retrieve HOME directory" - self.error(msg, out, err) + self.error(msg) raise RuntimeError, msg def filter_existing_files(self, src, dst):