From: Alina Quereilhac Date: Sat, 2 Mar 2013 10:50:54 +0000 (+0100) Subject: Added attributes for resources. Implemented new API for ec and resource X-Git-Tag: nepi-3.0.0~122^2~28 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=b92fb13c1c7bd579b728a2bd93f4cb613c3395cf;p=nepi.git Added attributes for resources. Implemented new API for ec and resource --- diff --git a/src/neco/execution/attribute.py b/src/neco/execution/attribute.py new file mode 100644 index 00000000..9af394b6 --- /dev/null +++ b/src/neco/execution/attribute.py @@ -0,0 +1,68 @@ + +### Attribute Types +class Types: + String = "STRING" + Bool = "BOOL" + Enum = "ENUM" + Double = "DOUBLE" + Integer = "INTEGER" + +### Attribute Flags +class Flags: + # Attribute can be modified by the user + NoFlags = 0x00 + # Attribute is not modifiable by the user + ReadOnly = 0x01 + # Attribute is an access credential + Credential = 0x02 + +class Attribute(object): + def __init__(self, name, help, type = Types.String, + flags = Flags.NoFlags, default_value = None): + self._name = name + self._help = help + self._type = type + self._flags = flags + self._default = self._value = default_value + + @property + def name(self): + return self._name + + @property + def default(self): + return self._default_value + + @property + def type(self): + return self._type + + @property + def help(self): + return self._help + + @property + def flags(self): + return self._flags + + def has_flag(self, flag): + return (self._flags & flag) == flag + + def get_value(self): + return self._value + + def set_value(self, value): + if self.is_valid_value(value): + self._value = value + self._modified = True + else: + raise ValueError("Invalid value %s for attribute %s" % + (str(value), self.name)) + + value = property(get_value, set_value) + + def is_valid_value(self, value): + """ Attribute subclasses will override this method to add + adequate validation""" + return True + diff --git a/src/neco/execution/callbacks.py b/src/neco/execution/callbacks.py deleted file mode 100644 index a30411f9..00000000 --- a/src/neco/execution/callbacks.py +++ /dev/null @@ -1,20 +0,0 @@ - -def deploy(ec_weakref, xml): - from neco.util.parser import XMLParser - - # parse xml and build topology graph - parser = XMLParser() - box = parser.from_xml(xml) - - # instantiate resource boxes - - - # allocate physical resources - # configure physical resources - # allocate virtual resources - # configure virtual resources - # allocate software resources - # configure software resources - # schedule application start/stop - - diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 01a6aba2..6d365f62 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -1,16 +1,10 @@ import logging import os import sys -import threading import time -import weakref -from neco.execution import scheduler, tasks from neco.util import guid -from neco.util.timefuncs import strfnow, strfdiff, strfvalid -from neco.util.parallel import ParallelRun - -_reschedule_delay = "0.1s" +from neco.resources import ResourceFactory class ExperimentController(object): def __init__(self, root_dir = "/tmp", loglevel = 'error'): @@ -21,20 +15,11 @@ class ExperimentController(object): # generator of globally unique ids self._guid_generator = guid.GuidGenerator() - # Scheduler - self._scheduler = scheduler.HeapScheduler() - - # Tasks - self._tasks = dict() - - # Resources + # Resource managers self._resources = dict() - - # Event processing thread - self._cond = threading.Condition() - self._stop = False - self._thread = threading.Thread(target = self._process_tasks) - self._thread.start() + + # Groups of resources + self._groups = dict() # Logging self._logger = logging.getLogger("neco.execution.ec") @@ -43,122 +28,102 @@ class ExperimentController(object): def resource(self, guid): return self._resources.get(guid) - def terminate(self): - self._stop = True - self._cond.acquire() - self._cond.notify() - self._cond.release() - if self._thread.is_alive(): - self._thread.join() - - def task_info(self, tid): - task = self._tasks.get(tid) - if not task: - return (None, None) - return (task.status, task.result) - - def schedule(self, date, callback, args = None, kwargs = None): - """ - date string containing execution time for the task. - It can be expressed as an absolute time, using - timestamp format, or as a relative time matching - ^\d+.\d+(h|m|s|ms|us)$ - - callback code to be executed for the task. Must be a - Python function, and receives args and kwargs - as arguments. - The callback will always be invoked passing a - week reference to the controller as first - argument. - The callback must return a (status, result) - tuple where status is one of : - task.TaskStatus.FAIL, - task.TaskStatus.SUCCESS, - task.TaskStatus.RETRY, - task.TaskStatus.RECYCLE - """ - timestamp = strfvalid(date) + def resources(self): + return self._resources.keys() + + def release(self, group = None): + # TODO + pass + + def deploy(self, group = None): + # TODO + pass + + def register_resource(self, rtype, guid = None): + # Get next available guid + guid = self._guid_generator.next(guid) - args = args or [] - kwargs = kwargs or {} - - task = tasks.Task(timestamp, callback, args, kwargs) - task = self._schedule(task) - - self._tasks[task.id] = task - - return task.id - - ########################################################################### - #### Internal methods - ########################################################################### - - def _schedule(self, task): - task = self._scheduler.schedule(task) - - # Notify condition to wake up the processing thread - self._cond.acquire() - self._cond.notify() - self._cond.release() - return task - - def _process_tasks(self): - runner = ParallelRun(maxthreads = 50) - runner.start() - - try: - while not self._stop: - self._cond.acquire() - task = self._scheduler.next() - self._cond.release() - - if not task: - # It there are not tasks in the tasks queue we need to - # wait until a call to schedule wakes us up - self._cond.acquire() - self._cond.wait() - self._cond.release() - else: - # If the task timestamp is in the future the thread needs to wait - # until time elapse or until another task is scheduled - now = strfnow() - if now < task.timestamp: - # Calculate time difference in seconds - timeout = strfdiff(task.timestamp, now) - # Re-schedule task with the same timestamp - self._scheduler.schedule(task) - # Sleep until timeout or until a new task awakes the condition - self._cond.acquire() - self._cond.wait(timeout) - self._cond.release() - else: - # Process tasks in parallel - runner.put(self._execute_task, task) - except: - import traceback - err = traceback.format_exc() - self._logger.error("Error while processing tasks in the EC: %s" % err) - - def _execute_task(self, task): - # Invoke callback - ec = weakref.ref(self) - try: - (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs) - except: - import traceback - err = traceback.format_exc() - self._logger.error("Error while executing event: %s" % err) - - # task marked as FAIL - task.status = tasks.TaskStatus.FAIL - task.result = err - - if task.status == tasks.TaskStatus.RETRY: - # Re-schedule same task in the near future - task.timestamp = strfvalid(_reschedule_delay) - self._schedule(task) - elif task.status == tasks.TaskStatus.RECYCLE: - # Re-schedule t in the future - timestamp = strfvalid(task.result) - self.schedule(timestamp, task.callback, task.args, task.kwargs) + # Instantiate RM + rm = ResourceFactory.create(rtype, self, guid) + + # Store RM + self._resources[guid] = rm + + return guid + + def get_attributes(self, guid): + rm = self._resources[guid] + return rm.get_attributes() + + def get_filters(self, guid): + rm = self._resources[guid] + return rm.get_filters() + + def register_connection(self, guid1, guid2): + rm1 = self._resources[guid1] + rm2 = self._resources[guid2] + + rm1.connect(guid2) + rm2.connect(guid1) + + def register_group(self, guids, gguid = None): + gguid = self._guid_generator.next(gguid) + self._groups[gguid] = guids + + def discover_resource(self, guid, filters): + rm = self._resources[guid] + return rm.discover(filters) + + def provision_resource(self, guid, filters): + rm = self._resources[guid] + return rm.provision(filters) + + def register_start(self, gguid1, time, after_status, gguid2): + if isinstance(gguid1, int): + gguid1 = list[gguid1] + if isinstance(gguid2, int): + gguid2 = list[gguid2] + + for guid1 in gguid1: + for guid2 in gguid2: + rm = self._resources(guid1) + rm.start_after(time, after_status, guid2) + + def register_stop(self, gguid1, time, after_status, gguid2): + if isinstance(gguid1, int): + gguid1 = list[gguid1] + if isinstance(gguid2, int): + gguid2 = list[gguid2] + + for guid1 in gguid1: + for guid2 in gguid2: + rm = self._resources(guid1) + rm.stop_after(time, after_status, guid2) + + def register_set(self, name, value, gguid1, time, after_status, gguid2): + if isinstance(gguid1, int): + gguid1 = list[gguid1] + if isinstance(group2, int): + gguid2 = list[gguid2] + + for guid1 in gguid1: + for guid2 in gguid2: + rm = self._resources(guid1) + rm.set_after(name, value, time, after_status, guid2) + + def get(self, guid, name): + rm = self._resources(guid) + return rm.get(name) + + def set(self, guid, name, value): + rm = self._resources(guid) + return rm.set(name, value) + + def status(self, guid): + rm = self._resources(guid) + return rm.status() + + def stop(self, guid): + rm = self._resources(guid) + return rm.stop() diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index ae9ab792..86d930e5 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -1,14 +1,45 @@ +import copy import logging import weakref class Resource(object): + # static template for resource filters + _filters = dict() + + # static template for resource attributes + _attributes = dict() + + @classmethod + def _register_filter(cls, attr): + """ Resource subclasses will invoke this method to add a + filter attribute""" + cls._filters[attr.name] = attr + + @classmethod + def _register_attributes(cls, attr): + """ Resource subclasses will invoke this method to add a + resource attribute""" + cls._attributes[attr.name] = attr + + @classmethod + def get_filters(cls): + return copy.deepcopy(cls._filters.values()) + + @classmethod + def get_attributes(cls): + return copy.deepcopy(cls._attributes.values()) + def __init__(self, ec, guid): self._guid = guid self._ec = weakref.ref(ec) + self._connections = set() + # the resource instance gets a copy of all attributes + # that can modify + self._attrs = copy.deepcopy(self._attributes) # Logging loglevel = "debug" - self._logger = logging.getLogger("neco.execution.Resource.%s" % + self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % self.guid) self._logger.setLevel(getattr(logging, loglevel.upper())) @@ -20,4 +51,48 @@ class Resource(object): def ec(self): return self._ec() + def connect(self, guid): + if (self._validate_connection(guid)): + self._connections.add(guid) + + def discover(self, filters): + pass + + def provision(self, filters): + pass + + def set(self, name, value): + attr = self._attrs[name] + attr.value = value + + def get(self, name): + attr = self._attrs[name] + return attr.value + + def start_after(self, time, after_status, guid): + pass + + def stop_after(self, time, after_status, guid): + pass + + def set_after(self, name, value, time, after_status, guid): + pass + + def stop(self): + pass + + def _validate_connection(self, guid): + # TODO: Validate! + return True + +class ResourceFactory(object): + def __init__(self): + self._resource_types = dict() + + def register_type(self, rtype, rclass): + self._resource_types[rtype] = rclass + + def create(self, rtype, ec, guid): + rclass = self._resource[rtype] + return rclass(ec, guid)