From 7c534b4f1a01e6933602c306b82853da0d5840ef Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sun, 5 May 2013 20:49:13 +0200 Subject: [PATCH] Added Linux Application --- src/neco/execution/attribute.py | 28 +- src/neco/execution/ec.py | 53 +++- src/neco/execution/resource.py | 89 +++++- src/neco/execution/trace.py | 20 ++ src/neco/resources/linux/application.py | 367 ++++++++++++++++++++---- src/neco/resources/linux/channel.py | 24 ++ src/neco/resources/linux/debfuncs.py | 26 ++ src/neco/resources/linux/node.py | 161 +++++++---- src/neco/resources/linux/rpmfuncs.py | 42 +++ src/neco/util/execfuncs.py | 221 ++++++++++++++ test/execution/resource.py | 16 +- test/resources/linux/application.py | 79 +++++ test/resources/linux/interface.py | 67 +++++ test/resources/linux/node.py | 114 +++----- test/resources/linux/test_utils.py | 47 +++ 15 files changed, 1138 insertions(+), 216 deletions(-) create mode 100644 src/neco/execution/trace.py create mode 100644 src/neco/resources/linux/channel.py create mode 100644 src/neco/resources/linux/debfuncs.py create mode 100644 src/neco/resources/linux/rpmfuncs.py create mode 100644 src/neco/util/execfuncs.py create mode 100644 test/resources/linux/application.py create mode 100644 test/resources/linux/interface.py create mode 100644 test/resources/linux/test_utils.py diff --git a/src/neco/execution/attribute.py b/src/neco/execution/attribute.py index 48d7848e..1282dfec 100644 --- a/src/neco/execution/attribute.py +++ b/src/neco/execution/attribute.py @@ -13,17 +13,24 @@ class Flags: NoFlags = 0x00 # Attribute is not modifiable by the user ReadOnly = 0x01 + # Attribute is not modifiable by the user during runtime + ExecReadOnly = 0x02 # Attribute is an access credential - Credential = 0x02 + Credential = 0x04 class Attribute(object): def __init__(self, name, help, type = Types.String, - flags = Flags.NoFlags, default = None): + flags = Flags.NoFlags, default = None, allowed = None, + set_hook = None): self._name = name self._help = help self._type = type self._flags = flags + self._allowed = allowed self._default = self._value = default + # callback to be invoked upon changing the + # attribute value + self.set_hook = set_hook @property def name(self): @@ -45,6 +52,10 @@ class Attribute(object): def flags(self): return self._flags + @property + def allowed(self): + return self._allowed + def has_flag(self, flag): return (self._flags & flag) == flag @@ -52,7 +63,18 @@ class Attribute(object): return self._value def set_value(self, value): - if self.is_valid_value(value): + valid = True + + if self.type == Types.Enum: + valid = value in self._allowed + + valid = valid and self.is_valid_value(value) + + if valid: + if self.set_hook: + # Hook receives old value, new value + value = self.set_hook(self._value, value) + self._value = value else: raise ValueError("Invalid value %s for attribute %s" % diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 1e55cc04..07a4ee6d 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -5,20 +5,24 @@ import time import threading from neco.util import guid +from neco.util.parallel import ParallelRun from neco.util.timefuncs import strfnow, strfdiff, strfvalid from neco.execution.resource import ResourceFactory, ResourceAction, \ ResourceState from neco.execution.scheduler import HeapScheduler, Task, TaskStatus -from neco.util.parallel import ParallelRun +from neco.execution.trace import TraceAttr # TODO: use multiprocessing instead of threading class ExperimentController(object): - def __init__(self, root_dir = "/tmp"): + def __init__(self, exp_id = None, root_dir = "/tmp"): super(ExperimentController, self).__init__() # root directory to store files self._root_dir = root_dir + # experiment identifier given by the user + self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex') + # generator of globally unique ids self._guid_generator = guid.GuidGenerator() @@ -47,6 +51,12 @@ class ExperimentController(object): def logger(self): return self._logger + @property + def exp_id(self): + exp_id = self._exp_id + if not exp_id.startswith("nepi-"): + exp_id = "nepi-" + exp_id + return exp_id def get_task(self, tid): return self._tasks.get(tid) @@ -125,6 +135,39 @@ class ExperimentController(object): rm = self.get_resource(guid1) rm.register_condition(action, group2, state, time) + def register_trace(self, guid, name): + """ Enable trace + + :param name: Name of the trace + :type name: str + """ + rm = self.get_resource(guid) + rm.register_trace(name) + + def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0): + """ Get information on collected trace + + :param name: Name of the trace + :type name: str + + :param attr: Can be one of: + - TraceAttr.ALL (complete trace content), + - TraceAttr.STREAM (block in bytes to read starting at offset), + - TraceAttr.PATH (full path to the trace file), + - TraceAttr.SIZE (size of trace file). + :type attr: str + + :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM + :type name: int + + :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM + :type name: int + + :rtype: str + """ + rm = self.get_resource(guid) + return rm.trace(name, attr, block, offset) + def discover(self, guid, filters): rm = self.get_resource(guid) return rm.discover(filters) @@ -198,13 +241,13 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.start_with_condition() - def deploy(self, group = None, wait_all_deployed = True): + def deploy(self, group = None, wait_all_ready = True): """ Deploy all resource manager in group :param group: List of guids of RMs to deploy :type group: list - :param wait_all_deployed: Wait until all RMs are deployed in + :param wait_all_ready: Wait until all RMs are ready in order to start the RMs :type guid: int @@ -227,7 +270,7 @@ class ExperimentController(object): for guid in group: rm = self.get_resource(guid) - if wait_all_deployed: + if wait_all_ready: towait = list(group) towait.remove(guid) self.register_condition(guid, ResourceAction.START, diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index f7010b0b..7669804d 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -1,4 +1,5 @@ from neco.util.timefuncs import strfnow, strfdiff, strfvalid +from neco.execution.trace import TraceAttr import copy import functools @@ -19,8 +20,9 @@ class ResourceState: READY = 3 STARTED = 4 STOPPED = 5 - FAILED = 6 - RELEASED = 7 + FINISHED = 6 + FAILED = 7 + RELEASED = 8 def clsinit(cls): cls._clsinit() @@ -32,6 +34,7 @@ class ResourceManager(object): _rtype = "Resource" _filters = None _attributes = None + _traces = None @classmethod def _register_filter(cls, attr): @@ -50,17 +53,34 @@ class ResourceManager(object): cls._attributes[attr.name] = attr @classmethod - def _register_filters(cls): + def _register_trace(cls, trace): """ Resource subclasses will invoke this method to add a - filter attribute + resource trace + + """ + cls._traces[trace.name] = trace + + + @classmethod + def _register_filters(cls): + """ Resource subclasses will invoke this method to register + resource filters """ pass @classmethod def _register_attributes(cls): - """ Resource subclasses will invoke this method to add a - resource attribute + """ Resource subclasses will invoke this method to register + resource attributes + + """ + pass + + @classmethod + def _register_traces(cls): + """ Resource subclasses will invoke this method to register + resource traces """ pass @@ -81,6 +101,10 @@ class ResourceManager(object): cls._attributes = dict() cls._register_attributes() + # static template for resource traces + cls._traces = dict() + cls._register_traces() + @classmethod def rtype(cls): return cls._rtype @@ -99,6 +123,13 @@ class ResourceManager(object): """ return copy.deepcopy(cls._attributes.values()) + @classmethod + def get_traces(cls): + """ Returns a copy of the traces + + """ + return copy.deepcopy(cls._traces.values()) + def __init__(self, ec, guid): self._guid = guid self._ec = weakref.ref(ec) @@ -106,9 +137,11 @@ class ResourceManager(object): self._conditions = dict() # the resource instance gets a copy of all attributes - # that can modify self._attrs = copy.deepcopy(self._attributes) + # the resource instance gets a copy of all traces + self._trcs = copy.deepcopy(self._traces) + self._state = ResourceState.NEW self._start_time = None @@ -216,7 +249,6 @@ class ResourceManager(object): :type name: str :param name: Value of the attribute :type name: str - :rtype: Boolean """ attr = self._attrs[name] attr.value = value @@ -231,6 +263,38 @@ class ResourceManager(object): attr = self._attrs[name] return attr.value + def register_trace(self, name): + """ Enable trace + + :param name: Name of the trace + :type name: str + """ + trace = self._trcs[name] + trace.enabled = True + + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): + """ Get information on collected trace + + :param name: Name of the trace + :type name: str + + :param attr: Can be one of: + - TraceAttr.ALL (complete trace content), + - TraceAttr.STREAM (block in bytes to read starting at offset), + - TraceAttr.PATH (full path to the trace file), + - TraceAttr.SIZE (size of trace file). + :type attr: str + + :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM + :type name: int + + :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM + :type name: int + + :rtype: str + """ + pass + def register_condition(self, action, group, state, time = None): """ Registers a condition on the resource manager to allow execution @@ -259,6 +323,14 @@ class ResourceManager(object): conditions.append((group, state, time)) + def get_connected(self, rtype): + connected = [] + for guid in self.connections: + rm = self.ec.get_resource(guid) + if rm.rtype() == rtype: + connected.append(rm) + return connected + def _needs_reschedule(self, group, state, time): """ Internal method that verify if 'time' has elapsed since all elements in 'group' have reached state 'state'. @@ -361,6 +433,7 @@ class ResourceManager(object): # only can start when RM is either STOPPED or READY if self.state not in [ResourceState.STOPPED, ResourceState.READY]: reschedule = True + self.logger.debug("---- RESCHEDULING START ---- state %s " % self.state ) else: self.logger.debug("---- START CONDITIONS ---- %s" % self.conditions.get(ResourceAction.START)) diff --git a/src/neco/execution/trace.py b/src/neco/execution/trace.py new file mode 100644 index 00000000..382a6bb9 --- /dev/null +++ b/src/neco/execution/trace.py @@ -0,0 +1,20 @@ +class TraceAttr: + ALL = 'all' + STREAM = 'stream' + PATH = 'path' + SIZE = 'size' + +class Trace(object): + def __init__(self, name, help): + self._name = name + self._help = help + self.enabled = False + + @property + def name(self): + return self._name + + @property + def help(self): + return self._help + diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index befb22d2..321c696c 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -1,8 +1,15 @@ -from neco.execution.attribute import Attribute, Flags +from neco.execution.attribute import Attribute, Flags, Types +from neco.execution.trace import Trace, TraceAttr from neco.execution.resource import ResourceManager, clsinit, ResourceState -from neco.resources.linux.ssh_api import SSHApiFactory +from neco.resources.linux.node import LinuxNode +from neco.util import sshfuncs import logging +import os + +DELAY ="1s" + +# TODO: Resolve wildcards in commands!! @clsinit class LinuxApplication(ResourceManager): @@ -11,20 +18,24 @@ class LinuxApplication(ResourceManager): @classmethod def _register_attributes(cls): command = Attribute("command", "Command to execute", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) env = Attribute("env", "Environment variables string for command execution", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) sudo = Attribute("sudo", "Run with root privileges", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) depends = Attribute("depends", "Space-separated list of packages required to run the application", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) sources = Attribute("sources", "Space-separated list of regular files to be deployed in the working " "path prior to building. Archives won't be expanded automatically.", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) + code = Attribute("code", + "Plain text source code to be uploaded to the server. It will be stored " + "under ${SOURCES}/code", + flags = Flags.ExecReadOnly) build = Attribute("build", "Build commands to execute after deploying the sources. " "Sources will be in the ${SOURCES} folder. " @@ -48,12 +59,18 @@ class LinuxApplication(ResourceManager): "make and other tools to install, be sure to provide them as " "actual dependencies instead.", flags = Flags.ReadOnly) - stdin = Attribute("stdin", "Standard input", flags = Flags.ReadOnly) - stdout = Attribute("stdout", "Standard output", flags = Flags.ReadOnly) - stderr = Attribute("stderr", "Standard error", flags = Flags.ReadOnly) + stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly) + stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly) + stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly) + update_home = Attribute("updateHome", "If application hash has changed remove old directory and" + "re-upload before starting experiment. If not keep the same directory", + default = True, + type = Types.Bool, + flags = Flags.ExecReadOnly) - tear_down = Attribute("tearDown", "Bash script to be executed before - releasing the resource", flags = Flags.ReadOnly) + tear_down = Attribute("tearDown", "Bash script to be executed before " + "releasing the resource", + flags = Flags.ReadOnly) cls._register_attribute(command) cls._register_attribute(forward_x11) @@ -61,29 +78,50 @@ class LinuxApplication(ResourceManager): cls._register_attribute(sudo) cls._register_attribute(depends) cls._register_attribute(sources) + cls._register_attribute(code) cls._register_attribute(build) cls._register_attribute(install) cls._register_attribute(stdin) cls._register_attribute(stdout) cls._register_attribute(stderr) + cls._register_attribute(update_home) cls._register_attribute(tear_down) + @classmethod + def _register_traces(cls): + stdout = Trace("stdout", "Standard output stream") + stderr = Trace("stderr", "Standard error stream") + buildlog = Trace("buildlog", "Output of the build process") + + cls._register_trace(stdout) + cls._register_trace(stderr) + cls._register_trace(buildlog) + def __init__(self, ec, guid): super(LinuxApplication, self).__init__(ec, guid) self._pid = None self._ppid = None - self._home = "app-%s" % self.box.guid - self._node = None + self._home = "app-%s" % self.guid self._logger = logging.getLogger("neco.linux.Application.%d" % guid) @property def node(self): - self._node + node = self.get_connected(LinuxNode.rtype()) + if node: return node[0] + return None @property def home(self): - return self._home # + node home + return os.path.join(self.node.exp_dir, self._home) + + @property + def src_dir(self): + return os.path.join(self.home, 'src') + + @property + def build_dir(self): + return os.path.join(self.home, 'build') @property def pid(self): @@ -93,62 +131,254 @@ class LinuxApplication(ResourceManager): def ppid(self): return self._ppid + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): + path = os.path.join(self.home, name) + + cmd = "(test -f %s && echo 'success') || echo 'error'" % path + (out, err), proc = self.node.execute(cmd) + + if (err and proc.poll()) or out.find("error") != -1: + err_msg = " Couldn't find trace %s on host %s. Error: %s" % ( + name, self.node.get("hostname"), err) + self.logger.error(err_msg) + return None + + if attr == TraceAttr.PATH: + return path + + if attr == TraceAttr.ALL: + (out, err), proc = self.node.check_output(self.home, name) + + if err and proc.poll(): + err_msg = " Couldn't read trace %s on host %s. Error: %s" % ( + name, self.node.get("hostname"), err) + self.logger.error(err_msg) + return None + + return out + + if attr == TraceAttr.STREAM: + cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset) + elif attr == TraceAttr.SIZE: + cmd = "stat -c%%s %s " % path + + (out, err), proc = self.node.execute(cmd) + + if err and proc.poll(): + err_msg = " Couldn't find trace %s on host %s. Error: %s" % ( + name, self.node.get("hostname"), err) + self.logger.error(err_msg) + return None + + if attr == TraceAttr.SIZE: + out = int(out.strip()) + + return out + def provision(self, filters = None): - # verify home hash or clean home + # TODO: verify home hash or clean home + + # create home dir for application + self.node.mkdir(self.home) + # upload sources - # build - # Install stuff!! - # upload app command - pass + self.upload_sources() + + # upload code + self.upload_code() + + # install dependencies + self.install_dependencies() + + # build + self.build() + + # Install + self.install() + + super(LinuxApplication, self).provision() + + def upload_sources(self): + # check if sources need to be uploaded and upload them + sources = self.get("sources") + if sources: + self.logger.debug(" Uploading sources %s" % sources) + + # create dir for sources + self.node.mkdir(self.src_dir) + + sources = self.sources.split(' ') + + http_sources = list() + for source in list(sources): + if source.startswith("http") or source.startswith("https"): + http_sources.append(source) + sources.remove(source) + + # Download http sources + for source in http_sources: + dst = os.path.join(self.src_dir, source.split("/")[-1]) + command = "wget -o %s %s" % (dst, source) + self.node.execute(command) + + self.node.upload(sources, self.src_dir) + + def upload_code(self): + code = self.get("code") + if code: + # create dir for sources + self.node.mkdir(self.src_dir) + + self.logger.debug(" Uploading code '%s'" % code) + + dst = os.path.join(self.src_dir, "code") + self.node.upload(sources, dst, text = True) + + def install_dependencies(self): + depends = self.get("depends") + if depends: + self.logger.debug(" Installing dependencies %s" % depends) + self.node.install_packages(depends, home = self.home) + + def build(self): + build = self.get("build") + if build: + self.logger.debug(" Building sources '%s'" % build) + + # create dir for build + self.node.mkdir(self.build_dir) + + cmd = self.replace_paths(build) + + (out, err), proc = self.run_and_wait(cmd, self.home, + pidfile = "build_pid", + stdout = "build_log", + stderr = "build_err", + raise_on_error = True) + + def install(self): + install = self.get("install") + if install: + self.logger.debug(" Installing sources '%s'" % install) + + cmd = self.replace_paths(install) + + (out, err), proc = self.run_and_wait(cmd, self.home, + pidfile = "install_pid", + stdout = "install_log", + stderr = "install_err", + raise_on_error = True) def deploy(self): # Wait until node is associated and deployed - self.provision() - pass + node = self.node + if not node or node.state < ResourceState.READY: + self.ec.schedule(DELAY, self.deploy) + else: + self.discover() + self.provision() + + super(LinuxApplication, self).deploy() def start(self): - dst = os.path.join(self.home, "app.sh") - - # Create shell script with the command - # This way, complex commands and scripts can be ran seamlessly - # sync files - cmd = "" + command = self.replace_paths(self.get("command")) env = self.get("env") - if env: - for envkey, envvals in env.iteritems(): - for envval in envvals: - cmd += 'export %s=%s\n' % (envkey, envval) + stdin = 'stdin' if self.get("stdin") else None + sudo = self.get('sudo') or False + x11 = self.get("forwardX11") or False + err_msg = "Failed to run command %s on host %s" % ( + command, self.node.get("hostname")) + failed = False - cmd += self.get("command") - self.api.upload(cmd, dst) + super(LinuxApplication, self).start() - command = 'bash ./app.sh' - stdin = 'stdin' if self.get("stdin") else None - self.api.run(command, self.home, stdin = stdin) - self._pid, self._ppid = self.api.checkpid(self.app_home) + if x11: + (out, err), proc = self.node.execute(command, + sudo = sudo, + stdin = stdin, + stdout = 'stdout', + stderr = 'stderr', + env = env, + forward_x11 = x11) + + if proc.poll() and err: + failed = True + else: + (out, err), proc = self.node.run(command, self.home, + stdin = stdin, + sudo = sudo) + + if proc.poll() and err: + failed = True + + if not failed: + pid, ppid = self.node.wait_pid(home = self.home) + if pid: self._pid = int(pid) + if ppid: self._ppid = int(ppid) + + if not self.pid or not self.ppid: + failed = True + + (out, chkerr), proc = self.node.check_output(self.home, 'stderr') + + if failed or out or chkerr: + # check if execution errors occurred + if err: + err_msg = "%s. Proc error: %s" % (err_msg, err) + + err_msg = "%s. Run error: %s " % (err_msg, out) + + if chkerr: + err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr) + + self.logger.error(err_msg) + self.state = ResourceState.FAILED def stop(self): - # Kill - self._state = ResourceState.STOPPED + state = self.state + if state == ResourceState.STARTED: + (out, err), proc = self.node.kill(self.pid, self.ppid) + + if out or err: + # check if execution errors occurred + err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % ( + self.get("command"), self.node.get("hostname"), err, out) + self.logger.error(err_msg) + self._state = ResourceState.FAILED + stopped = False + else: + super(LinuxApplication, self).stop() def release(self): tear_down = self.get("tearDown") if tear_down: - self.api.execute(tear_down) + self.node.execute(tear_down) - return self.api.kill(self.pid, self.ppid) + self.stop() + if self.state == ResourceState.STOPPED: + super(LinuxApplication, self).release() + + @property + def state(self): + if self._state == ResourceState.STARTED: + (out, err), proc = self.node.check_output(self.home, 'stderr') - def status(self): - return self.api.status(self.pid, self.ppid) + if out or err: + # check if execution errors occurred + err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % ( + self.get("command"), self.node.get("hostname"), err, out) + self.logger.error(err_msg) + self._state = ResourceState.FAILED - def make_app_home(self): - self.api.mkdir(self.home) + elif self.pid and self.ppid: + status = self.node.status(self.pid, self.ppid) - stdin = self.get("stdin") - if stdin: - self.api.upload(stdin, os.path.join(self.home, 'stdin')) + if status == sshfuncs.FINISHED: + self._state = ResourceState.FINISHED - def _validate_connection(self, guid): + return self._state + + def valid_connection(self, guid): # TODO: Validate! return True # XXX: What if it is connected to more than one node? @@ -156,3 +386,36 @@ class LinuxApplication(ResourceManager): self._node = resources[0] if len(resources) == 1 else None return self._node + def hash_app(self): + """ Generates a hash representing univokely the application. + Is used to determine whether the home directory should be cleaned + or not. + + """ + command = self.get("command") + forwards_x11 = self.get("forwardX11") + env = self.get("env") + sudo = self.get("sudo") + depends = self.get("depends") + sources = self.get("sources") + cls._register_attribute(sources) + cls._register_attribute(build) + cls._register_attribute(install) + cls._register_attribute(stdin) + cls._register_attribute(stdout) + cls._register_attribute(stderr) + cls._register_attribute(tear_down) + skey = "".join(map(str, args)) + return hashlib.md5(skey).hexdigest() + + def replace_paths(self, command): + """ + Replace all special path tags with shell-escaped actual paths. + """ + return ( command + .replace("${SOURCES}", self.src_dir) + .replace("${BUILD}", self.build_dir) + .replace("${APPHOME}", self.home) + .replace("${NODEHOME}", self.node.home) ) + + diff --git a/src/neco/resources/linux/channel.py b/src/neco/resources/linux/channel.py new file mode 100644 index 00000000..23f87b5a --- /dev/null +++ b/src/neco/resources/linux/channel.py @@ -0,0 +1,24 @@ +from neco.execution.attribute import Attribute, Flags +from neco.execution.resource import ResourceManager, clsinit, ResourceState +from neco.resources.linux.node import LinuxNode + +import collections +import logging +import os +import random +import re +import tempfile +import time +import threading + +@clsinit +class LinuxChannel(ResourceManager): + _rtype = "LinuxChannel" + + def __init__(self, ec, guid): + super(LinuxChannel, self).__init__(ec, guid) + self._logger = logging.getLogger("neco.linux.Channel.%d " % self.guid) + + def valid_connection(self, guid): + # TODO: Validate! + return True diff --git a/src/neco/resources/linux/debfuncs.py b/src/neco/resources/linux/debfuncs.py new file mode 100644 index 00000000..7cf72605 --- /dev/null +++ b/src/neco/resources/linux/debfuncs.py @@ -0,0 +1,26 @@ +# TODO: Investigate using http://nixos.org/nix/ + +def install_packages_command(os, packages): + if not isinstance(packages, list): + packages = [packages] + + cmd = "" + for p in packages: + cmd += " ( dpkg -s %(package)s || sudo apt-get -y install %(package)s ) ; " % { + 'package': p} + + #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...) + return cmd + +def remove_packages_command(os, packages): + if not isinstance(packages, list): + packages = [packages] + + cmd = "" + for p in packages: + cmd += " ( dpkg -s %(package)s && sudo apt-get -y purge %(package)s ) ; " % { + 'package': p} + + #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...) + return cmd + diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index b482747b..a030eb43 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -13,6 +13,9 @@ import time import threading # TODO: Verify files and dirs exists already +# TODO: Blacklist node! + +DELAY ="1s" @clsinit class LinuxNode(ResourceManager): @@ -20,32 +23,35 @@ class LinuxNode(ResourceManager): @classmethod def _register_attributes(cls): - hostname = Attribute("hostname", "Hostname of the machine") + hostname = Attribute("hostname", "Hostname of the machine", + flags = Flags.ExecReadOnly) username = Attribute("username", "Local account username", flags = Flags.Credential) - port = Attribute("port", "SSH port", flags = Flags.Credential) + port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly) - home = Attribute("home", - "Experiment home directory to store all experiment related files") + home = Attribute("home", + "Experiment home directory to store all experiment related files", + flags = Flags.ExecReadOnly) identity = Attribute("identity", "SSH identity file", flags = Flags.Credential) server_key = Attribute("serverKey", "Server public key", - flags = Flags.Credential) + flags = Flags.ExecReadOnly) clean_home = Attribute("cleanHome", "Remove all files and directories " + \ " from home folder before starting experiment", - flags = Flags.ReadOnly) + flags = Flags.ExecReadOnly) clean_processes = Attribute("cleanProcesses", - "Kill all running processes before starting experiment", - flags = Flags.ReadOnly) + "Kill all running processes before starting experiment", + flags = Flags.ExecReadOnly) tear_down = Attribute("tearDown", "Bash script to be executed before " + \ - "releasing the resource", flags = Flags.ReadOnly) + "releasing the resource", + flags = Flags.ExecReadOnly) cls._register_attribute(hostname) cls._register_attribute(username) @@ -60,7 +66,6 @@ class LinuxNode(ResourceManager): def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None - self._home = "nepi-exp-%s" % os.urandom(8).encode('hex') # lock to avoid concurrency issues on methods used by applications self._lock = threading.Lock() @@ -69,10 +74,17 @@ class LinuxNode(ResourceManager): @property def home(self): - home = self.get("home") - if home and not home.startswith("nepi-"): - home = "nepi-" + home - return home or self._home + return self.get("home") or "/tmp" + + @property + def exp_dir(self): + exp_dir = os.path.join(self.home, self.ec.exp_id) + return exp_dir if exp_dir.startswith('/') else "${HOME}/" + + @property + def node_dir(self): + node_dir = "node-%d" % self.guid + return os.path.join(self.exp_dir, node_dir) @property def os(self): @@ -116,17 +128,29 @@ class LinuxNode(ResourceManager): self.logger.error("Deploy failed. Unresponsive node") return - def deploy(self): - self.provision() - if self.get("cleanProcesses"): self.clean_processes() if self.get("cleanHome"): - # self.clean_home() -> this is dangerous - pass + self.clean_home() + + self.mkdir(self.node_dir) - self.mkdir(self.home) + super(LinuxNode, self).provision() + + def deploy(self): + if self.state == ResourceState.NEW: + self.discover() + self.provision() + + # Node needs to wait until all associated interfaces are + # ready before it can finalize deployment + from neco.resources.linux.interface import LinuxInterface + ifaces = self.get_connected(LinuxInterface.rtype()) + for iface in ifaces: + if iface.state < ResourceState.READY: + self.ec.schedule(DELAY, self.deploy) + return super(LinuxNode, self).deploy() @@ -137,7 +161,7 @@ class LinuxNode(ResourceManager): super(LinuxNode, self).release() - def validate_connection(self, guid): + def valid_connection(self, guid): # TODO: Validate! return True @@ -152,38 +176,31 @@ class LinuxNode(ResourceManager): out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "cppid", - stdout = "cplog", - stderr = "cperr", - raise_on_error = True) - - return (out, err) + (out, err), proc = self.execute(cmd) def clean_home(self): self.logger.info("Cleaning up home") - cmd = "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + " + cmd = ("cd %s ; " % self.home + + "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+ + " -execdir rm -rf {} + ") out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "chpid", - stdout = "chlog", - stderr = "cherr", - raise_on_error = True) - - return (out, err) + (out, err), proc = self.execute(cmd) - def upload(self, src, dst): + def upload(self, src, dst, text = False): """ Copy content to destination - src content to copy. Can be a local file, directory or text input + src content to copy. Can be a local file, directory or a list of files dst destination path on the remote host (remote is always self.host) + + text src is text input, it must be stored into a temp file before uploading """ # If source is a string input - if not os.path.isfile(src): + f = None + if text and not os.path.isfile(src): # src is text input that should be uploaded as file # create a temporal file with the content to upload f = tempfile.NamedTemporaryFile(delete=False) @@ -195,7 +212,13 @@ class LinuxNode(ResourceManager): # Build destination as @: dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) - return self.copy(src, dst) + result = self.copy(src, dst) + + # clean up temp file + if f: + os.remove(f.name) + + return result def download(self, src, dst): if not self.localhost: @@ -203,7 +226,9 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages): + def install_packages(self, packages, home = None): + home = home or self.node_dir + cmd = "" if self.os in ["f12", "f14"]: cmd = rpmfuncs.install_packages_command(self.os, packages) @@ -217,15 +242,17 @@ class LinuxNode(ResourceManager): out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "instpkgpid", - stdout = "instpkglog", - stderr = "instpkgerr", + (out, err), proc = self.run_and_wait(cmd, home, + pidfile = "instpkg_pid", + stdout = "instpkg_log", + stderr = "instpkg_err", raise_on_error = True) return (out, err), proc - def remove_packages(self, packages): + def remove_packages(self, packages, home = None): + home = home or self.node_dir + cmd = "" if self.os in ["f12", "f14"]: cmd = rpmfuncs.remove_packages_command(self.os, packages) @@ -239,10 +266,10 @@ class LinuxNode(ResourceManager): out = err = "" with self._lock: - (out, err), proc = self.run_and_wait(cmd, self.home, - pidfile = "rmpkgpid", - stdout = "rmpkglog", - stderr = "rmpkgerr", + (out, err), proc = self.run_and_wait(cmd, home, + pidfile = "rmpkg_pid", + stdout = "rmpkg_log", + stderr = "rmpkg_err", raise_on_error = True) return (out, err), proc @@ -264,7 +291,13 @@ class LinuxNode(ResourceManager): stderr = 'stderr', sudo = False, raise_on_error = False): - + """ runs a command in background on the remote host, but waits + until the command finishes execution. + This is more robust than doing a simple synchronized 'execute', + since in the remote host the command can continue to run detached + even if network disconnections occur + """ + # run command in background in remote host (out, err), proc = self.run(command, home, pidfile = pidfile, stdin = stdin, @@ -272,21 +305,25 @@ class LinuxNode(ResourceManager): stderr = stderr, sudo = sudo) + # check no errors occurred if proc.poll() and err: msg = " Failed to run command %s on host %s" % ( command, self.get("hostname")) self.logger.error(msg) if raise_on_error: raise RuntimeError, msg - + + # Wait for pid file to be generated pid, ppid = self.wait_pid( home = home, pidfile = pidfile, raise_on_error = raise_on_error) + # wait until command finishes to execute self.wait_run(pid, ppid) - - (out, err), proc = self.check_run_error(home, stderr) + + # check if execution errors occurred + (out, err), proc = self.check_output(home, stderr) if err or out: msg = "Error while running command %s on host %s. error output: %s" % ( @@ -301,6 +338,8 @@ class LinuxNode(ResourceManager): return (out, err), proc def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False): + """ Waits until the pid file for the command is generated, + and returns the pid and ppid of the process """ pid = ppid = None delay = 1.0 for i in xrange(5): @@ -322,6 +361,7 @@ class LinuxNode(ResourceManager): return pid, ppid def wait_run(self, pid, ppid, trial = 0): + """ wait for a remote process to finish execution """ delay = 1.0 first = True bustspin = 0 @@ -344,17 +384,12 @@ class LinuxNode(ResourceManager): delay = min(30,delay*1.2) bustspin = 0 - def check_run_error(self, home, stderr = 'stderr'): + def check_output(self, home, filename): + """ checks file content """ (out, err), proc = self.execute("cat %s" % - os.path.join(home, stderr)) + os.path.join(home, filename)) return (out, err), proc - def check_run_output(self, home, stdout = 'stdout'): - (out, err), proc = self.execute("cat %s" % - os.path.join(home, stdout)) - return (out, err), proc - - def is_alive(self): if self.localhost: return True @@ -575,7 +610,7 @@ class LinuxNode(ResourceManager): self.logger.error("%s. out: %s error: %s", fail_msg, out, err) break except RuntimeError, e: - if x >= 3: + if i >= 3: self.logger.error("%s. error: %s", fail_msg, e.args) return (out, err), proc diff --git a/src/neco/resources/linux/rpmfuncs.py b/src/neco/resources/linux/rpmfuncs.py new file mode 100644 index 00000000..7f44887c --- /dev/null +++ b/src/neco/resources/linux/rpmfuncs.py @@ -0,0 +1,42 @@ +RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm' +RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm' + +# TODO: Investigate using http://nixos.org/nix/ + +def install_packages_command(os, packages): + if not isinstance(packages, list): + packages = [packages] + + cmd = "( %s )" % install_rpmfusion_command(os) + for p in packages: + cmd += " ; ( rpm -q %(package)s || sudo yum -y install %(package)s ) " % { + 'package': p} + + #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim)) + return " ( %s )" % cmd + +def remove_packages_command(os, packages): + if not isinstance(packages, list): + packages = [packages] + + cmd = "" + for p in packages: + cmd += " ( rpm -q %(package)s && sudo yum -y remove %(package)s ) ; " % { + 'package': p} + + #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...) + return cmd + +def install_rpmfusion_command(os): + cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s" + + if os == "f12": + cmd = cmd % {'package': RPM_FUSION_URL_F12} + elif os == "f14": + # This one works for f13+ + cmd = cmd % {'package': RPM_FUSION_URL} + else: + cmd = "" + + return cmd + diff --git a/src/neco/util/execfuncs.py b/src/neco/util/execfuncs.py new file mode 100644 index 00000000..2ff76ee1 --- /dev/null +++ b/src/neco/util/execfuncs.py @@ -0,0 +1,221 @@ +from neco.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT + +import subprocess + +def lexec(command, + user = None, + sudo = False, + stdin = None, + env = None): + """ + Executes a local command, returns ((stdout,stderr),process) + """ + if env: + export = '' + for envkey, envval in env.iteritems(): + export += '%s=%s ' % (envkey, envval) + command = "%s %s" % (export, command) + + if sudo: + command = "sudo %s" % command + elif user: + command = "su %s ; %s " % (user, command) + + p = subprocess.Popen(command, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + stdin = stdin) + + out, err = p.communicate() + return ((out, err), proc) + +def lcopy(source, dest, recursive = False): + """ + Copies from/to localy. + """ + + if TRACE: + print "scp", source, dest + + command = ["cp"] + if recursive: + command.append("-R") + + command.append(src) + command.append(dst) + + p = subprocess.Popen(command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + out, err = p.communicate() + return ((out, err), proc) + +def lspawn(command, pidfile, + stdout = '/dev/null', + stderr = STDOUT, + stdin = '/dev/null', + home = None, + create_home = False, + sudo = False, + user = None): + """ + Spawn a local command such that it will continue working asynchronously. + + Parameters: + command: the command to run - it should be a single line. + + pidfile: path of a (ideally unique to this task) pidfile for tracking the process. + + stdout: path of a file to redirect standard output to - must be a string. + Defaults to /dev/null + stderr: path of a file to redirect standard error to - string or the special STDOUT value + to redirect to the same file stdout was redirected to. Defaults to STDOUT. + stdin: path of a file with input to be piped into the command's standard input + + home: path of a folder to use as working directory - should exist, unless you specify create_home + + create_home: if True, the home folder will be created first with mkdir -p + + sudo: whether the command needs to be executed as root + + Returns: + (stdout, stderr), process + + Of the spawning process, which only captures errors at spawning time. + Usually only useful for diagnostics. + """ + # Start process in a "daemonized" way, using nohup and heavy + # stdin/out redirection to avoid connection issues + if stderr is STDOUT: + stderr = '&1' + else: + stderr = ' ' + stderr + + daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % { + 'command' : command, + 'pidfile' : shell_escape(pidfile), + 'stdout' : stdout, + 'stderr' : stderr, + 'stdin' : stdin, + } + + cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % { + 'command' : shell_escape(daemon_command), + 'sudo' : 'sudo -S' if sudo else '', + 'pidfile' : shell_escape(pidfile), + 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '', + 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home else '', + } + + (out,err),proc = lexec(cmd) + + if proc.wait(): + raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,) + + return (out,err),proc + +def lcheckpid(pidfile): + """ + Check the pidfile of a process spawned with remote_spawn. + + Parameters: + pidfile: the pidfile passed to remote_span + + Returns: + + A (pid, ppid) tuple useful for calling remote_status and remote_kill, + or None if the pidfile isn't valid yet (maybe the process is still starting). + """ + + (out,err),proc = lexec("cat %s" % pidfile ) + + if proc.wait(): + return None + + if out: + try: + return map(int,out.strip().split(' ',1)) + except: + # Ignore, many ways to fail that don't matter that much + return None + +def lstatus(pid, ppid): + """ + Check the status of a process spawned with remote_spawn. + + Parameters: + pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid + + Returns: + + One of NOT_STARTED, RUNNING, FINISHED + """ + + (out,err),proc = lexec( + # Check only by pid. pid+ppid does not always work (especially with sudo) + " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % { + 'ppid' : ppid, + 'pid' : pid, + }) + + if proc.wait(): + return NOT_STARTED + + status = False + if out: + status = (out.strip() == 'wait') + else: + return NOT_STARTED + return RUNNING if status else FINISHED + + +def lkill(pid, ppid, sudo = False): + """ + Kill a process spawned with lspawn. + + First tries a SIGTERM, and if the process does not end in 10 seconds, + it sends a SIGKILL. + + Parameters: + pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid + + sudo: whether the command was run with sudo - careful killing like this. + + Returns: + + Nothing, should have killed the process + """ + + subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid } + cmd = """ +SUBKILL="%(subkill)s" ; +%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true +%(sudo)s kill %(pid)d $SUBKILL || /bin/true +for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do + sleep 0.2 + if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then + break + else + %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true + %(sudo)s kill %(pid)d $SUBKILL || /bin/true + fi + sleep 1.8 +done +if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then + %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true + %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true +fi +""" + if nowait: + cmd = "( %s ) >/dev/null 2>/dev/null 0) @skipIfNotAlive - def t_run(self, node): - node.mkdir(self.home, clean = True) + def t_run(self, host, user): + node, ec = create_node(host, user) - command = "ping %s" % self.target - dst = os.path.join(self.home, "app.sh") - node.upload(command, dst) + app_home = os.path.join(node.exp_dir, "my-app") + node.mkdir(app_home, clean = True) - cmd = "bash ./app.sh" - node.run(cmd, self.home) - pid, ppid = node.checkpid(self.home) + command = "ping %s" % self.target + node.run(command, app_home) + pid, ppid = node.checkpid(app_home) status = node.status(pid, ppid) self.assertTrue(status, RUNNING) @@ -99,17 +63,20 @@ class LinuxNodeTestCase(unittest.TestCase): status = node.status(pid, ppid) self.assertTrue(status, FINISHED) - (out, err), proc = node.check_run_output(self.home) + (out, err), proc = node.check_output(app_home, "stdout") expected = """64 bytes from""" self.assertTrue(out.find(expected) > 0) - node.rmdir(self.home) + node.rmdir(app_home) @skipIfNotAlive - def t_install(self, node): - node.mkdir(self.home, clean = True) + def t_install(self, host, user): + node, ec = create_node(host, user) + + app_home = os.path.join(node.exp_dir, "my-app") + node.mkdir(app_home, clean = True) prog = """#include @@ -121,35 +88,36 @@ main (void) } """ # upload the test program - dst = os.path.join(self.home, "hello.c") - node.upload(prog, dst) + dst = os.path.join(app_home, "hello.c") + node.upload(prog, dst, text = True) # install gcc node.install_packages('gcc') # compile the program using gcc - command = "cd %s; gcc -Wall hello.c -o hello" % self.home + command = "cd %s; gcc -Wall hello.c -o hello" % app_home (out, err), proc = node.execute(command) # execute the program and get the output from stdout - command = "%s/hello" % self.home + command = "%s/hello" % app_home (out, err), proc = node.execute(command) self.assertEquals(out, "Hello, world!\n") # execute the program and get the output from a file - command = "%(home)s/hello > %(home)s/hello.out" % {'home':self.home} + command = "%(home)s/hello > %(home)s/hello.out" % { + 'home': app_home} (out, err), proc = node.execute(command) # retrieve the output file - src = os.path.join(self.home, "hello.out") + src = os.path.join(app_home, "hello.out") f = tempfile.NamedTemporaryFile(delete=False) dst = f.name node.download(src, dst) f.close() node.remove_packages('gcc') - node.rmdir(self.home) + node.rmdir(app_home) f = open(dst, "r") out = f.read() @@ -158,27 +126,27 @@ main (void) self.assertEquals(out, "Hello, world!\n") def test_execute_fedora(self): - self.t_execute(self.node_fedora) + self.t_execute(self.fedora_host, self.fedora_user) def test_execute_ubuntu(self): - self.t_execute(self.node_ubuntu) + self.t_execute(self.ubuntu_host, self.ubuntu_user) def test_run_fedora(self): - self.t_run(self.node_fedora) + self.t_run(self.fedora_host, self.fedora_user) def test_run_ubuntu(self): - self.t_run(self.node_ubuntu) + self.t_run(self.ubuntu_host, self.ubuntu_user) def test_intall_fedora(self): - self.t_install(self.node_fedora) + self.t_install(self.fedora_host, self.fedora_user) def test_install_ubuntu(self): - self.t_install(self.node_ubuntu) + self.t_install(self.ubuntu_host, self.ubuntu_user) @skipInteractive def test_xterm_ubuntu(self): """ Interactive test. Should not run automatically """ - self.t_xterm(self.node_ubuntu) + self.t_xterm(self.ubuntu_host, self.ubuntu_user) if __name__ == '__main__': diff --git a/test/resources/linux/test_utils.py b/test/resources/linux/test_utils.py new file mode 100644 index 00000000..885af79c --- /dev/null +++ b/test/resources/linux/test_utils.py @@ -0,0 +1,47 @@ +from neco.resources.linux.node import LinuxNode + +import os + +class DummyEC(object): + @property + def exp_id(self): + return "nepi-1" + +def create_node(hostname, username): + ec = DummyEC() + node = LinuxNode(ec, 1) + node.set("hostname", hostname) + node.set("username", username) + + # If we don't return the reference to the EC + # it will be released by the garbage collector since + # the resources only save a weak refernce to it. + return node, ec + +def skipIfNotAlive(func): + name = func.__name__ + def wrapped(*args, **kwargs): + node, ec = create_node(args[1], args[2]) + + if not node.is_alive(): + print "*** WARNING: Skipping test %s: Node %s is not alive\n" % ( + name, node.get("hostname")) + return + + return func(*args, **kwargs) + + return wrapped + +def skipInteractive(func): + name = func.__name__ + def wrapped(*args, **kwargs): + mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES'] + if not mode: + print "*** WARNING: Skipping test %s: Interactive mode off \n" % name + return + + return func(*args, **kwargs) + + return wrapped + + -- 2.43.0