From: Alina Quereilhac Date: Thu, 28 Mar 2013 13:56:40 +0000 (+0100) Subject: Added scheduler and task processing thread to ec. Completed deploy and release methods. X-Git-Tag: nepi-3.0.0~122^2~22 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=f1cb97fc50045f85a86bf6e16e0bd58b7a891a9d;p=nepi.git Added scheduler and task processing thread to ec. Completed deploy and release methods. --- diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index cb1df2bd..429f5cab 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -4,7 +4,10 @@ import sys import time from neco.util import guid +from neco.util.timefuncs import strfnow, strfdiff, strfvalid from neco.execution.resource import ResourceFactory +from neco.execution.scheduler import HeapScheduler, Task +from neco.util.parallel import ParallelRun class ExperimentController(object): def __init__(self, root_dir = "/tmp", loglevel = 'error'): @@ -18,9 +21,15 @@ class ExperimentController(object): # Resource managers self._resources = dict() - # Groups of resources - self._groups = dict() - + # Scheduler + self._scheduler = HeapScheduler() + + # Event processing thread + self._stop = False + self._cond = threading.Condition() + self._thread = threading.Thread(target = self._process) + self._thread.start() + # Logging self._logger = logging.getLogger("neco.execution.ec") self._logger.setLevel(getattr(logging, loglevel.upper())) @@ -28,23 +37,16 @@ class ExperimentController(object): def resource(self, guid): return self._resources.get(guid) + @property def resources(self): return self._resources.keys() - def release(self, group = None): - # TODO - pass - - def deploy(self, group = None): - # TODO - pass - def register_resource(self, rtype, guid = None, creds = None): # Get next available guid guid = self._guid_generator.next(guid) # Instantiate RM - rm = ResourceFactory.create(rtype, self, guid,creds) + rm = ResourceFactory.create(rtype, self, guid, creds) # Store RM self._resources[guid] = rm @@ -66,10 +68,6 @@ class ExperimentController(object): rm1.connect(guid2) rm2.connect(guid1) - def register_group(self, guids, gguid = None): - gguid = self._guid_generator.next(gguid) - self._groups[gguid] = guids - def discover_resource(self, guid, filters): rm = self._resources[guid] return rm.discover(filters) @@ -78,36 +76,36 @@ class ExperimentController(object): rm = self._resources[guid] return rm.provision(filters) - def register_start(self, gguid1, time, after_status, gguid2): - if isinstance(gguid1, int): - gguid1 = list[gguid1] - if isinstance(gguid2, int): - gguid2 = list[gguid2] + def register_start(self, group1, time, after_status, group2): + if isinstance(group1, int): + group1 = list[group1] + if isinstance(group2, int): + group2 = list[group2] - for guid1 in gguid1: - for guid2 in gguid2: + for guid1 in group1: + for guid2 in group2: rm = self._resources(guid1) rm.start_after(time, after_status, guid2) - def register_stop(self, gguid1, time, after_status, gguid2): - if isinstance(gguid1, int): - gguid1 = list[gguid1] - if isinstance(gguid2, int): - gguid2 = list[gguid2] + def register_stop(self, group1, time, after_status, group2): + if isinstance(group1, int): + group1 = list[group1] + if isinstance(group2, int): + group2 = list[group2] - for guid1 in gguid1: - for guid2 in gguid2: + for guid1 in group1: + for guid2 in group2: rm = self._resources(guid1) rm.stop_after(time, after_status, guid2) - def register_set(self, name, value, gguid1, time, after_status, gguid2): - if isinstance(gguid1, int): - gguid1 = list[gguid1] + def register_set(self, name, value, group1, time, after_status, group2): + if isinstance(group1, int): + group1 = list[group1] if isinstance(group2, int): - gguid2 = list[gguid2] + group2 = list[group2] - for guid1 in gguid1: - for guid2 in gguid2: + for guid1 in group1: + for guid2 in group2: rm = self._resources(guid1) rm.set_after(name, value, time, after_status, guid2) @@ -127,3 +125,101 @@ class ExperimentController(object): rm = self._resources(guid) return rm.stop() + def deploy(self, group = None, start_when_all_ready = True): + if not group: + group = self.resources + + threads = [] + for guid in group: + rm = self._resources(guid1) + + kwargs = {'target': rm.deploy} + if start_when_all_ready: + towait = list(group) + towait.remove(guid) + kwargs['args'] = towait + + thread = threading.Thread(kwargs) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + def release(self, group = None): + if not group: + group = self.resources + + threads = [] + for guid in group: + rm = self._resources(guid1) + thread = threading.Thread(target=rm.release) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + def shutdown(self): + self._stop = False + self.release() + + def schedule(self, date, callback): + """ + date string containing execution time for the task. + It can be expressed as an absolute time, using + timestamp format, or as a relative time matching + ^\d+.\d+(h|m|s|ms|us)$ + + callback code to be executed for the task. Must be a + Python function, and receives args and kwargs + as arguments. + """ + timestamp = strfvalid(date) + + task = Task(timestamp, callback) + task = self._scheduler.schedule(task) + + # Notify condition to wake up the processing thread + self._cond.acquire() + self._cond.notify() + self._cond.release() + return task + + def _process(self): + runner = ParallelRun(maxthreads = 50) + runner.start() + + try: + while not self._stop: + self._cond.acquire() + task = self._scheduler.next() + self._cond.release() + + if not task: + # It there are not tasks in the tasks queue we need to + # wait until a call to schedule wakes us up + self._cond.acquire() + self._cond.wait() + self._cond.release() + else: + # If the task timestamp is in the future the thread needs to wait + # until time elapse or until another task is scheduled + now = strfnow() + if now < task.timestamp: + # Calculate time difference in seconds + timeout = strfdiff(task.timestamp, now) + # Re-schedule task with the same timestamp + self._scheduler.schedule(task) + # Sleep until timeout or until a new task awakes the condition + self._cond.acquire() + self._cond.wait(timeout) + self._cond.release() + else: + # Process tasks in parallel + runner.put(task.callback) + except: + import traceback + err = traceback.format_exc() + self._logger.error("Error while processing tasks in the EC: %s" % err) + diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index d3a1dc44..ebc537ce 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -8,7 +8,7 @@ def clsinit(cls): # Decorator to invoke class initialization method @clsinit -class Resource(object): +class ResourceManager(object): _rtype = "Resource" _filters = None _attributes = None @@ -116,6 +116,12 @@ class Resource(object): def stop(self): pass + def deploy(self, group = None): + pass + + def release(self): + pass + def _validate_connection(self, guid): # TODO: Validate! return True diff --git a/src/neco/execution/scheduler.py b/src/neco/execution/scheduler.py index 202b711a..2786adce 100644 --- a/src/neco/execution/scheduler.py +++ b/src/neco/execution/scheduler.py @@ -1,6 +1,12 @@ import itertools import heapq +class Task(object): + def __init__(self, timestamp, callback): + self.id = None + self.timestamp = timestamp + self.callback = callback + class HeapScheduler(object): """ This class is thread safe. All calls to C Extensions are made atomic by the GIL in the CPython implementation. diff --git a/src/neco/execution/tasks.py b/src/neco/execution/tasks.py deleted file mode 100644 index ee24fe80..00000000 --- a/src/neco/execution/tasks.py +++ /dev/null @@ -1,18 +0,0 @@ - -class TaskStatus: - NEW = 0 - RETRY = 1 - SUCCESS = 2 - FAIL = 3 - RECYCLE = 4 - -class Task(object): - def __init__(self, timestamp, callback, args, kwargs): - self.id = None - self.timestamp = timestamp - self.callback = callback - self.args = args - self.kwargs = kwargs - self.result = None - self.status = TaskStatus.NEW - diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index 8c91ab97..1ffcdf57 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -1,10 +1,10 @@ from neco.execution import tags -from neco.execution.resource import Resource +from neco.execution.resource import ResourceManager import cStringIO import logging -class Application(Resource): +class Application(ResourceManager): def __init__(self, box, ec): super(Application, self).__init__(box, ec) self.command = None diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index 81fed619..feaad462 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -1,4 +1,4 @@ -from neco.execution.resource import Resource +from neco.execution.resource import ResourceManager from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \ rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING @@ -7,7 +7,7 @@ import logging import os.path import subprocess -class LinuxNode(Resource): +class LinuxNode(ResourceManager): def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self.ip = None diff --git a/src/neco/resources/omf/omf_application.py b/src/neco/resources/omf/omf_application.py index 8b6625af..19ef22d9 100644 --- a/src/neco/resources/omf/omf_application.py +++ b/src/neco/resources/omf/omf_application.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit +from neco.execution.resource import ResourceManager, clsinit from neco.execution.attribute import Attribute from neco.resources.omf.omf_api import OMFAPIFactory @@ -7,7 +7,7 @@ import neco import logging @clsinit -class OMFApplication(Resource): +class OMFApplication(ResourceManager): _rtype = "OMFApplication" _authorized_connections = ["OMFNode"] diff --git a/src/neco/resources/omf/omf_channel.py b/src/neco/resources/omf/omf_channel.py index bd7f0f11..026b4aa5 100644 --- a/src/neco/resources/omf/omf_channel.py +++ b/src/neco/resources/omf/omf_channel.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit +from neco.execution.resource import ResourceManager, clsinit from neco.execution.attribute import Attribute from neco.resources.omf.omf_api import OMFAPIFactory @@ -8,7 +8,7 @@ import neco import logging @clsinit -class OMFChannel(Resource): +class OMFChannel(ResourceManager): _rtype = "OMFChannel" _authorized_connections = ["OMFWifiInterface"] diff --git a/src/neco/resources/omf/omf_interface.py b/src/neco/resources/omf/omf_interface.py index 07d64460..40a650cf 100644 --- a/src/neco/resources/omf/omf_interface.py +++ b/src/neco/resources/omf/omf_interface.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit +from neco.execution.resource import ResourceManager, clsinit from neco.execution.attribute import Attribute from neco.resources.omf.omf_api import OMFAPIFactory @@ -8,7 +8,7 @@ import neco import logging @clsinit -class OMFWifiInterface(Resource): +class OMFWifiInterface(ResourceManager): _rtype = "OMFWifiInterface" _authorized_connections = ["OMFNode" , "OMFChannel"] diff --git a/src/neco/resources/omf/omf_node.py b/src/neco/resources/omf/omf_node.py index 1a98b939..9bcffe99 100644 --- a/src/neco/resources/omf/omf_node.py +++ b/src/neco/resources/omf/omf_node.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit +from neco.execution.resource import ResourceManager, clsinit from neco.execution.attribute import Attribute from neco.resources.omf.omf_api import OMFAPIFactory @@ -8,7 +8,7 @@ import neco import logging @clsinit -class OMFNode(Resource): +class OMFNode(ResourceManager): _rtype = "OMFNode" _authorized_connections = ["OMFApplication" , "OMFWifiInterface"] diff --git a/src/neco/resources/omf/xx_omf_resource.py b/src/neco/resources/omf/xx_omf_resource.py index 784513e2..978ac198 100644 --- a/src/neco/resources/omf/xx_omf_resource.py +++ b/src/neco/resources/omf/xx_omf_resource.py @@ -1,11 +1,11 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit +from neco.execution.resource import ResourceManager, clsinit from neco.execution.attribute import Attribute from neco.resources.omf.omf_api import OMFAPIFactory @clsinit -class OMFResource(Resource): +class OMFResource(ResourceManager): _rtype = "OMFResource" @classmethod diff --git a/test/execution/resource.py b/test/execution/resource.py index 36165546..202a148e 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -1,11 +1,11 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, ResourceFactory, clsinit +from neco.execution.resource import ResourceManager, ResourceFactory, clsinit from neco.execution.attribute import Attribute import unittest @clsinit -class MyResource(Resource): +class MyResource(ResourceManager): _rtype = "MyResource" @classmethod @@ -17,7 +17,7 @@ class MyResource(Resource): super(MyResource, self).__init__(ec, guid) @clsinit -class AnotherResource(Resource): +class AnotherResource(ResourceManager): _rtype = "AnotherResource" def __init__(self, ec, guid): diff --git a/test/resources/omf/omf_vlc_exp.py b/test/resources/omf/omf_vlc_exp.py index 799046fc..316e584f 100755 --- a/test/resources/omf/omf_vlc_exp.py +++ b/test/resources/omf/omf_vlc_exp.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, ResourceFactory +from neco.execution.resource import ResourceFactory from neco.execution.ec import ExperimentController from neco.resources.omf.omf_node import OMFNode