From: Alina Quereilhac Date: Fri, 5 Apr 2013 22:41:36 +0000 (+0200) Subject: Adding start_with_condition, stop_with_condition and set_with_condition to the EC... X-Git-Tag: nepi-3.0.0~122^2~17 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=4e6d8622b6b960689f2ef6b7e54f81535a5fd854;p=nepi.git Adding start_with_condition, stop_with_condition and set_with_condition to the EC (xxx: UNTESTED!!!) --- diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 076e4dd5..1b7d4be8 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -6,10 +6,13 @@ import threading from neco.util import guid from neco.util.timefuncs import strfnow, strfdiff, strfvalid -from neco.execution.resource import ResourceFactory +from neco.execution.resource import ResourceFactory, ResourceAction, \ + ResourceState from neco.execution.scheduler import HeapScheduler, Task, TaskStatus from neco.util.parallel import ParallelRun +# TODO: use multiprocessing instead of threading + class ExperimentController(object): def __init__(self, root_dir = "/tmp", loglevel = 'error'): super(ExperimentController, self).__init__() @@ -75,46 +78,43 @@ class ExperimentController(object): rm1.connect(guid2) rm2.connect(guid1) - def discover_resource(self, guid, filters): - rm = self.get_resource(guid) - return rm.discover(filters) + def register_condition(self, group1, action, group2, state, + time = None): + """ Registers an action START or STOP for all RM on group1 to occur + time 'time' after all elements in group2 reached state 'state'. - def provision_resource(self, guid, filters): - rm = self.get_resource(guid) - return rm.provision(filters) + :param group1: List of guids of RMs subjected to action + :type group1: list - def register_start(self, group1, time, after_status, group2): - if isinstance(group1, int): - group1 = list[group1] - if isinstance(group2, int): - group2 = list[group2] + :param action: Action to register (either START or STOP) + :type action: ResourceAction - for guid1 in group1: - for guid2 in group2: - rm = self.get_resource(guid) - rm.start_after(time, after_status, guid2) + :param group2: List of guids of RMs to we waited for + :type group2: list - def register_stop(self, group1, time, after_status, group2): - if isinstance(group1, int): - group1 = list[group1] - if isinstance(group2, int): - group2 = list[group2] + :param state: State to wait for on RMs (STARTED, STOPPED, etc) + :type state: ResourceState - for guid1 in group1: - for guid2 in group2: - rm = self.get_resource(guid) - rm.stop_after(time, after_status, guid2) + :param time: Time to wait after group2 has reached status + :type time: string - def register_set(self, name, value, group1, time, after_status, group2): + """ if isinstance(group1, int): group1 = list[group1] if isinstance(group2, int): group2 = list[group2] for guid1 in group1: - for guid2 in group2: - rm = self.get_resource(guid) - rm.set_after(name, value, time, after_status, guid2) + rm = self.get_resource(guid) + rm.register_condition(action, group2, state, time) + + def discover(self, guid, filters): + rm = self.get_resource(guid) + return rm.discover(filters) + + def provision(self, guid, filters): + rm = self.get_resource(guid) + return rm.provision(filters) def get(self, guid, name): rm = self.get_resource(guid) @@ -124,15 +124,83 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.set(name, value) - def status(self, guid): + def state(self, guid): rm = self.get_resource(guid) - return rm.status() + return rm.state def stop(self, guid): rm = self.get_resource(guid) return rm.stop() - def deploy(self, group = None, start_when_all_ready = True): + def start(self, guid): + rm = self.get_resource(guid) + return rm.start() + + def set_with_conditions(self, name, value, group1, group2, state, + time = None): + """ Set value 'value' on attribute with name 'name' on all RMs of + group1 when 'time' has elapsed since all elements in group2 + have reached state 'state'. + + :param name: Name of attribute to set in RM + :type name: string + + :param value: Value of attribute to set in RM + :type name: string + + :param group1: List of guids of RMs subjected to action + :type group1: list + + :param action: Action to register (either START or STOP) + :type action: ResourceAction + + :param group2: List of guids of RMs to we waited for + :type group2: list + + :param state: State to wait for on RMs (STARTED, STOPPED, etc) + :type state: ResourceState + + :param time: Time to wait after group2 has reached status + :type time: string + + """ + if isinstance(group1, int): + group1 = list[group1] + if isinstance(group2, int): + group2 = list[group2] + + for guid1 in group1: + rm = self.get_resource(guid) + rm.set_with_conditions(name, value, group2, state, time) + + def stop_with_conditions(self, guid): + rm = self.get_resource(guid) + return rm.stop_with_conditions() + + def start_with_conditions(self, guid): + rm = self.get_resource(guid) + return rm.start_with_condition() + + def deploy(self, group = None, wait_all_ready = True): + """ Deploy all resource manager in group + + :param group: List of guids of RMs to deploy + :type group: list + + :param wait_all_ready: Wait until all RMs are deployed in + order to start the RMs + :type guid: int + + """ + def steps(rm): + rm.deploy() + rm.start_with_conditions() + + # Only if the RM has STOP consitions we + # schedule a stop. Otherwise the RM will stop immediately + if rm.conditions.get(ResourceAction.STOP): + rm.stop_with_conditions() + if not group: group = self.resources @@ -140,13 +208,13 @@ class ExperimentController(object): for guid in group: rm = self.get_resource(guid) - kwargs = {'target': rm.deploy} - if start_when_all_ready: + if wait_all_ready: towait = list(group) towait.remove(guid) - kwargs['args'] = towait + self.register_condition(guid, ResourceAction.START, + towait, ResourceState.DEPLOYED) - thread = threading.Thread(kwargs) + thread = threading.Thread(target = steps, args = (rm)) threads.append(thread) thread.start() diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index 62bf821c..deaa063e 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -1,7 +1,25 @@ + +from neco.util.timefuncs import strfnow, strfdiff, strfvalid + import copy +import functools import logging import weakref +_reschedule_delay = "1s" + +class ResourceAction: + START = 0 + STOP = 1 + +class ResourceState: + NEW = 0 + DEPLOYED = 1 + STARTED = 2 + STOPPED = 3 + FAILED = 4 + RELEASED = 5 + def clsinit(cls): cls._clsinit() return cls @@ -63,14 +81,25 @@ class ResourceManager(object): self._guid = guid self._ec = weakref.ref(ec) self._connections = set() + self._conditions = dict() + # the resource instance gets a copy of all attributes # that can modify self._attrs = copy.deepcopy(self._attributes) + self._state = ResourceState.NEW + + self._start_time = None + self._stop_time = None + # Logging self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % self.guid) + @property + def logger(self): + return self._logger + @property def guid(self): return self._guid @@ -79,48 +108,167 @@ class ResourceManager(object): def ec(self): return self._ec() - def connect(self, guid): - if (self._validate_connection(guid)): - self._connections.add(guid) - @property def connections(self): return self._connections - def discover(self, filters): - pass + @property + def conditons(self): + return self._conditions - def provision(self, filters): - pass + @property + def start_time(self): + """ timestamp with """ + return self._start_time - def set(self, name, value): - attr = self._attrs[name] - attr.value = value + @property + def stop_time(self): + return self._stop_time - def get(self, name): - attr = self._attrs[name] - return attr.value + @property + def state(self): + return self._state - def start_after(self, time, after_status, guid): - pass + def connect(self, guid): + if (self._validate_connection(guid)): + self._connections.add(guid) - def stop_after(self, time, after_status, guid): + def discover(self, filters = None): pass - def set_after(self, name, value, time, after_status, guid): + def provision(self, filters = None): pass def start(self): - pass + if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]: + self.logger.error("Wrong state %s for start" % self.state) + + self._start_time = strfnow() + self._state = ResourceState.STARTED def stop(self): - pass + if not self._state in [ResourceState.STARTED]: + self.logger.error("Wrong state %s for stop" % self.state) - def deploy(self, group = None): - pass + self._stop_time = strfnow() + self._state = ResourceState.STOPPED + + def set(self, name, value): + attr = self._attrs[name] + attr.value = value + + def get(self, name): + attr = self._attrs[name] + return attr.value + + def register_condition(self, action, group, state, + time = None): + if action not in self.conditions: + self._conditions[action] = set() + + self.conditions.get(action).add((group, state, time)) + + def _needs_reschedule(self, group, state, time): + reschedule = False + delay = _reschedule_delay + + # check state and time elapsed on all RMs + for guid in group: + rm = self.ec.get_resource(guid) + # If the RMs is lower than the requested state we must + # reschedule (e.g. if RM is DEPLOYED but we required STARTED) + if rm.state < state: + reschedule = True + break + + if time: + if state == ResourceAction.START: + t = rm.start_time + elif state == ResourceAction.STOP: + t = rm.stop_time + else: + # Only keep time information for START and STOP + break + + delay = strfdiff(t, strnow()) + if delay < time: + reschedule = True + break + + return reschedule, delay + + def set_with_conditions(self, name, value, group, state, time): + reschedule = False + delay = _reschedule_delay + + ## evaluate if set conditions are met + + # only can set with conditions after the RM is started + if self.status != ResourceStatus.STARTED: + reschedule = True + else: + reschedule, delay = self._needs_reschedule(group, state, time) + + if reschedule: + callback = functools.partial(self.set_with_conditions, + name, value, group, state, time) + self.ec.schedule(delay, callback) + else: + self.set(name, value) + + def start_with_conditions(self): + reschedule = False + delay = _reschedule_delay + + ## evaluate if set conditions are met + + # only can start when RM is either STOPPED or DEPLOYED + if self.status not in [ResourceStatus.STOPPED, ResourceStatus.DEPLOYED]: + reschedule = True + else: + for action, (group, state, time) in self.conditions.iteritems(): + if action == ResourceAction.START: + reschedule, delay = self._needs_reschedule(group, state, time) + if reschedule: + break + + if reschedule: + callback = functools.partial(self.start_with_conditions, + group, state, time) + self.ec.schedule(delay, callback) + else: + self.start() + + def stop_with_conditions(self): + reschedule = False + delay = _reschedule_delay + + ## evaluate if set conditions are met + + # only can start when RM is either STOPPED or DEPLOYED + if self.status != ResourceStatus.STARTED: + reschedule = True + else: + for action, (group, state, time) in self.conditions.iteritems(): + if action == ResourceAction.STOP: + reschedule, delay = self._needs_reschedule(group, state, time) + if reschedule: + break + + if reschedule: + callback = functools.partial(self.stop_with_conditions, + group, state, time) + self.ec.schedule(delay, callback) + else: + self.stop() + + def deploy(self): + self.discover() + self.provision() + self._state = ResourceState.DEPLOYED def release(self): - pass + self._state = ResourceState.RELEASED def _validate_connection(self, guid): # TODO: Validate! diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index 1ffcdf57..de4f8505 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -1,69 +1,154 @@ -from neco.execution import tags -from neco.execution.resource import ResourceManager +from neco.execution.attribute import Attribute, Flags +from neco.execution.resource import ResourceManager, clsinit, ResourceState +from neco.resources.linux.ssh_api import SSHApiFactory -import cStringIO import logging -class Application(ResourceManager): - def __init__(self, box, ec): - super(Application, self).__init__(box, ec) - self.command = None - self.pid = None - self.ppid = None - self.stdin = None - self.del_app_home = True - self.env = None - - self.app_home = "${HOME}/app-%s" % self.box.guid +@clsinit +class LinuxApplication(ResourceManager): + _rtype = "LinuxApplication" + + @classmethod + def _register_attributes(cls): + command = Attribute("command", "Command to execute", + flags = Flags.ReadOnly) + env = Attribute("env", "Environment variables string for command execution", + flags = Flags.ReadOnly) + sudo = Attribute("sudo", "Run with root privileges", + flags = Flags.ReadOnly) + depends = Attribute("depends", + "Space-separated list of packages required to run the application", + flags = Flags.ReadOnly) + sources = Attribute("sources", + "Space-separated list of regular files to be deployed in the working " + "path prior to building. Archives won't be expanded automatically.", + flags = Flags.ReadOnly) + build = Attribute("build", + "Build commands to execute after deploying the sources. " + "Sources will be in the ${SOURCES} folder. " + "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n" + "Try to make the commands return with a nonzero exit code on error.\n" + "Also, do not install any programs here, use the 'install' attribute. This will " + "help keep the built files constrained to the build folder (which may " + "not be the home folder), and will result in faster deployment. Also, " + "make sure to clean up temporary files, to reduce bandwidth usage between " + "nodes when transferring built packages.", + flags = Flags.ReadOnly) + install = Attribute("install", + "Commands to transfer built files to their final destinations. " + "Sources will be in the initial working folder, and a special " + "tag ${SOURCES} can be used to reference the experiment's " + "home folder (where the application commands will run).\n" + "ALL sources and targets needed for execution must be copied there, " + "if building has been enabled.\n" + "That is, 'slave' nodes will not automatically get any source files. " + "'slave' nodes don't get build dependencies either, so if you need " + "make and other tools to install, be sure to provide them as " + "actual dependencies instead.", + flags = Flags.ReadOnly) + stdin = Attribute("stdin", "Standard input", flags = Flags.ReadOnly) + stdout = Attribute("stdout", "Standard output", flags = Flags.ReadOnly) + stderr = Attribute("stderr", "Standard error", flags = Flags.ReadOnly) + + tear_down = Attribute("tearDown", "Bash script to be executed before + releasing the resource", flags = Flags.ReadOnly) + + cls._register_attribute(command) + cls._register_attribute(env) + cls._register_attribute(sudo) + cls._register_attribute(depends) + cls._register_attribute(sources) + cls._register_attribute(build) + cls._register_attribute(install) + cls._register_attribute(stdin) + cls._register_attribute(stdout) + cls._register_attribute(stderr) + cls._register_attribute(tear_down) + + def __init__(self, ec, guid): + super(LinuxApplication, self).__init__(ec, guid) + self._pid = None + self._ppid = None + self._home = "${HOME}/app-%s" % self.box.guid self._node = None - - # Logging - loglevel = "debug" - self._logger = logging.getLogger("neco.resources.base.Application.%s" % self.guid) - self._logger.setLevel(getattr(logging, loglevel.upper())) + + self._logger = logging.getLogger("neco.linux.Application.%d" % guid) + + @property + def api(self): + return self.node.api @property def node(self): - if self._node: - return self._node + self._node - # XXX: What if it is connected to more than one node? - resources = self.find_resources(exact_tags = [tags.NODE]) - self._node = resources[0] is len(resources) == 1 else None - return self._node + @property + def home(self): + return self._home - def make_app_home(self): - self.node.mkdir(self.app_home) + @property + def pid(self): + return self._pid - if self.stdin: - self.node.upload(self.stdin, os.path.join(self.app_home, 'stdin')) + @property + def ppid(self): + return self._ppid - def cleanup(self): - self.kill() + def provision(self, filters = None): + # clean home + # upload + # build + # Install stuff!! + pass - def run(self): - dst = os.path.join(self.app_home, "app.sh") + def start(self): + dst = os.path.join(self.home, "app.sh") # Create shell script with the command # This way, complex commands and scripts can be ran seamlessly # sync files cmd = "" - if self.env: + env = self.get("env") + if env: for envkey, envvals in env.iteritems(): for envval in envvals: cmd += 'export %s=%s\n' % (envkey, envval) - cmd += self.command - self.node.upload(cmd, dst) + cmd += self.get("command") + self.api.upload(cmd, dst) command = 'bash ./app.sh' - stdin = 'stdin' if self.stdin else None - self.node.run(command, self.app_home, stdin = stdin) - self.pid, self.ppid = self.node.checkpid(self.app_home) + stdin = 'stdin' if self.get("stdin") else None + self.api.run(command, self.home, stdin = stdin) + self._pid, self._ppid = self.api.checkpid(self.app_home) + + def stop(self): + self._state = ResourceState.STOPPED + + def release(self): + tear_down = self.get("tearDown") + if tear_down: + self.api.execute(tear_down) + + return self.api.kill(self.pid, self.ppid) def status(self): - return self.node.status(self.pid, self.ppid) + return self.api.status(self.pid, self.ppid) + + def make_app_home(self): + self.api.mkdir(self.home) + + stdin = self.get("stdin") + if stdin: + self.api.upload(stdin, os.path.join(self.home, 'stdin')) + + def _validate_connection(self, guid): + # TODO: Validate! + return True + # XXX: What if it is connected to more than one node? + resources = self.find_resources(exact_tags = [tags.NODE]) + self._node = resources[0] is len(resources) == 1 else None + return self._node + - def kill(self): - return self.node.kill(self.pid, self.ppid) diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index 80bb103f..0424f1e0 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -1,5 +1,8 @@ -from neco.execution.resource import ResourceManager, clsinit from neco.execution.attribute import Attribute, Flags +from neco.execution.resource import ResourceManager, clsinit, ResourceState +from neco.resources.linux.ssh_api import SSHApiFactory + +import logging @clsinit class LinuxNode(ResourceManager): @@ -10,42 +13,93 @@ class LinuxNode(ResourceManager): hostname = Attribute("hostname", "Hostname of the machine") username = Attribute("username", "Local account username", flags = Flags.Credential) - password = Attribute("pasword", "Local account password", + identity = Attribute("identity", "SSH identity file", flags = Flags.Credential) + clean_home = Attribute("cleanHome", "Remove all files and directories + from home folder before starting experiment", + flags = Flags.ReadOnly) + clean_processes = Attribute("cleanProcesses", + "Kill all running processes before starting experiment", + flags = Flags.ReadOnly) + tear_down = Attribute("tearDown", "Bash script to be executed before + releasing the resource", flags = Flags.ReadOnly) cls._register_attribute(hostname) cls._register_attribute(username) - cls._register_attribute(password) + cls._register_attribute(identity) + cls._register_attribute(clean_home) + cls._register_attribute(clean_processes) + cls._register_attribute(tear_down) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._logger = logging.getLogger("neco.linux.Node.%d" % guid) - #elf._logger.setLevel(neco.LOGLEVEL) - - def deploy(self): - pass - def discover(self, filters): - pass + def provision(self, filters = None): + if not self.api.is_alive(): + self._state = ResourceState.FAILED + self.logger.error("Deploy failed. Unresponsive node") + return + + if self.get("cleanProcesses"): + self._clean_processes() - def provision(self, filters): - pass + if self.get("cleanHome"): + # self._clean_home() -> this is dangerous + pass - def start(self): - pass - - def stop(self): - pass - - def deploy(self, group = None): - pass + def deploy(self): + self.provision() + super(LinuxNode, self).deploy() def release(self): - pass + tear_down = self.get("tearDown") + if tear_down: + self.api.execute(tear_down) + + super(LinuxNode, self).release() def _validate_connection(self, guid): # TODO: Validate! return True + @property + def api(self): + host = self.get("host") + user = self.get("user") + identity = self.get("identity") + return SSHApiFactory.get_api(host, user, identity) + + def _clean_processes(self): + hostname = self.get("hostname") + self.logger.info("Cleaning up processes on %s", hostname) + + cmds = [ + "sudo -S killall python tcpdump || /bin/true ; " + "sudo -S killall python tcpdump || /bin/true ; " + "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ", + "sudo -S killall -u root || /bin/true ", + "sudo -S killall -u root || /bin/true ", + ] + + api = self.api + for cmd in cmds: + out, err = api.execute(cmd) + if err: + self.logger.error(err) + + def _clean_home(self): + hostname = self.get("hostname") + self.logger.info("Cleaning up home on %s", hostname) + + cmds = [ + "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + " + ] + + api = self.api + for cmd in cmds: + out, err = api.execute(cmd) + if err: + self.logger.error(err) diff --git a/src/neco/util/timefuncs.py b/src/neco/util/timefuncs.py index 7bd3d56a..b62c6406 100644 --- a/src/neco/util/timefuncs.py +++ b/src/neco/util/timefuncs.py @@ -5,7 +5,7 @@ _strf = "%Y%m%d%H%M%S%f" _reabs = re.compile("^\d{20}$") _rerel = re.compile("^(?P