X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;ds=sidebyside;f=src%2Fneco%2Fresources%2Flinux%2Fnode.py;h=a030eb437706651ffbe40ddcf9628d60e915816c;hb=7c534b4f1a01e6933602c306b82853da0d5840ef;hp=b482747b30e044d09e53fb8c0dec6bbfe32614de;hpb=17ff6f11fb876a99f99988c0b8baf7e2b4b549fe;p=nepi.git diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index b482747b..a030eb43 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -13,6 +13,9 @@ import time import threading # TODO: Verify files and dirs exists already +# TODO: Blacklist node! + +DELAY ="1s" @clsinit class LinuxNode(ResourceManager): @@ -20,32 +23,35 @@ class LinuxNode(ResourceManager): @classmethod def _register_attributes(cls): - hostname = Attribute("hostname", "Hostname of the machine") + hostname = Attribute("hostname", "Hostname of the machine", + flags = Flags.ExecReadOnly) username = Attribute("username", "Local account username", flags = Flags.Credential) - port = Attribute("port", "SSH port", flags = Flags.Credential) + port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly) - home = Attribute("home", - "Experiment home directory to store all experiment related files") + home = Attribute("home", + "Experiment home directory to store all experiment related files", + flags = Flags.ExecReadOnly) identity = Attribute("identity", "SSH identity file", flags = Flags.Credential) server_key = Attribute("serverKey", "Server public key", - flags = Flags.Credential) + flags = Flags.ExecReadOnly) clean_home = Attribute("cleanHome", "Remove all files and directories " + \ " from home folder before starting experiment", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) clean_processes = Attribute("cleanProcesses", - "Kill all running processes before starting experiment", - flags = Flags.ReadOnly) + "Kill all running processes before starting experiment", + flags = Flags.ExecReadOnly) tear_down = Attribute("tearDown", "Bash script to be executed before " + \ - "releasing the resource", flags = Flags.ReadOnly) + "releasing the resource", + flags = Flags.ExecReadOnly) cls._register_attribute(hostname) cls._register_attribute(username) @@ -60,7 +66,6 @@ class LinuxNode(ResourceManager): def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None - self._home = "nepi-exp-%s" % os.urandom(8).encode('hex') # lock to avoid concurrency issues on methods used by applications self._lock = threading.Lock() @@ -69,10 +74,17 @@ class LinuxNode(ResourceManager): @property def home(self): - home = self.get("home") - if home and not home.startswith("nepi-"): - home = "nepi-" + home - return home or self._home + return self.get("home") or "/tmp" + + @property + def exp_dir(self): + exp_dir = os.path.join(self.home, self.ec.exp_id) + return exp_dir if exp_dir.startswith('/') else "${HOME}/" + + @property + def node_dir(self): + node_dir = "node-%d" % self.guid + return os.path.join(self.exp_dir, node_dir) @property def os(self): @@ -116,17 +128,29 @@ class LinuxNode(ResourceManager): self.logger.error("Deploy failed. Unresponsive node") return - def deploy(self): - self.provision() - if self.get("cleanProcesses"): self.clean_processes() if self.get("cleanHome"): - # self.clean_home() -> this is dangerous - pass + self.clean_home() + + self.mkdir(self.node_dir) - self.mkdir(self.home) + super(LinuxNode, self).provision() + + def deploy(self): + if self.state == ResourceState.NEW: + self.discover() + self.provision() + + # Node needs to wait until all associated interfaces are + # ready before it can finalize deployment + from neco.resources.linux.interface import LinuxInterface + ifaces = self.get_connected(LinuxInterface.rtype()) + for iface in ifaces: + if iface.state < ResourceState.READY: + self.ec.schedule(DELAY, self.deploy) + return super(LinuxNode, self).deploy() @@ -137,7 +161,7 @@ class LinuxNode(ResourceManager): super(LinuxNode, self).release() - def validate_connection(self, guid): + def valid_connection(self, guid): # TODO: Validate! return True @@ -152,38 +176,31 @@ class LinuxNode(ResourceManager): out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "cppid", - stdout = "cplog", - stderr = "cperr", - raise_on_error = True) - - return (out, err) + (out, err), proc = self.execute(cmd) def clean_home(self): self.logger.info("Cleaning up home") - cmd = "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + " + cmd = ("cd %s ; " % self.home + + "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+ + " -execdir rm -rf {} + ") out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "chpid", - stdout = "chlog", - stderr = "cherr", - raise_on_error = True) - - return (out, err) + (out, err), proc = self.execute(cmd) - def upload(self, src, dst): + def upload(self, src, dst, text = False): """ Copy content to destination - src content to copy. Can be a local file, directory or text input + src content to copy. Can be a local file, directory or a list of files dst destination path on the remote host (remote is always self.host) + + text src is text input, it must be stored into a temp file before uploading """ # If source is a string input - if not os.path.isfile(src): + f = None + if text and not os.path.isfile(src): # src is text input that should be uploaded as file # create a temporal file with the content to upload f = tempfile.NamedTemporaryFile(delete=False) @@ -195,7 +212,13 @@ class LinuxNode(ResourceManager): # Build destination as @: dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) - return self.copy(src, dst) + result = self.copy(src, dst) + + # clean up temp file + if f: + os.remove(f.name) + + return result def download(self, src, dst): if not self.localhost: @@ -203,7 +226,9 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages): + def install_packages(self, packages, home = None): + home = home or self.node_dir + cmd = "" if self.os in ["f12", "f14"]: cmd = rpmfuncs.install_packages_command(self.os, packages) @@ -217,15 +242,17 @@ class LinuxNode(ResourceManager): out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "instpkgpid", - stdout = "instpkglog", - stderr = "instpkgerr", + (out, err), proc = self.run_and_wait(cmd, home, + pidfile = "instpkg_pid", + stdout = "instpkg_log", + stderr = "instpkg_err", raise_on_error = True) return (out, err), proc - def remove_packages(self, packages): + def remove_packages(self, packages, home = None): + home = home or self.node_dir + cmd = "" if self.os in ["f12", "f14"]: cmd = rpmfuncs.remove_packages_command(self.os, packages) @@ -239,10 +266,10 @@ class LinuxNode(ResourceManager): out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "rmpkgpid", - stdout = "rmpkglog", - stderr = "rmpkgerr", + (out, err), proc = self.run_and_wait(cmd, home, + pidfile = "rmpkg_pid", + stdout = "rmpkg_log", + stderr = "rmpkg_err", raise_on_error = True) return (out, err), proc @@ -264,7 +291,13 @@ class LinuxNode(ResourceManager): stderr = 'stderr', sudo = False, raise_on_error = False): - + """ runs a command in background on the remote host, but waits + until the command finishes execution. + This is more robust than doing a simple synchronized 'execute', + since in the remote host the command can continue to run detached + even if network disconnections occur + """ + # run command in background in remote host (out, err), proc = self.run(command, home, pidfile = pidfile, stdin = stdin, @@ -272,21 +305,25 @@ class LinuxNode(ResourceManager): stderr = stderr, sudo = sudo) + # check no errors occurred if proc.poll() and err: msg = " Failed to run command %s on host %s" % ( command, self.get("hostname")) self.logger.error(msg) if raise_on_error: raise RuntimeError, msg - + + # Wait for pid file to be generated pid, ppid = self.wait_pid( home = home, pidfile = pidfile, raise_on_error = raise_on_error) + # wait until command finishes to execute self.wait_run(pid, ppid) - - (out, err), proc = self.check_run_error(home, stderr) + + # check if execution errors occurred + (out, err), proc = self.check_output(home, stderr) if err or out: msg = "Error while running command %s on host %s. error output: %s" % ( @@ -301,6 +338,8 @@ class LinuxNode(ResourceManager): return (out, err), proc def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False): + """ Waits until the pid file for the command is generated, + and returns the pid and ppid of the process """ pid = ppid = None delay = 1.0 for i in xrange(5): @@ -322,6 +361,7 @@ class LinuxNode(ResourceManager): return pid, ppid def wait_run(self, pid, ppid, trial = 0): + """ wait for a remote process to finish execution """ delay = 1.0 first = True bustspin = 0 @@ -344,17 +384,12 @@ class LinuxNode(ResourceManager): delay = min(30,delay*1.2) bustspin = 0 - def check_run_error(self, home, stderr = 'stderr'): + def check_output(self, home, filename): + """ checks file content """ (out, err), proc = self.execute("cat %s" % - os.path.join(home, stderr)) + os.path.join(home, filename)) return (out, err), proc - def check_run_output(self, home, stdout = 'stdout'): - (out, err), proc = self.execute("cat %s" % - os.path.join(home, stdout)) - return (out, err), proc - - def is_alive(self): if self.localhost: return True @@ -575,7 +610,7 @@ class LinuxNode(ResourceManager): self.logger.error("%s. out: %s error: %s", fail_msg, out, err) break except RuntimeError, e: - if x >= 3: + if i >= 3: self.logger.error("%s. error: %s", fail_msg, e.args) return (out, err), proc