import logging
import os
import sys
-import threading
import time
-import weakref
-from neco.execution import scheduler, tasks
from neco.util import guid
-from neco.util.timefuncs import strfnow, strfdiff, strfvalid
-from neco.util.parallel import ParallelRun
-
-_reschedule_delay = "0.1s"
+from neco.resources import ResourceFactory
class ExperimentController(object):
def __init__(self, root_dir = "/tmp", loglevel = 'error'):
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
- # Scheduler
- self._scheduler = scheduler.HeapScheduler()
-
- # Tasks
- self._tasks = dict()
-
- # Resources
+ # Resource managers
self._resources = dict()
-
- # Event processing thread
- self._cond = threading.Condition()
- self._stop = False
- self._thread = threading.Thread(target = self._process_tasks)
- self._thread.start()
+
+ # Groups of resources
+ self._groups = dict()
# Logging
self._logger = logging.getLogger("neco.execution.ec")
def resource(self, guid):
return self._resources.get(guid)
- def terminate(self):
- self._stop = True
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
- if self._thread.is_alive():
- self._thread.join()
-
- def task_info(self, tid):
- task = self._tasks.get(tid)
- if not task:
- return (None, None)
- return (task.status, task.result)
-
- def schedule(self, date, callback, args = None, kwargs = None):
- """
- date string containing execution time for the task.
- It can be expressed as an absolute time, using
- timestamp format, or as a relative time matching
- ^\d+.\d+(h|m|s|ms|us)$
-
- callback code to be executed for the task. Must be a
- Python function, and receives args and kwargs
- as arguments.
- The callback will always be invoked passing a
- week reference to the controller as first
- argument.
- The callback must return a (status, result)
- tuple where status is one of :
- task.TaskStatus.FAIL,
- task.TaskStatus.SUCCESS,
- task.TaskStatus.RETRY,
- task.TaskStatus.RECYCLE
- """
- timestamp = strfvalid(date)
+ def resources(self):
+ return self._resources.keys()
+
+ def release(self, group = None):
+ # TODO
+ pass
+
+ def deploy(self, group = None):
+ # TODO
+ pass
+
+ def register_resource(self, rtype, guid = None):
+ # Get next available guid
+ guid = self._guid_generator.next(guid)
- args = args or []
- kwargs = kwargs or {}
-
- task = tasks.Task(timestamp, callback, args, kwargs)
- task = self._schedule(task)
-
- self._tasks[task.id] = task
-
- return task.id
-
- ###########################################################################
- #### Internal methods
- ###########################################################################
-
- def _schedule(self, task):
- task = self._scheduler.schedule(task)
-
- # Notify condition to wake up the processing thread
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
- return task
-
- def _process_tasks(self):
- runner = ParallelRun(maxthreads = 50)
- runner.start()
-
- try:
- while not self._stop:
- self._cond.acquire()
- task = self._scheduler.next()
- self._cond.release()
-
- if not task:
- # It there are not tasks in the tasks queue we need to
- # wait until a call to schedule wakes us up
- self._cond.acquire()
- self._cond.wait()
- self._cond.release()
- else:
- # If the task timestamp is in the future the thread needs to wait
- # until time elapse or until another task is scheduled
- now = strfnow()
- if now < task.timestamp:
- # Calculate time difference in seconds
- timeout = strfdiff(task.timestamp, now)
- # Re-schedule task with the same timestamp
- self._scheduler.schedule(task)
- # Sleep until timeout or until a new task awakes the condition
- self._cond.acquire()
- self._cond.wait(timeout)
- self._cond.release()
- else:
- # Process tasks in parallel
- runner.put(self._execute_task, task)
- except:
- import traceback
- err = traceback.format_exc()
- self._logger.error("Error while processing tasks in the EC: %s" % err)
-
- def _execute_task(self, task):
- # Invoke callback
- ec = weakref.ref(self)
- try:
- (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
- except:
- import traceback
- err = traceback.format_exc()
- self._logger.error("Error while executing event: %s" % err)
-
- # task marked as FAIL
- task.status = tasks.TaskStatus.FAIL
- task.result = err
-
- if task.status == tasks.TaskStatus.RETRY:
- # Re-schedule same task in the near future
- task.timestamp = strfvalid(_reschedule_delay)
- self._schedule(task)
- elif task.status == tasks.TaskStatus.RECYCLE:
- # Re-schedule t in the future
- timestamp = strfvalid(task.result)
- self.schedule(timestamp, task.callback, task.args, task.kwargs)
+ # Instantiate RM
+ rm = ResourceFactory.create(rtype, self, guid)
+
+ # Store RM
+ self._resources[guid] = rm
+
+ return guid
+
+ def get_attributes(self, guid):
+ rm = self._resources[guid]
+ return rm.get_attributes()
+
+ def get_filters(self, guid):
+ rm = self._resources[guid]
+ return rm.get_filters()
+
+ def register_connection(self, guid1, guid2):
+ rm1 = self._resources[guid1]
+ rm2 = self._resources[guid2]
+
+ rm1.connect(guid2)
+ rm2.connect(guid1)
+
+ def register_group(self, guids, gguid = None):
+ gguid = self._guid_generator.next(gguid)
+ self._groups[gguid] = guids
+
+ def discover_resource(self, guid, filters):
+ rm = self._resources[guid]
+ return rm.discover(filters)
+
+ def provision_resource(self, guid, filters):
+ rm = self._resources[guid]
+ return rm.provision(filters)
+
+ def register_start(self, gguid1, time, after_status, gguid2):
+ if isinstance(gguid1, int):
+ gguid1 = list[gguid1]
+ if isinstance(gguid2, int):
+ gguid2 = list[gguid2]
+
+ for guid1 in gguid1:
+ for guid2 in gguid2:
+ rm = self._resources(guid1)
+ rm.start_after(time, after_status, guid2)
+
+ def register_stop(self, gguid1, time, after_status, gguid2):
+ if isinstance(gguid1, int):
+ gguid1 = list[gguid1]
+ if isinstance(gguid2, int):
+ gguid2 = list[gguid2]
+
+ for guid1 in gguid1:
+ for guid2 in gguid2:
+ rm = self._resources(guid1)
+ rm.stop_after(time, after_status, guid2)
+
+ def register_set(self, name, value, gguid1, time, after_status, gguid2):
+ if isinstance(gguid1, int):
+ gguid1 = list[gguid1]
+ if isinstance(group2, int):
+ gguid2 = list[gguid2]
+
+ for guid1 in gguid1:
+ for guid2 in gguid2:
+ rm = self._resources(guid1)
+ rm.set_after(name, value, time, after_status, guid2)
+
+ def get(self, guid, name):
+ rm = self._resources(guid)
+ return rm.get(name)
+
+ def set(self, guid, name, value):
+ rm = self._resources(guid)
+ return rm.set(name, value)
+
+ def status(self, guid):
+ rm = self._resources(guid)
+ return rm.status()
+
+ def stop(self, guid):
+ rm = self._resources(guid)
+ return rm.stop()
+import copy
import logging
import weakref
class Resource(object):
+ # static template for resource filters
+ _filters = dict()
+
+ # static template for resource attributes
+ _attributes = dict()
+
+ @classmethod
+ def _register_filter(cls, attr):
+ """ Resource subclasses will invoke this method to add a
+ filter attribute"""
+ cls._filters[attr.name] = attr
+
+ @classmethod
+ def _register_attributes(cls, attr):
+ """ Resource subclasses will invoke this method to add a
+ resource attribute"""
+ cls._attributes[attr.name] = attr
+
+ @classmethod
+ def get_filters(cls):
+ return copy.deepcopy(cls._filters.values())
+
+ @classmethod
+ def get_attributes(cls):
+ return copy.deepcopy(cls._attributes.values())
+
def __init__(self, ec, guid):
self._guid = guid
self._ec = weakref.ref(ec)
+ self._connections = set()
+ # the resource instance gets a copy of all attributes
+ # that can modify
+ self._attrs = copy.deepcopy(self._attributes)
# Logging
loglevel = "debug"
- self._logger = logging.getLogger("neco.execution.Resource.%s" %
+ self._logger = logging.getLogger("neco.execution.resource.Resource.%s" %
self.guid)
self._logger.setLevel(getattr(logging, loglevel.upper()))
def ec(self):
return self._ec()
+ def connect(self, guid):
+ if (self._validate_connection(guid)):
+ self._connections.add(guid)
+
+ def discover(self, filters):
+ pass
+
+ def provision(self, filters):
+ pass
+
+ def set(self, name, value):
+ attr = self._attrs[name]
+ attr.value = value
+
+ def get(self, name):
+ attr = self._attrs[name]
+ return attr.value
+
+ def start_after(self, time, after_status, guid):
+ pass
+
+ def stop_after(self, time, after_status, guid):
+ pass
+
+ def set_after(self, name, value, time, after_status, guid):
+ pass
+
+ def stop(self):
+ pass
+
+ def _validate_connection(self, guid):
+ # TODO: Validate!
+ return True
+
+class ResourceFactory(object):
+ def __init__(self):
+ self._resource_types = dict()
+
+ def register_type(self, rtype, rclass):
+ self._resource_types[rtype] = rclass
+
+ def create(self, rtype, ec, guid):
+ rclass = self._resource[rtype]
+ return rclass(ec, guid)