Added scheduler and task processing thread to ec. Completed deploy and release methods.
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 28 Mar 2013 13:56:40 +0000 (14:56 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 28 Mar 2013 13:56:40 +0000 (14:56 +0100)
13 files changed:
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/execution/scheduler.py
src/neco/execution/tasks.py [deleted file]
src/neco/resources/linux/application.py
src/neco/resources/linux/node.py
src/neco/resources/omf/omf_application.py
src/neco/resources/omf/omf_channel.py
src/neco/resources/omf/omf_interface.py
src/neco/resources/omf/omf_node.py
src/neco/resources/omf/xx_omf_resource.py
test/execution/resource.py
test/resources/omf/omf_vlc_exp.py

index cb1df2b..429f5ca 100644 (file)
@@ -4,7 +4,10 @@ import sys
 import time
 
 from neco.util import guid
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
 from neco.execution.resource import ResourceFactory
+from neco.execution.scheduler import HeapScheduler, Task
+from neco.util.parallel import ParallelRun
 
 class ExperimentController(object):
     def __init__(self, root_dir = "/tmp", loglevel = 'error'):
@@ -18,9 +21,15 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
-        # Groups of resources
-        self._groups = dict()
-       
+        # Scheduler
+        self._scheduler = HeapScheduler()
+
+        # Event processing thread
+        self._stop = False
+        self._cond = threading.Condition()
+        self._thread = threading.Thread(target = self._process)
+        self._thread.start()
+
         # Logging
         self._logger = logging.getLogger("neco.execution.ec")
         self._logger.setLevel(getattr(logging, loglevel.upper()))
@@ -28,23 +37,16 @@ class ExperimentController(object):
     def resource(self, guid):
         return self._resources.get(guid)
 
+    @property
     def resources(self):
         return self._resources.keys()
 
-    def release(self, group = None):
-        # TODO
-        pass
-
-    def deploy(self, group = None):
-        # TODO
-        pass
-
     def register_resource(self, rtype, guid = None, creds = None):
         # Get next available guid
         guid = self._guid_generator.next(guid)
         
         # Instantiate RM
-        rm = ResourceFactory.create(rtype, self, guid,creds)
+        rm = ResourceFactory.create(rtype, self, guid, creds)
 
         # Store RM
         self._resources[guid] = rm
@@ -66,10 +68,6 @@ class ExperimentController(object):
         rm1.connect(guid2)
         rm2.connect(guid1)
 
-    def register_group(self, guids, gguid = None):
-        gguid = self._guid_generator.next(gguid)
-        self._groups[gguid] = guids
-
     def discover_resource(self, guid, filters):
         rm = self._resources[guid]
         return rm.discover(filters)
@@ -78,36 +76,36 @@ class ExperimentController(object):
         rm = self._resources[guid]
         return rm.provision(filters)
 
-    def register_start(self, gguid1, time, after_status, gguid2):
-        if isinstance(gguid1, int):
-            gguid1 = list[gguid1]
-        if isinstance(gguid2, int):
-            gguid2 = list[gguid2]
+    def register_start(self, group1, time, after_status, group2):
+        if isinstance(group1, int):
+            group1 = list[group1]
+        if isinstance(group2, int):
+            group2 = list[group2]
 
-        for guid1 in gguid1:
-            for guid2 in gguid2:
+        for guid1 in group1:
+            for guid2 in group2:
                 rm = self._resources(guid1)
                 rm.start_after(time, after_status, guid2)
 
-    def register_stop(self, gguid1, time, after_status, gguid2):
-        if isinstance(gguid1, int):
-            gguid1 = list[gguid1]
-        if isinstance(gguid2, int):
-            gguid2 = list[gguid2]
+    def register_stop(self, group1, time, after_status, group2):
+        if isinstance(group1, int):
+            group1 = list[group1]
+        if isinstance(group2, int):
+            group2 = list[group2]
 
-        for guid1 in gguid1:
-            for guid2 in gguid2:
+        for guid1 in group1:
+            for guid2 in group2:
                 rm = self._resources(guid1)
                 rm.stop_after(time, after_status, guid2)
 
-    def register_set(self, name, value, gguid1, time, after_status, gguid2):
-        if isinstance(gguid1, int):
-            gguid1 = list[gguid1]
+    def register_set(self, name, value, group1, time, after_status, group2):
+        if isinstance(group1, int):
+            group1 = list[group1]
         if isinstance(group2, int):
-            gguid2 = list[gguid2]
+            group2 = list[group2]
 
-        for guid1 in gguid1:
-            for guid2 in gguid2:
+        for guid1 in group1:
+            for guid2 in group2:
                 rm = self._resources(guid1)
                 rm.set_after(name, value, time, after_status, guid2)
 
@@ -127,3 +125,101 @@ class ExperimentController(object):
         rm = self._resources(guid)
         return rm.stop()
 
+    def deploy(self, group = None, start_when_all_ready = True):
+        if not group:
+            group = self.resources
+
+        threads = []
+        for guid in group:
+            rm = self._resources(guid1)
+
+            kwargs = {'target': rm.deploy}
+            if start_when_all_ready:
+                towait = list(group)
+                towait.remove(guid)
+                kwargs['args'] = towait
+
+            thread = threading.Thread(kwargs)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+    def release(self, group = None):
+        if not group:
+            group = self.resources
+
+        threads = []
+        for guid in group:
+            rm = self._resources(guid1)
+            thread = threading.Thread(target=rm.release)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+    def shutdown(self):
+        self._stop = False
+        self.release()
+
+    def schedule(self, date, callback):
+        """
+            date    string containing execution time for the task.
+                    It can be expressed as an absolute time, using
+                    timestamp format, or as a relative time matching
+                    ^\d+.\d+(h|m|s|ms|us)$
+
+            callback    code to be executed for the task. Must be a
+                        Python function, and receives args and kwargs
+                        as arguments.
+        """
+        timestamp = strfvalid(date)
+        
+        task = Task(timestamp, callback)
+        task = self._scheduler.schedule(task)
+
+        # Notify condition to wake up the processing thread
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+        return task
+     
+    def _process(self):
+        runner = ParallelRun(maxthreads = 50)
+        runner.start()
+
+        try:
+            while not self._stop:
+                self._cond.acquire()
+                task = self._scheduler.next()
+                self._cond.release()
+
+                if not task:
+                    # It there are not tasks in the tasks queue we need to 
+                    # wait until a call to schedule wakes us up
+                    self._cond.acquire()
+                    self._cond.wait()
+                    self._cond.release()
+                else: 
+                    # If the task timestamp is in the future the thread needs to wait
+                    # until time elapse or until another task is scheduled
+                    now = strfnow()
+                    if now < task.timestamp:
+                        # Calculate time difference in seconds
+                        timeout = strfdiff(task.timestamp, now)
+                        # Re-schedule task with the same timestamp
+                        self._scheduler.schedule(task)
+                        # Sleep until timeout or until a new task awakes the condition
+                        self._cond.acquire()
+                        self._cond.wait(timeout)
+                        self._cond.release()
+                    else:
+                        # Process tasks in parallel
+                        runner.put(task.callback)
+        except:  
+            import traceback
+            err = traceback.format_exc()
+            self._logger.error("Error while processing tasks in the EC: %s" % err)
index d3a1dc4..ebc537c 100644 (file)
@@ -8,7 +8,7 @@ def clsinit(cls):
 
 # Decorator to invoke class initialization method
 @clsinit
-class Resource(object):
+class ResourceManager(object):
     _rtype = "Resource"
     _filters = None
     _attributes = None
@@ -116,6 +116,12 @@ class Resource(object):
     def stop(self):
         pass
 
+    def deploy(self, group = None):
+        pass
+
+    def release(self):
+        pass
+
     def _validate_connection(self, guid):
         # TODO: Validate!
         return True
index 202b711..2786adc 100644 (file)
@@ -1,6 +1,12 @@
 import itertools
 import heapq
 
+class Task(object):
+    def __init__(self, timestamp, callback):
+        self.id = None 
+        self.timestamp = timestamp
+        self.callback = callback
+
 class HeapScheduler(object):
     """ This class is thread safe.
     All calls to C Extensions are made atomic by the GIL in the CPython implementation.
diff --git a/src/neco/execution/tasks.py b/src/neco/execution/tasks.py
deleted file mode 100644 (file)
index ee24fe8..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-
-class TaskStatus:
-    NEW = 0
-    RETRY = 1
-    SUCCESS = 2
-    FAIL = 3
-    RECYCLE = 4
-
-class Task(object):
-    def __init__(self, timestamp, callback, args, kwargs):
-        self.id = None 
-        self.timestamp = timestamp
-        self.callback = callback
-        self.args = args
-        self.kwargs = kwargs
-        self.result = None
-        self.status = TaskStatus.NEW
-
index 8c91ab9..1ffcdf5 100644 (file)
@@ -1,10 +1,10 @@
 from neco.execution import tags
-from neco.execution.resource import Resource
+from neco.execution.resource import ResourceManager
 
 import cStringIO
 import logging
 
-class Application(Resource):
+class Application(ResourceManager):
     def __init__(self, box, ec):
         super(Application, self).__init__(box, ec)
         self.command = None
index 81fed61..feaad46 100644 (file)
@@ -1,4 +1,4 @@
-from neco.execution.resource import Resource
+from neco.execution.resource import ResourceManager
 from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
         rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING 
 
@@ -7,7 +7,7 @@ import logging
 import os.path
 import subprocess
 
-class LinuxNode(Resource):
+class LinuxNode(ResourceManager):
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self.ip = None
index 8b6625a..19ef22d 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
+from neco.execution.resource import ResourceManager, clsinit
 from neco.execution.attribute import Attribute
 from neco.resources.omf.omf_api import OMFAPIFactory
 
@@ -7,7 +7,7 @@ import neco
 import logging
 
 @clsinit
-class OMFApplication(Resource):
+class OMFApplication(ResourceManager):
     _rtype = "OMFApplication"
     _authorized_connections = ["OMFNode"]
 
index bd7f0f1..026b4aa 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
+from neco.execution.resource import ResourceManager, clsinit
 from neco.execution.attribute import Attribute
 
 from neco.resources.omf.omf_api import OMFAPIFactory
@@ -8,7 +8,7 @@ import neco
 import logging
 
 @clsinit
-class OMFChannel(Resource):
+class OMFChannel(ResourceManager):
     _rtype = "OMFChannel"
     _authorized_connections = ["OMFWifiInterface"]
 
index 07d6446..40a650c 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
+from neco.execution.resource import ResourceManager, clsinit
 from neco.execution.attribute import Attribute
 
 from neco.resources.omf.omf_api import OMFAPIFactory
@@ -8,7 +8,7 @@ import neco
 import logging
 
 @clsinit
-class OMFWifiInterface(Resource):
+class OMFWifiInterface(ResourceManager):
     _rtype = "OMFWifiInterface"
     _authorized_connections = ["OMFNode" , "OMFChannel"]
 
index 1a98b93..9bcffe9 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
+from neco.execution.resource import ResourceManager, clsinit
 from neco.execution.attribute import Attribute
 
 from neco.resources.omf.omf_api import OMFAPIFactory
@@ -8,7 +8,7 @@ import neco
 import logging
 
 @clsinit
-class OMFNode(Resource):
+class OMFNode(ResourceManager):
     _rtype = "OMFNode"
     _authorized_connections = ["OMFApplication" , "OMFWifiInterface"]
 
index 784513e..978ac19 100644 (file)
@@ -1,11 +1,11 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
+from neco.execution.resource import ResourceManager, clsinit
 from neco.execution.attribute import Attribute
 
 from neco.resources.omf.omf_api import OMFAPIFactory
 
 @clsinit
-class OMFResource(Resource):
+class OMFResource(ResourceManager):
     _rtype = "OMFResource"
 
     @classmethod
index 3616554..202a148 100755 (executable)
@@ -1,11 +1,11 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, ResourceFactory, clsinit
+from neco.execution.resource import ResourceManager, ResourceFactory, clsinit
 from neco.execution.attribute import Attribute
 
 import unittest
 
 @clsinit
-class MyResource(Resource):
+class MyResource(ResourceManager):
     _rtype = "MyResource"
 
     @classmethod
@@ -17,7 +17,7 @@ class MyResource(Resource):
         super(MyResource, self).__init__(ec, guid)
 
 @clsinit
-class AnotherResource(Resource):
+class AnotherResource(ResourceManager):
     _rtype = "AnotherResource"
 
     def __init__(self, ec, guid):
index 799046f..316e584 100755 (executable)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, ResourceFactory
+from neco.execution.resource import ResourceFactory
 from neco.execution.ec import ExperimentController
 
 from neco.resources.omf.omf_node import OMFNode