from neco.util import guid
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
-from neco.execution.resource import ResourceFactory
+from neco.execution.resource import ResourceFactory, ResourceAction, \
+ ResourceState
from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
from neco.util.parallel import ParallelRun
+# TODO: use multiprocessing instead of threading
+
class ExperimentController(object):
def __init__(self, root_dir = "/tmp", loglevel = 'error'):
super(ExperimentController, self).__init__()
rm1.connect(guid2)
rm2.connect(guid1)
- def discover_resource(self, guid, filters):
- rm = self.get_resource(guid)
- return rm.discover(filters)
+ def register_condition(self, group1, action, group2, state,
+ time = None):
+ """ Registers an action START or STOP for all RM on group1 to occur
+ time 'time' after all elements in group2 reached state 'state'.
- def provision_resource(self, guid, filters):
- rm = self.get_resource(guid)
- return rm.provision(filters)
+ :param group1: List of guids of RMs subjected to action
+ :type group1: list
- def register_start(self, group1, time, after_status, group2):
- if isinstance(group1, int):
- group1 = list[group1]
- if isinstance(group2, int):
- group2 = list[group2]
+ :param action: Action to register (either START or STOP)
+ :type action: ResourceAction
- for guid1 in group1:
- for guid2 in group2:
- rm = self.get_resource(guid)
- rm.start_after(time, after_status, guid2)
+ :param group2: List of guids of RMs to we waited for
+ :type group2: list
- def register_stop(self, group1, time, after_status, group2):
- if isinstance(group1, int):
- group1 = list[group1]
- if isinstance(group2, int):
- group2 = list[group2]
+ :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+ :type state: ResourceState
- for guid1 in group1:
- for guid2 in group2:
- rm = self.get_resource(guid)
- rm.stop_after(time, after_status, guid2)
+ :param time: Time to wait after group2 has reached status
+ :type time: string
- def register_set(self, name, value, group1, time, after_status, group2):
+ """
if isinstance(group1, int):
group1 = list[group1]
if isinstance(group2, int):
group2 = list[group2]
for guid1 in group1:
- for guid2 in group2:
- rm = self.get_resource(guid)
- rm.set_after(name, value, time, after_status, guid2)
+ rm = self.get_resource(guid)
+ rm.register_condition(action, group2, state, time)
+
+ def discover(self, guid, filters):
+ rm = self.get_resource(guid)
+ return rm.discover(filters)
+
+ def provision(self, guid, filters):
+ rm = self.get_resource(guid)
+ return rm.provision(filters)
def get(self, guid, name):
rm = self.get_resource(guid)
rm = self.get_resource(guid)
return rm.set(name, value)
- def status(self, guid):
+ def state(self, guid):
rm = self.get_resource(guid)
- return rm.status()
+ return rm.state
def stop(self, guid):
rm = self.get_resource(guid)
return rm.stop()
- def deploy(self, group = None, start_when_all_ready = True):
+ def start(self, guid):
+ rm = self.get_resource(guid)
+ return rm.start()
+
+ def set_with_conditions(self, name, value, group1, group2, state,
+ time = None):
+ """ Set value 'value' on attribute with name 'name' on all RMs of
+ group1 when 'time' has elapsed since all elements in group2
+ have reached state 'state'.
+
+ :param name: Name of attribute to set in RM
+ :type name: string
+
+ :param value: Value of attribute to set in RM
+ :type name: string
+
+ :param group1: List of guids of RMs subjected to action
+ :type group1: list
+
+ :param action: Action to register (either START or STOP)
+ :type action: ResourceAction
+
+ :param group2: List of guids of RMs to we waited for
+ :type group2: list
+
+ :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+ :type state: ResourceState
+
+ :param time: Time to wait after group2 has reached status
+ :type time: string
+
+ """
+ if isinstance(group1, int):
+ group1 = list[group1]
+ if isinstance(group2, int):
+ group2 = list[group2]
+
+ for guid1 in group1:
+ rm = self.get_resource(guid)
+ rm.set_with_conditions(name, value, group2, state, time)
+
+ def stop_with_conditions(self, guid):
+ rm = self.get_resource(guid)
+ return rm.stop_with_conditions()
+
+ def start_with_conditions(self, guid):
+ rm = self.get_resource(guid)
+ return rm.start_with_condition()
+
+ def deploy(self, group = None, wait_all_ready = True):
+ """ Deploy all resource manager in group
+
+ :param group: List of guids of RMs to deploy
+ :type group: list
+
+ :param wait_all_ready: Wait until all RMs are deployed in
+ order to start the RMs
+ :type guid: int
+
+ """
+ def steps(rm):
+ rm.deploy()
+ rm.start_with_conditions()
+
+ # Only if the RM has STOP consitions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ if rm.conditions.get(ResourceAction.STOP):
+ rm.stop_with_conditions()
+
if not group:
group = self.resources
for guid in group:
rm = self.get_resource(guid)
- kwargs = {'target': rm.deploy}
- if start_when_all_ready:
+ if wait_all_ready:
towait = list(group)
towait.remove(guid)
- kwargs['args'] = towait
+ self.register_condition(guid, ResourceAction.START,
+ towait, ResourceState.DEPLOYED)
- thread = threading.Thread(kwargs)
+ thread = threading.Thread(target = steps, args = (rm))
threads.append(thread)
thread.start()
+
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+
import copy
+import functools
import logging
import weakref
+_reschedule_delay = "1s"
+
+class ResourceAction:
+ START = 0
+ STOP = 1
+
+class ResourceState:
+ NEW = 0
+ DEPLOYED = 1
+ STARTED = 2
+ STOPPED = 3
+ FAILED = 4
+ RELEASED = 5
+
def clsinit(cls):
cls._clsinit()
return cls
self._guid = guid
self._ec = weakref.ref(ec)
self._connections = set()
+ self._conditions = dict()
+
# the resource instance gets a copy of all attributes
# that can modify
self._attrs = copy.deepcopy(self._attributes)
+ self._state = ResourceState.NEW
+
+ self._start_time = None
+ self._stop_time = None
+
# Logging
self._logger = logging.getLogger("neco.execution.resource.Resource.%s" %
self.guid)
+ @property
+ def logger(self):
+ return self._logger
+
@property
def guid(self):
return self._guid
def ec(self):
return self._ec()
- def connect(self, guid):
- if (self._validate_connection(guid)):
- self._connections.add(guid)
-
@property
def connections(self):
return self._connections
- def discover(self, filters):
- pass
+ @property
+ def conditons(self):
+ return self._conditions
- def provision(self, filters):
- pass
+ @property
+ def start_time(self):
+ """ timestamp with """
+ return self._start_time
- def set(self, name, value):
- attr = self._attrs[name]
- attr.value = value
+ @property
+ def stop_time(self):
+ return self._stop_time
- def get(self, name):
- attr = self._attrs[name]
- return attr.value
+ @property
+ def state(self):
+ return self._state
- def start_after(self, time, after_status, guid):
- pass
+ def connect(self, guid):
+ if (self._validate_connection(guid)):
+ self._connections.add(guid)
- def stop_after(self, time, after_status, guid):
+ def discover(self, filters = None):
pass
- def set_after(self, name, value, time, after_status, guid):
+ def provision(self, filters = None):
pass
def start(self):
- pass
+ if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
+ self.logger.error("Wrong state %s for start" % self.state)
+
+ self._start_time = strfnow()
+ self._state = ResourceState.STARTED
def stop(self):
- pass
+ if not self._state in [ResourceState.STARTED]:
+ self.logger.error("Wrong state %s for stop" % self.state)
- def deploy(self, group = None):
- pass
+ self._stop_time = strfnow()
+ self._state = ResourceState.STOPPED
+
+ def set(self, name, value):
+ attr = self._attrs[name]
+ attr.value = value
+
+ def get(self, name):
+ attr = self._attrs[name]
+ return attr.value
+
+ def register_condition(self, action, group, state,
+ time = None):
+ if action not in self.conditions:
+ self._conditions[action] = set()
+
+ self.conditions.get(action).add((group, state, time))
+
+ def _needs_reschedule(self, group, state, time):
+ reschedule = False
+ delay = _reschedule_delay
+
+ # check state and time elapsed on all RMs
+ for guid in group:
+ rm = self.ec.get_resource(guid)
+ # If the RMs is lower than the requested state we must
+ # reschedule (e.g. if RM is DEPLOYED but we required STARTED)
+ if rm.state < state:
+ reschedule = True
+ break
+
+ if time:
+ if state == ResourceAction.START:
+ t = rm.start_time
+ elif state == ResourceAction.STOP:
+ t = rm.stop_time
+ else:
+ # Only keep time information for START and STOP
+ break
+
+ delay = strfdiff(t, strnow())
+ if delay < time:
+ reschedule = True
+ break
+
+ return reschedule, delay
+
+ def set_with_conditions(self, name, value, group, state, time):
+ reschedule = False
+ delay = _reschedule_delay
+
+ ## evaluate if set conditions are met
+
+ # only can set with conditions after the RM is started
+ if self.status != ResourceStatus.STARTED:
+ reschedule = True
+ else:
+ reschedule, delay = self._needs_reschedule(group, state, time)
+
+ if reschedule:
+ callback = functools.partial(self.set_with_conditions,
+ name, value, group, state, time)
+ self.ec.schedule(delay, callback)
+ else:
+ self.set(name, value)
+
+ def start_with_conditions(self):
+ reschedule = False
+ delay = _reschedule_delay
+
+ ## evaluate if set conditions are met
+
+ # only can start when RM is either STOPPED or DEPLOYED
+ if self.status not in [ResourceStatus.STOPPED, ResourceStatus.DEPLOYED]:
+ reschedule = True
+ else:
+ for action, (group, state, time) in self.conditions.iteritems():
+ if action == ResourceAction.START:
+ reschedule, delay = self._needs_reschedule(group, state, time)
+ if reschedule:
+ break
+
+ if reschedule:
+ callback = functools.partial(self.start_with_conditions,
+ group, state, time)
+ self.ec.schedule(delay, callback)
+ else:
+ self.start()
+
+ def stop_with_conditions(self):
+ reschedule = False
+ delay = _reschedule_delay
+
+ ## evaluate if set conditions are met
+
+ # only can start when RM is either STOPPED or DEPLOYED
+ if self.status != ResourceStatus.STARTED:
+ reschedule = True
+ else:
+ for action, (group, state, time) in self.conditions.iteritems():
+ if action == ResourceAction.STOP:
+ reschedule, delay = self._needs_reschedule(group, state, time)
+ if reschedule:
+ break
+
+ if reschedule:
+ callback = functools.partial(self.stop_with_conditions,
+ group, state, time)
+ self.ec.schedule(delay, callback)
+ else:
+ self.stop()
+
+ def deploy(self):
+ self.discover()
+ self.provision()
+ self._state = ResourceState.DEPLOYED
def release(self):
- pass
+ self._state = ResourceState.RELEASED
def _validate_connection(self, guid):
# TODO: Validate!
-from neco.execution import tags
-from neco.execution.resource import ResourceManager
+from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux.ssh_api import SSHApiFactory
-import cStringIO
import logging
-class Application(ResourceManager):
- def __init__(self, box, ec):
- super(Application, self).__init__(box, ec)
- self.command = None
- self.pid = None
- self.ppid = None
- self.stdin = None
- self.del_app_home = True
- self.env = None
-
- self.app_home = "${HOME}/app-%s" % self.box.guid
+@clsinit
+class LinuxApplication(ResourceManager):
+ _rtype = "LinuxApplication"
+
+ @classmethod
+ def _register_attributes(cls):
+ command = Attribute("command", "Command to execute",
+ flags = Flags.ReadOnly)
+ env = Attribute("env", "Environment variables string for command execution",
+ flags = Flags.ReadOnly)
+ sudo = Attribute("sudo", "Run with root privileges",
+ flags = Flags.ReadOnly)
+ depends = Attribute("depends",
+ "Space-separated list of packages required to run the application",
+ flags = Flags.ReadOnly)
+ sources = Attribute("sources",
+ "Space-separated list of regular files to be deployed in the working "
+ "path prior to building. Archives won't be expanded automatically.",
+ flags = Flags.ReadOnly)
+ build = Attribute("build",
+ "Build commands to execute after deploying the sources. "
+ "Sources will be in the ${SOURCES} folder. "
+ "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
+ "Try to make the commands return with a nonzero exit code on error.\n"
+ "Also, do not install any programs here, use the 'install' attribute. This will "
+ "help keep the built files constrained to the build folder (which may "
+ "not be the home folder), and will result in faster deployment. Also, "
+ "make sure to clean up temporary files, to reduce bandwidth usage between "
+ "nodes when transferring built packages.",
+ flags = Flags.ReadOnly)
+ install = Attribute("install",
+ "Commands to transfer built files to their final destinations. "
+ "Sources will be in the initial working folder, and a special "
+ "tag ${SOURCES} can be used to reference the experiment's "
+ "home folder (where the application commands will run).\n"
+ "ALL sources and targets needed for execution must be copied there, "
+ "if building has been enabled.\n"
+ "That is, 'slave' nodes will not automatically get any source files. "
+ "'slave' nodes don't get build dependencies either, so if you need "
+ "make and other tools to install, be sure to provide them as "
+ "actual dependencies instead.",
+ flags = Flags.ReadOnly)
+ stdin = Attribute("stdin", "Standard input", flags = Flags.ReadOnly)
+ stdout = Attribute("stdout", "Standard output", flags = Flags.ReadOnly)
+ stderr = Attribute("stderr", "Standard error", flags = Flags.ReadOnly)
+
+ tear_down = Attribute("tearDown", "Bash script to be executed before
+ releasing the resource", flags = Flags.ReadOnly)
+
+ cls._register_attribute(command)
+ cls._register_attribute(env)
+ cls._register_attribute(sudo)
+ cls._register_attribute(depends)
+ cls._register_attribute(sources)
+ cls._register_attribute(build)
+ cls._register_attribute(install)
+ cls._register_attribute(stdin)
+ cls._register_attribute(stdout)
+ cls._register_attribute(stderr)
+ cls._register_attribute(tear_down)
+
+ def __init__(self, ec, guid):
+ super(LinuxApplication, self).__init__(ec, guid)
+ self._pid = None
+ self._ppid = None
+ self._home = "${HOME}/app-%s" % self.box.guid
self._node = None
-
- # Logging
- loglevel = "debug"
- self._logger = logging.getLogger("neco.resources.base.Application.%s" % self.guid)
- self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+ self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
+
+ @property
+ def api(self):
+ return self.node.api
@property
def node(self):
- if self._node:
- return self._node
+ self._node
- # XXX: What if it is connected to more than one node?
- resources = self.find_resources(exact_tags = [tags.NODE])
- self._node = resources[0] is len(resources) == 1 else None
- return self._node
+ @property
+ def home(self):
+ return self._home
- def make_app_home(self):
- self.node.mkdir(self.app_home)
+ @property
+ def pid(self):
+ return self._pid
- if self.stdin:
- self.node.upload(self.stdin, os.path.join(self.app_home, 'stdin'))
+ @property
+ def ppid(self):
+ return self._ppid
- def cleanup(self):
- self.kill()
+ def provision(self, filters = None):
+ # clean home
+ # upload
+ # build
+ # Install stuff!!
+ pass
- def run(self):
- dst = os.path.join(self.app_home, "app.sh")
+ def start(self):
+ dst = os.path.join(self.home, "app.sh")
# Create shell script with the command
# This way, complex commands and scripts can be ran seamlessly
# sync files
cmd = ""
- if self.env:
+ env = self.get("env")
+ if env:
for envkey, envvals in env.iteritems():
for envval in envvals:
cmd += 'export %s=%s\n' % (envkey, envval)
- cmd += self.command
- self.node.upload(cmd, dst)
+ cmd += self.get("command")
+ self.api.upload(cmd, dst)
command = 'bash ./app.sh'
- stdin = 'stdin' if self.stdin else None
- self.node.run(command, self.app_home, stdin = stdin)
- self.pid, self.ppid = self.node.checkpid(self.app_home)
+ stdin = 'stdin' if self.get("stdin") else None
+ self.api.run(command, self.home, stdin = stdin)
+ self._pid, self._ppid = self.api.checkpid(self.app_home)
+
+ def stop(self):
+ self._state = ResourceState.STOPPED
+
+ def release(self):
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.api.execute(tear_down)
+
+ return self.api.kill(self.pid, self.ppid)
def status(self):
- return self.node.status(self.pid, self.ppid)
+ return self.api.status(self.pid, self.ppid)
+
+ def make_app_home(self):
+ self.api.mkdir(self.home)
+
+ stdin = self.get("stdin")
+ if stdin:
+ self.api.upload(stdin, os.path.join(self.home, 'stdin'))
+
+ def _validate_connection(self, guid):
+ # TODO: Validate!
+ return True
+ # XXX: What if it is connected to more than one node?
+ resources = self.find_resources(exact_tags = [tags.NODE])
+ self._node = resources[0] is len(resources) == 1 else None
+ return self._node
+
- def kill(self):
- return self.node.kill(self.pid, self.ppid)
-from neco.execution.resource import ResourceManager, clsinit
from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux.ssh_api import SSHApiFactory
+
+import logging
@clsinit
class LinuxNode(ResourceManager):
hostname = Attribute("hostname", "Hostname of the machine")
username = Attribute("username", "Local account username",
flags = Flags.Credential)
- password = Attribute("pasword", "Local account password",
+ identity = Attribute("identity", "SSH identity file",
flags = Flags.Credential)
+ clean_home = Attribute("cleanHome", "Remove all files and directories
+ from home folder before starting experiment",
+ flags = Flags.ReadOnly)
+ clean_processes = Attribute("cleanProcesses",
+ "Kill all running processes before starting experiment",
+ flags = Flags.ReadOnly)
+ tear_down = Attribute("tearDown", "Bash script to be executed before
+ releasing the resource", flags = Flags.ReadOnly)
cls._register_attribute(hostname)
cls._register_attribute(username)
- cls._register_attribute(password)
+ cls._register_attribute(identity)
+ cls._register_attribute(clean_home)
+ cls._register_attribute(clean_processes)
+ cls._register_attribute(tear_down)
def __init__(self, ec, guid):
super(LinuxNode, self).__init__(ec, guid)
self._logger = logging.getLogger("neco.linux.Node.%d" % guid)
- #elf._logger.setLevel(neco.LOGLEVEL)
-
- def deploy(self):
- pass
- def discover(self, filters):
- pass
+ def provision(self, filters = None):
+ if not self.api.is_alive():
+ self._state = ResourceState.FAILED
+ self.logger.error("Deploy failed. Unresponsive node")
+ return
+
+ if self.get("cleanProcesses"):
+ self._clean_processes()
- def provision(self, filters):
- pass
+ if self.get("cleanHome"):
+ # self._clean_home() -> this is dangerous
+ pass
- def start(self):
- pass
-
- def stop(self):
- pass
-
- def deploy(self, group = None):
- pass
+ def deploy(self):
+ self.provision()
+ super(LinuxNode, self).deploy()
def release(self):
- pass
+ tear_down = self.get("tearDown")
+ if tear_down:
+ self.api.execute(tear_down)
+
+ super(LinuxNode, self).release()
def _validate_connection(self, guid):
# TODO: Validate!
return True
+ @property
+ def api(self):
+ host = self.get("host")
+ user = self.get("user")
+ identity = self.get("identity")
+ return SSHApiFactory.get_api(host, user, identity)
+
+ def _clean_processes(self):
+ hostname = self.get("hostname")
+ self.logger.info("Cleaning up processes on %s", hostname)
+
+ cmds = [
+ "sudo -S killall python tcpdump || /bin/true ; "
+ "sudo -S killall python tcpdump || /bin/true ; "
+ "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
+ "sudo -S killall -u root || /bin/true ",
+ "sudo -S killall -u root || /bin/true ",
+ ]
+
+ api = self.api
+ for cmd in cmds:
+ out, err = api.execute(cmd)
+ if err:
+ self.logger.error(err)
+
+ def _clean_home(self):
+ hostname = self.get("hostname")
+ self.logger.info("Cleaning up home on %s", hostname)
+
+ cmds = [
+ "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + "
+ ]
+
+ api = self.api
+ for cmd in cmds:
+ out, err = api.execute(cmd)
+ if err:
+ self.logger.error(err)
_reabs = re.compile("^\d{20}$")
_rerel = re.compile("^(?P<time>\d+(.\d+)?)(?P<units>h|m|s|ms|us)$")
-# Work around to fix "ImportError: Failed to import _strptime because the import lockis held by another thread."
+# Work around to fix "ImportError: Failed to import _strptime because the import lock is held by another thread."
datetime.datetime.strptime("20120807124732894211", _strf)
def strfnow():
class EC(object):
pass
-
-class ResourceTestCase(unittest.TestCase):
+class ResourceFactoryTestCase(unittest.TestCase):
def test_add_resource_factory(self):
ResourceFactory.register_type(MyResource)
ResourceFactory.register_type(AnotherResource)
self.assertEquals(AnotherResource.rtype(), "AnotherResource")
self.assertEquals(len(AnotherResource._attributes), 0)
- #self.assertEquals(OmfNode.rtype(), "OmfNode")
- #self.assertEquals(len(OmfNode._attributes), 0)
-
self.assertEquals(len(ResourceFactory.resource_types()), 2)
+# TODO:!!!
+class ResourceManagerTestCase(unittest.TestCase):
+ def test_start_with_condition(self):
+ pass
+
+ def test_stop_with_condition(self):
+ pass
+
+ def test_set_with_condition(self):
+ pass
+
+
if __name__ == '__main__':
unittest.main()