From 02ff35b09ad8054fa4f9b8c19555b4dc480f7585 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sat, 30 Mar 2013 17:31:36 +0100 Subject: [PATCH] Fixing ec unit tests --- src/neco/execution/ec.py | 80 +++++++++++++++++++++++---------- src/neco/execution/scheduler.py | 8 ++++ test/execution/ec.py | 51 ++++++++++++--------- 3 files changed, 95 insertions(+), 44 deletions(-) diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 429f5cab..0b15c3a0 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -2,15 +2,16 @@ import logging import os import sys import time +import threading from neco.util import guid from neco.util.timefuncs import strfnow, strfdiff, strfvalid from neco.execution.resource import ResourceFactory -from neco.execution.scheduler import HeapScheduler, Task +from neco.execution.scheduler import HeapScheduler, Task, TaskStatus from neco.util.parallel import ParallelRun class ExperimentController(object): - def __init__(self, root_dir = "/tmp", loglevel = 'error'): + def __init__(self, root_dir = "/tmp", loglevel = 'error'): super(ExperimentController, self).__init__() # root directory to store files self._root_dir = root_dir @@ -24,6 +25,9 @@ class ExperimentController(object): # Scheduler self._scheduler = HeapScheduler() + # Tasks + self._tasks = dict() + # Event processing thread self._stop = False self._cond = threading.Condition() @@ -34,7 +38,10 @@ class ExperimentController(object): self._logger = logging.getLogger("neco.execution.ec") self._logger.setLevel(getattr(logging, loglevel.upper())) - def resource(self, guid): + def get_task(self, tid): + return self._tasks.get(tid) + + def get_resource(self, guid): return self._resources.get(guid) @property @@ -54,26 +61,26 @@ class ExperimentController(object): return guid def get_attributes(self, guid): - rm = self._resources[guid] + rm = self.get_resource(guid) return rm.get_attributes() def get_filters(self, guid): - rm = self._resources[guid] + rm = self.get_resource(guid) return rm.get_filters() def register_connection(self, guid1, guid2): - rm1 = self._resources[guid1] - rm2 = self._resources[guid2] + rm1 = self.get_resource(guid1) + rm2 = self.get_resource(guid2) rm1.connect(guid2) rm2.connect(guid1) def discover_resource(self, guid, filters): - rm = self._resources[guid] + rm = self.get_resource(guid) return rm.discover(filters) def provision_resource(self, guid, filters): - rm = self._resources[guid] + rm = self.get_resource(guid) return rm.provision(filters) def register_start(self, group1, time, after_status, group2): @@ -84,7 +91,7 @@ class ExperimentController(object): for guid1 in group1: for guid2 in group2: - rm = self._resources(guid1) + rm = self.get_resource(guid) rm.start_after(time, after_status, guid2) def register_stop(self, group1, time, after_status, group2): @@ -95,7 +102,7 @@ class ExperimentController(object): for guid1 in group1: for guid2 in group2: - rm = self._resources(guid1) + rm = self.get_resource(guid) rm.stop_after(time, after_status, guid2) def register_set(self, name, value, group1, time, after_status, group2): @@ -106,23 +113,23 @@ class ExperimentController(object): for guid1 in group1: for guid2 in group2: - rm = self._resources(guid1) + rm = self.get_resource(guid) rm.set_after(name, value, time, after_status, guid2) def get(self, guid, name): - rm = self._resources(guid) + rm = self.get_resource(guid) return rm.get(name) def set(self, guid, name, value): - rm = self._resources(guid) + rm = self.get_resource(guid) return rm.set(name, value) def status(self, guid): - rm = self._resources(guid) + rm = self.get_resource(guid) return rm.status() def stop(self, guid): - rm = self._resources(guid) + rm = self.get_resource(guid) return rm.stop() def deploy(self, group = None, start_when_all_ready = True): @@ -131,7 +138,7 @@ class ExperimentController(object): threads = [] for guid in group: - rm = self._resources(guid1) + rm = self.get_resource(guid) kwargs = {'target': rm.deploy} if start_when_all_ready: @@ -152,7 +159,7 @@ class ExperimentController(object): threads = [] for guid in group: - rm = self._resources(guid1) + rm = self.get_resource(guid) thread = threading.Thread(target=rm.release) threads.append(thread) thread.start() @@ -161,10 +168,16 @@ class ExperimentController(object): thread.join() def shutdown(self): - self._stop = False self.release() + + self._stop = True + self._cond.acquire() + self._cond.notify() + self._cond.release() + if self._thread.is_alive(): + self._thread.join() - def schedule(self, date, callback): + def schedule(self, date, callback, track = False): """ date string containing execution time for the task. It can be expressed as an absolute time, using @@ -174,17 +187,24 @@ class ExperimentController(object): callback code to be executed for the task. Must be a Python function, and receives args and kwargs as arguments. + + track if set to True, the task will be retrivable with + the get_task() method """ timestamp = strfvalid(date) task = Task(timestamp, callback) task = self._scheduler.schedule(task) + if track: + self._tasks[task.id] = task + # Notify condition to wake up the processing thread self._cond.acquire() self._cond.notify() self._cond.release() - return task + + return task.id def _process(self): runner = ParallelRun(maxthreads = 50) @@ -217,9 +237,23 @@ class ExperimentController(object): self._cond.release() else: # Process tasks in parallel - runner.put(task.callback) + runner.put(self._execute, task) except: import traceback err = traceback.format_exc() self._logger.error("Error while processing tasks in the EC: %s" % err) - + + def _execute(self, task): + # Invoke callback + task.status = TaskStatus.DONE + + try: + task.result = task.callback() + except: + import traceback + err = traceback.format_exc() + self._logger.error("Error while executing event: %s" % err) + + task.result = err + task.status = TaskStatus.ERROR + diff --git a/src/neco/execution/scheduler.py b/src/neco/execution/scheduler.py index 2786adce..00609261 100644 --- a/src/neco/execution/scheduler.py +++ b/src/neco/execution/scheduler.py @@ -1,11 +1,19 @@ import itertools import heapq +class TaskStatus: + NEW = 0 + DONE = 1 + ERROR = 2 + + class Task(object): def __init__(self, timestamp, callback): self.id = None self.timestamp = timestamp self.callback = callback + self.result = None + self.status = TaskStatus.NEW class HeapScheduler(object): """ This class is thread safe. diff --git a/test/execution/ec.py b/test/execution/ec.py index eda875ee..fba3cd5b 100755 --- a/test/execution/ec.py +++ b/test/execution/ec.py @@ -1,50 +1,59 @@ #!/usr/bin/env python from neco.execution.ec import ExperimentController -from neco.execution.tasks import TaskStatus +from neco.execution.scheduler import TaskStatus import datetime +import time import unittest class ExecuteControllersTestCase(unittest.TestCase): def test_schedule_print(self): - def myfunc(ec_weakref): - result = id(ec_weakref()) - return (TaskStatus.SUCCESS, result) + def myfunc(): + return 'hola!' + ec = ExperimentController() + try: - ec = ExperimentController() + tid = ec.schedule("0s", myfunc, track=True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break + + time.sleep(1) - tid = ec.schedule("0s", myfunc) - status = None - while status != TaskStatus.SUCCESS: - (status, result) = ec.task_info(tid) + self.assertEquals('hola!', task.result) - self.assertEquals(id(ec), result) finally: - ec.terminate() + ec.shutdown() def test_schedule_date(self): - def get_time(ec_weakref): - timestamp = datetime.datetime.now() - return (TaskStatus.SUCCESS, timestamp) + def get_time(): + return datetime.datetime.now() - try: - ec = ExperimentController() + ec = ExperimentController() + try: schedule_time = datetime.datetime.now() - tid = ec.schedule("4s", get_time) - status = None - while status != TaskStatus.SUCCESS: - (status, execution_time) = ec.task_info(tid) + tid = ec.schedule("4s", get_time, track=True) + + while True: + task = ec.get_task(tid) + if task.status != TaskStatus.NEW: + break + + time.sleep(1) + execution_time = task.result delta = execution_time - schedule_time self.assertTrue(delta > datetime.timedelta(seconds=4)) self.assertTrue(delta < datetime.timedelta(seconds=5)) finally: - ec.terminate() + ec.shutdown() if __name__ == '__main__': -- 2.43.0