Fixing ec unit tests
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 30 Mar 2013 16:31:36 +0000 (17:31 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 30 Mar 2013 16:31:36 +0000 (17:31 +0100)
src/neco/execution/ec.py
src/neco/execution/scheduler.py
test/execution/ec.py

index 429f5ca..0b15c3a 100644 (file)
@@ -2,15 +2,16 @@ import logging
 import os
 import sys
 import time
 import os
 import sys
 import time
+import threading
 
 from neco.util import guid
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
 from neco.execution.resource import ResourceFactory
 
 from neco.util import guid
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
 from neco.execution.resource import ResourceFactory
-from neco.execution.scheduler import HeapScheduler, Task
+from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
 from neco.util.parallel import ParallelRun
 
 class ExperimentController(object):
 from neco.util.parallel import ParallelRun
 
 class ExperimentController(object):
-    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
+    def __init__(self, root_dir = "/tmp", loglevel = 'error'): 
         super(ExperimentController, self).__init__()
         # root directory to store files
         self._root_dir = root_dir
         super(ExperimentController, self).__init__()
         # root directory to store files
         self._root_dir = root_dir
@@ -24,6 +25,9 @@ class ExperimentController(object):
         # Scheduler
         self._scheduler = HeapScheduler()
 
         # Scheduler
         self._scheduler = HeapScheduler()
 
+        # Tasks
+        self._tasks = dict()
+
         # Event processing thread
         self._stop = False
         self._cond = threading.Condition()
         # Event processing thread
         self._stop = False
         self._cond = threading.Condition()
@@ -34,7 +38,10 @@ class ExperimentController(object):
         self._logger = logging.getLogger("neco.execution.ec")
         self._logger.setLevel(getattr(logging, loglevel.upper()))
 
         self._logger = logging.getLogger("neco.execution.ec")
         self._logger.setLevel(getattr(logging, loglevel.upper()))
 
-    def resource(self, guid):
+    def get_task(self, tid):
+        return self._tasks.get(tid)
+
+    def get_resource(self, guid):
         return self._resources.get(guid)
 
     @property
         return self._resources.get(guid)
 
     @property
@@ -54,26 +61,26 @@ class ExperimentController(object):
         return guid
 
     def get_attributes(self, guid):
         return guid
 
     def get_attributes(self, guid):
-        rm = self._resources[guid]
+        rm = self.get_resource(guid)
         return rm.get_attributes()
 
     def get_filters(self, guid):
         return rm.get_attributes()
 
     def get_filters(self, guid):
-        rm = self._resources[guid]
+        rm = self.get_resource(guid)
         return rm.get_filters()
 
     def register_connection(self, guid1, guid2):
         return rm.get_filters()
 
     def register_connection(self, guid1, guid2):
-        rm1 = self._resources[guid1]
-        rm2 = self._resources[guid2]
+        rm1 = self.get_resource(guid1)
+        rm2 = self.get_resource(guid2)
 
         rm1.connect(guid2)
         rm2.connect(guid1)
 
     def discover_resource(self, guid, filters):
 
         rm1.connect(guid2)
         rm2.connect(guid1)
 
     def discover_resource(self, guid, filters):
-        rm = self._resources[guid]
+        rm = self.get_resource(guid)
         return rm.discover(filters)
 
     def provision_resource(self, guid, filters):
         return rm.discover(filters)
 
     def provision_resource(self, guid, filters):
-        rm = self._resources[guid]
+        rm = self.get_resource(guid)
         return rm.provision(filters)
 
     def register_start(self, group1, time, after_status, group2):
         return rm.provision(filters)
 
     def register_start(self, group1, time, after_status, group2):
@@ -84,7 +91,7 @@ class ExperimentController(object):
 
         for guid1 in group1:
             for guid2 in group2:
 
         for guid1 in group1:
             for guid2 in group2:
-                rm = self._resources(guid1)
+                rm = self.get_resource(guid)
                 rm.start_after(time, after_status, guid2)
 
     def register_stop(self, group1, time, after_status, group2):
                 rm.start_after(time, after_status, guid2)
 
     def register_stop(self, group1, time, after_status, group2):
@@ -95,7 +102,7 @@ class ExperimentController(object):
 
         for guid1 in group1:
             for guid2 in group2:
 
         for guid1 in group1:
             for guid2 in group2:
-                rm = self._resources(guid1)
+                rm = self.get_resource(guid)
                 rm.stop_after(time, after_status, guid2)
 
     def register_set(self, name, value, group1, time, after_status, group2):
                 rm.stop_after(time, after_status, guid2)
 
     def register_set(self, name, value, group1, time, after_status, group2):
@@ -106,23 +113,23 @@ class ExperimentController(object):
 
         for guid1 in group1:
             for guid2 in group2:
 
         for guid1 in group1:
             for guid2 in group2:
-                rm = self._resources(guid1)
+                rm = self.get_resource(guid)
                 rm.set_after(name, value, time, after_status, guid2)
 
     def get(self, guid, name):
                 rm.set_after(name, value, time, after_status, guid2)
 
     def get(self, guid, name):
-        rm = self._resources(guid)
+        rm = self.get_resource(guid)
         return rm.get(name)
 
     def set(self, guid, name, value):
         return rm.get(name)
 
     def set(self, guid, name, value):
-        rm = self._resources(guid)
+        rm = self.get_resource(guid)
         return rm.set(name, value)
 
     def status(self, guid):
         return rm.set(name, value)
 
     def status(self, guid):
-        rm = self._resources(guid)
+        rm = self.get_resource(guid)
         return rm.status()
 
     def stop(self, guid):
         return rm.status()
 
     def stop(self, guid):
-        rm = self._resources(guid)
+        rm = self.get_resource(guid)
         return rm.stop()
 
     def deploy(self, group = None, start_when_all_ready = True):
         return rm.stop()
 
     def deploy(self, group = None, start_when_all_ready = True):
@@ -131,7 +138,7 @@ class ExperimentController(object):
 
         threads = []
         for guid in group:
 
         threads = []
         for guid in group:
-            rm = self._resources(guid1)
+            rm = self.get_resource(guid)
 
             kwargs = {'target': rm.deploy}
             if start_when_all_ready:
 
             kwargs = {'target': rm.deploy}
             if start_when_all_ready:
@@ -152,7 +159,7 @@ class ExperimentController(object):
 
         threads = []
         for guid in group:
 
         threads = []
         for guid in group:
-            rm = self._resources(guid1)
+            rm = self.get_resource(guid)
             thread = threading.Thread(target=rm.release)
             threads.append(thread)
             thread.start()
             thread = threading.Thread(target=rm.release)
             threads.append(thread)
             thread.start()
@@ -161,10 +168,16 @@ class ExperimentController(object):
             thread.join()
 
     def shutdown(self):
             thread.join()
 
     def shutdown(self):
-        self._stop = False
         self.release()
         self.release()
+        
+        self._stop = True
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+        if self._thread.is_alive():
+           self._thread.join()
 
 
-    def schedule(self, date, callback):
+    def schedule(self, date, callback, track = False):
         """
             date    string containing execution time for the task.
                     It can be expressed as an absolute time, using
         """
             date    string containing execution time for the task.
                     It can be expressed as an absolute time, using
@@ -174,17 +187,24 @@ class ExperimentController(object):
             callback    code to be executed for the task. Must be a
                         Python function, and receives args and kwargs
                         as arguments.
             callback    code to be executed for the task. Must be a
                         Python function, and receives args and kwargs
                         as arguments.
+
+            track   if set to True, the task will be retrivable with
+                    the get_task() method
         """
         timestamp = strfvalid(date)
         
         task = Task(timestamp, callback)
         task = self._scheduler.schedule(task)
 
         """
         timestamp = strfvalid(date)
         
         task = Task(timestamp, callback)
         task = self._scheduler.schedule(task)
 
+        if track:
+            self._tasks[task.id] = task
+  
         # Notify condition to wake up the processing thread
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
         # Notify condition to wake up the processing thread
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
-        return task
+
+        return task.id
      
     def _process(self):
         runner = ParallelRun(maxthreads = 50)
      
     def _process(self):
         runner = ParallelRun(maxthreads = 50)
@@ -217,9 +237,23 @@ class ExperimentController(object):
                         self._cond.release()
                     else:
                         # Process tasks in parallel
                         self._cond.release()
                     else:
                         # Process tasks in parallel
-                        runner.put(task.callback)
+                        runner.put(self._execute, task)
         except:  
             import traceback
             err = traceback.format_exc()
             self._logger.error("Error while processing tasks in the EC: %s" % err)
         except:  
             import traceback
             err = traceback.format_exc()
             self._logger.error("Error while processing tasks in the EC: %s" % err)
+
+    def _execute(self, task):
+        # Invoke callback
+        task.status = TaskStatus.DONE
+
+        try:
+            task.result = task.callback()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self._logger.error("Error while executing event: %s" % err)
+
+            task.result = err
+            task.status = TaskStatus.ERROR
+
index 2786adc..0060926 100644 (file)
@@ -1,11 +1,19 @@
 import itertools
 import heapq
 
 import itertools
 import heapq
 
+class TaskStatus:
+    NEW = 0
+    DONE = 1
+    ERROR = 2
+
+
 class Task(object):
     def __init__(self, timestamp, callback):
         self.id = None 
         self.timestamp = timestamp
         self.callback = callback
 class Task(object):
     def __init__(self, timestamp, callback):
         self.id = None 
         self.timestamp = timestamp
         self.callback = callback
+        self.result = None
+        self.status = TaskStatus.NEW
 
 class HeapScheduler(object):
     """ This class is thread safe.
 
 class HeapScheduler(object):
     """ This class is thread safe.
index eda875e..fba3cd5 100755 (executable)
@@ -1,50 +1,59 @@
 #!/usr/bin/env python
 
 from neco.execution.ec import ExperimentController 
 #!/usr/bin/env python
 
 from neco.execution.ec import ExperimentController 
-from neco.execution.tasks import TaskStatus
+from neco.execution.scheduler import TaskStatus
 
 import datetime
 
 import datetime
+import time
 import unittest
 
 class ExecuteControllersTestCase(unittest.TestCase):
     def test_schedule_print(self):
 import unittest
 
 class ExecuteControllersTestCase(unittest.TestCase):
     def test_schedule_print(self):
-        def myfunc(ec_weakref):
-            result = id(ec_weakref())
-            return (TaskStatus.SUCCESS, result)
+        def myfunc():
+            return 'hola!' 
 
 
+        ec = ExperimentController()
+        
         try:
         try:
-            ec = ExperimentController()
+            tid = ec.schedule("0s", myfunc, track=True)
+            
+            while True:
+                task = ec.get_task(tid)
+                if task.status != TaskStatus.NEW:
+                    break
+
+                time.sleep(1)
 
 
-            tid = ec.schedule("0s", myfunc)
-            status = None
-            while status != TaskStatus.SUCCESS:
-                (status, result) = ec.task_info(tid)
+            self.assertEquals('hola!', task.result)
 
 
-            self.assertEquals(id(ec), result)
         finally:
         finally:
-            ec.terminate()
+            ec.shutdown()
 
     def test_schedule_date(self):
 
     def test_schedule_date(self):
-        def get_time(ec_weakref):
-            timestamp = datetime.datetime.now() 
-            return (TaskStatus.SUCCESS, timestamp)
+        def get_time():
+            return datetime.datetime.now() 
 
 
-        try:
-            ec = ExperimentController()
+        ec = ExperimentController()
 
 
+        try:
             schedule_time = datetime.datetime.now()
             
             schedule_time = datetime.datetime.now()
             
-            tid = ec.schedule("4s", get_time)
-            status = None
-            while status != TaskStatus.SUCCESS:
-                (status, execution_time) = ec.task_info(tid)
+            tid = ec.schedule("4s", get_time, track=True)
+
+            while True:
+                task = ec.get_task(tid)
+                if task.status != TaskStatus.NEW:
+                    break
+
+                time.sleep(1)
 
 
+            execution_time = task.result
             delta = execution_time - schedule_time
             self.assertTrue(delta > datetime.timedelta(seconds=4))
             self.assertTrue(delta < datetime.timedelta(seconds=5))
 
         finally:
             delta = execution_time - schedule_time
             self.assertTrue(delta > datetime.timedelta(seconds=4))
             self.assertTrue(delta < datetime.timedelta(seconds=5))
 
         finally:
-            ec.terminate()
+            ec.shutdown()
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':