import os
import sys
import time
+import threading
from neco.util import guid
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
from neco.execution.resource import ResourceFactory
-from neco.execution.scheduler import HeapScheduler, Task
+from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
from neco.util.parallel import ParallelRun
class ExperimentController(object):
- def __init__(self, root_dir = "/tmp", loglevel = 'error'):
+ def __init__(self, root_dir = "/tmp", loglevel = 'error'):
super(ExperimentController, self).__init__()
# root directory to store files
self._root_dir = root_dir
# Scheduler
self._scheduler = HeapScheduler()
+ # Tasks
+ self._tasks = dict()
+
# Event processing thread
self._stop = False
self._cond = threading.Condition()
self._logger = logging.getLogger("neco.execution.ec")
self._logger.setLevel(getattr(logging, loglevel.upper()))
- def resource(self, guid):
+ def get_task(self, tid):
+ return self._tasks.get(tid)
+
+ def get_resource(self, guid):
return self._resources.get(guid)
@property
return guid
def get_attributes(self, guid):
- rm = self._resources[guid]
+ rm = self.get_resource(guid)
return rm.get_attributes()
def get_filters(self, guid):
- rm = self._resources[guid]
+ rm = self.get_resource(guid)
return rm.get_filters()
def register_connection(self, guid1, guid2):
- rm1 = self._resources[guid1]
- rm2 = self._resources[guid2]
+ rm1 = self.get_resource(guid1)
+ rm2 = self.get_resource(guid2)
rm1.connect(guid2)
rm2.connect(guid1)
def discover_resource(self, guid, filters):
- rm = self._resources[guid]
+ rm = self.get_resource(guid)
return rm.discover(filters)
def provision_resource(self, guid, filters):
- rm = self._resources[guid]
+ rm = self.get_resource(guid)
return rm.provision(filters)
def register_start(self, group1, time, after_status, group2):
for guid1 in group1:
for guid2 in group2:
- rm = self._resources(guid1)
+ rm = self.get_resource(guid)
rm.start_after(time, after_status, guid2)
def register_stop(self, group1, time, after_status, group2):
for guid1 in group1:
for guid2 in group2:
- rm = self._resources(guid1)
+ rm = self.get_resource(guid)
rm.stop_after(time, after_status, guid2)
def register_set(self, name, value, group1, time, after_status, group2):
for guid1 in group1:
for guid2 in group2:
- rm = self._resources(guid1)
+ rm = self.get_resource(guid)
rm.set_after(name, value, time, after_status, guid2)
def get(self, guid, name):
- rm = self._resources(guid)
+ rm = self.get_resource(guid)
return rm.get(name)
def set(self, guid, name, value):
- rm = self._resources(guid)
+ rm = self.get_resource(guid)
return rm.set(name, value)
def status(self, guid):
- rm = self._resources(guid)
+ rm = self.get_resource(guid)
return rm.status()
def stop(self, guid):
- rm = self._resources(guid)
+ rm = self.get_resource(guid)
return rm.stop()
def deploy(self, group = None, start_when_all_ready = True):
threads = []
for guid in group:
- rm = self._resources(guid1)
+ rm = self.get_resource(guid)
kwargs = {'target': rm.deploy}
if start_when_all_ready:
threads = []
for guid in group:
- rm = self._resources(guid1)
+ rm = self.get_resource(guid)
thread = threading.Thread(target=rm.release)
threads.append(thread)
thread.start()
thread.join()
def shutdown(self):
- self._stop = False
self.release()
+
+ self._stop = True
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+ if self._thread.is_alive():
+ self._thread.join()
- def schedule(self, date, callback):
+ def schedule(self, date, callback, track = False):
"""
date string containing execution time for the task.
It can be expressed as an absolute time, using
callback code to be executed for the task. Must be a
Python function, and receives args and kwargs
as arguments.
+
+ track if set to True, the task will be retrivable with
+ the get_task() method
"""
timestamp = strfvalid(date)
task = Task(timestamp, callback)
task = self._scheduler.schedule(task)
+ if track:
+ self._tasks[task.id] = task
+
# Notify condition to wake up the processing thread
self._cond.acquire()
self._cond.notify()
self._cond.release()
- return task
+
+ return task.id
def _process(self):
runner = ParallelRun(maxthreads = 50)
self._cond.release()
else:
# Process tasks in parallel
- runner.put(task.callback)
+ runner.put(self._execute, task)
except:
import traceback
err = traceback.format_exc()
self._logger.error("Error while processing tasks in the EC: %s" % err)
-
+
+ def _execute(self, task):
+ # Invoke callback
+ task.status = TaskStatus.DONE
+
+ try:
+ task.result = task.callback()
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self._logger.error("Error while executing event: %s" % err)
+
+ task.result = err
+ task.status = TaskStatus.ERROR
+
#!/usr/bin/env python
from neco.execution.ec import ExperimentController
-from neco.execution.tasks import TaskStatus
+from neco.execution.scheduler import TaskStatus
import datetime
+import time
import unittest
class ExecuteControllersTestCase(unittest.TestCase):
def test_schedule_print(self):
- def myfunc(ec_weakref):
- result = id(ec_weakref())
- return (TaskStatus.SUCCESS, result)
+ def myfunc():
+ return 'hola!'
+ ec = ExperimentController()
+
try:
- ec = ExperimentController()
+ tid = ec.schedule("0s", myfunc, track=True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
+
+ time.sleep(1)
- tid = ec.schedule("0s", myfunc)
- status = None
- while status != TaskStatus.SUCCESS:
- (status, result) = ec.task_info(tid)
+ self.assertEquals('hola!', task.result)
- self.assertEquals(id(ec), result)
finally:
- ec.terminate()
+ ec.shutdown()
def test_schedule_date(self):
- def get_time(ec_weakref):
- timestamp = datetime.datetime.now()
- return (TaskStatus.SUCCESS, timestamp)
+ def get_time():
+ return datetime.datetime.now()
- try:
- ec = ExperimentController()
+ ec = ExperimentController()
+ try:
schedule_time = datetime.datetime.now()
- tid = ec.schedule("4s", get_time)
- status = None
- while status != TaskStatus.SUCCESS:
- (status, execution_time) = ec.task_info(tid)
+ tid = ec.schedule("4s", get_time, track=True)
+
+ while True:
+ task = ec.get_task(tid)
+ if task.status != TaskStatus.NEW:
+ break
+
+ time.sleep(1)
+ execution_time = task.result
delta = execution_time - schedule_time
self.assertTrue(delta > datetime.timedelta(seconds=4))
self.assertTrue(delta < datetime.timedelta(seconds=5))
finally:
- ec.terminate()
+ ec.shutdown()
if __name__ == '__main__':