Added attributes for resources. Implemented new API for ec and resource
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 2 Mar 2013 10:50:54 +0000 (11:50 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 2 Mar 2013 10:50:54 +0000 (11:50 +0100)
src/neco/execution/attribute.py [new file with mode: 0644]
src/neco/execution/callbacks.py [deleted file]
src/neco/execution/ec.py
src/neco/execution/resource.py

diff --git a/src/neco/execution/attribute.py b/src/neco/execution/attribute.py
new file mode 100644 (file)
index 0000000..9af394b
--- /dev/null
@@ -0,0 +1,68 @@
+
+### Attribute Types
+class Types:
+    String  = "STRING"
+    Bool    = "BOOL"
+    Enum    = "ENUM"
+    Double  = "DOUBLE"
+    Integer = "INTEGER"
+
+### Attribute Flags
+class Flags:
+    # Attribute can be modified by the user 
+    NoFlags         = 0x00
+    # Attribute is not modifiable by the user
+    ReadOnly        = 0x01
+    # Attribute is an access credential
+    Credential      = 0x02
+
+class Attribute(object):
+    def __init__(self, name, help, type = Types.String,
+            flags = Flags.NoFlags, default_value = None):
+        self._name = name
+        self._help = help
+        self._type = type
+        self._flags = flags
+        self._default = self._value = default_value
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def default(self):
+        return self._default_value
+
+    @property
+    def type(self):
+        return self._type
+
+    @property
+    def help(self):
+        return self._help
+
+    @property
+    def flags(self):
+        return self._flags
+
+    def has_flag(self, flag):
+        return (self._flags & flag) == flag
+
+    def get_value(self):
+        return self._value
+
+    def set_value(self, value):
+        if self.is_valid_value(value):
+            self._value = value
+            self._modified = True
+        else:
+            raise ValueError("Invalid value %s for attribute %s" %
+                    (str(value), self.name))
+
+    value = property(get_value, set_value)
+
+    def is_valid_value(self, value):
+        """ Attribute subclasses will override this method to add 
+        adequate validation"""
+        return True
+
diff --git a/src/neco/execution/callbacks.py b/src/neco/execution/callbacks.py
deleted file mode 100644 (file)
index a30411f..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-
-def deploy(ec_weakref, xml):
-    from neco.util.parser import XMLParser    
-
-    # parse xml and build topology graph
-    parser = XMLParser()
-    box = parser.from_xml(xml)
-
-    # instantiate resource boxes
-    
-
-    # allocate physical resources
-    # configure physical resources
-    # allocate virtual resources
-    # configure virtual resources
-    # allocate software resources
-    # configure software resources
-    # schedule application start/stop
-
-
index 01a6aba..6d365f6 100644 (file)
@@ -1,16 +1,10 @@
 import logging
 import os
 import sys
-import threading
 import time
-import weakref
 
-from neco.execution import scheduler, tasks
 from neco.util import guid
-from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
-from neco.util.parallel import ParallelRun
-
-_reschedule_delay = "0.1s"
+from neco.resources import ResourceFactory
 
 class ExperimentController(object):
     def __init__(self, root_dir = "/tmp", loglevel = 'error'):
@@ -21,20 +15,11 @@ class ExperimentController(object):
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
-        # Scheduler
-        self._scheduler = scheduler.HeapScheduler()
-
-        # Tasks
-        self._tasks = dict()
-        # Resources
+        # Resource managers
         self._resources = dict()
-       
-        # Event processing thread
-        self._cond = threading.Condition()
-        self._stop = False
-        self._thread = threading.Thread(target = self._process_tasks)
-        self._thread.start()
+
+        # Groups of resources
+        self._groups = dict()
        
         # Logging
         self._logger = logging.getLogger("neco.execution.ec")
@@ -43,122 +28,102 @@ class ExperimentController(object):
     def resource(self, guid):
         return self._resources.get(guid)
 
-    def terminate(self):
-        self._stop = True
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
-        if self._thread.is_alive():
-           self._thread.join()
-
-    def task_info(self, tid):
-        task = self._tasks.get(tid)
-        if not task:
-            return (None, None)
-        return (task.status, task.result)
-
-    def schedule(self, date, callback, args = None, kwargs = None):
-        """
-            date    string containing execution time for the task.
-                    It can be expressed as an absolute time, using
-                    timestamp format, or as a relative time matching
-                    ^\d+.\d+(h|m|s|ms|us)$
-
-            callback    code to be executed for the task. Must be a
-                        Python function, and receives args and kwargs
-                        as arguments.
-                        The callback will always be invoked passing a 
-                        week reference to the controller as first 
-                        argument.
-                        The callback must return a (status, result) 
-                        tuple where status is one of : 
-                        task.TaskStatus.FAIL, 
-                        task.TaskStatus.SUCCESS, 
-                        task.TaskStatus.RETRY, 
-                        task.TaskStatus.RECYCLE 
-        """
-        timestamp = strfvalid(date)
+    def resources(self):
+        return self._resources.keys()
+
+    def release(self, group = None):
+        # TODO
+        pass
+
+    def deploy(self, group = None):
+        # TODO
+        pass
+
+    def register_resource(self, rtype, guid = None):
+        # Get next available guid
+        guid = self._guid_generator.next(guid)
         
-        args = args or []
-        kwargs = kwargs or {}
-
-        task = tasks.Task(timestamp, callback, args, kwargs)
-        task = self._schedule(task)
-
-        self._tasks[task.id] = task
-
-        return task.id
-
-    ###########################################################################
-    #### Internal methods
-    ###########################################################################
-
-    def _schedule(self, task):
-        task = self._scheduler.schedule(task)
-
-        # Notify condition to wake up the processing thread
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
-        return task
-     
-    def _process_tasks(self):
-        runner = ParallelRun(maxthreads = 50)
-        runner.start()
-
-        try:
-            while not self._stop:
-                self._cond.acquire()
-                task = self._scheduler.next()
-                self._cond.release()
-
-                if not task:
-                    # It there are not tasks in the tasks queue we need to 
-                    # wait until a call to schedule wakes us up
-                    self._cond.acquire()
-                    self._cond.wait()
-                    self._cond.release()
-                else: 
-                    # If the task timestamp is in the future the thread needs to wait
-                    # until time elapse or until another task is scheduled
-                    now = strfnow()
-                    if now < task.timestamp:
-                        # Calculate time difference in seconds
-                        timeout = strfdiff(task.timestamp, now)
-                        # Re-schedule task with the same timestamp
-                        self._scheduler.schedule(task)
-                        # Sleep until timeout or until a new task awakes the condition
-                        self._cond.acquire()
-                        self._cond.wait(timeout)
-                        self._cond.release()
-                    else:
-                        # Process tasks in parallel
-                        runner.put(self._execute_task, task)
-        except:  
-            import traceback
-            err = traceback.format_exc()
-            self._logger.error("Error while processing tasks in the EC: %s" % err)
-    def _execute_task(self, task):
-        # Invoke callback
-        ec = weakref.ref(self)
-        try:
-            (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
-        except:
-            import traceback
-            err = traceback.format_exc()
-            self._logger.error("Error while executing event: %s" % err)
-
-            # task marked as FAIL
-            task.status = tasks.TaskStatus.FAIL
-            task.result = err
-
-        if task.status == tasks.TaskStatus.RETRY:
-            # Re-schedule same task in the near future
-            task.timestamp = strfvalid(_reschedule_delay)
-            self._schedule(task)
-        elif task.status == tasks.TaskStatus.RECYCLE:
-            # Re-schedule t in the future
-            timestamp = strfvalid(task.result)
-            self.schedule(timestamp, task.callback, task.args, task.kwargs)
+        # Instantiate RM
+        rm = ResourceFactory.create(rtype, self, guid)
+
+        # Store RM
+        self._resources[guid] = rm
+
+        return guid
+
+    def get_attributes(self, guid):
+        rm = self._resources[guid]
+        return rm.get_attributes()
+
+    def get_filters(self, guid):
+        rm = self._resources[guid]
+        return rm.get_filters()
+
+    def register_connection(self, guid1, guid2):
+        rm1 = self._resources[guid1]
+        rm2 = self._resources[guid2]
+
+        rm1.connect(guid2)
+        rm2.connect(guid1)
+
+    def register_group(self, guids, gguid = None):
+        gguid = self._guid_generator.next(gguid)
+        self._groups[gguid] = guids
+
+    def discover_resource(self, guid, filters):
+        rm = self._resources[guid]
+        return rm.discover(filters)
+
+    def provision_resource(self, guid, filters):
+        rm = self._resources[guid]
+        return rm.provision(filters)
+
+    def register_start(self, gguid1, time, after_status, gguid2):
+        if isinstance(gguid1, int):
+            gguid1 = list[gguid1]
+        if isinstance(gguid2, int):
+            gguid2 = list[gguid2]
+
+        for guid1 in gguid1:
+            for guid2 in gguid2:
+                rm = self._resources(guid1)
+                rm.start_after(time, after_status, guid2)
+
+    def register_stop(self, gguid1, time, after_status, gguid2):
+        if isinstance(gguid1, int):
+            gguid1 = list[gguid1]
+        if isinstance(gguid2, int):
+            gguid2 = list[gguid2]
+
+        for guid1 in gguid1:
+            for guid2 in gguid2:
+                rm = self._resources(guid1)
+                rm.stop_after(time, after_status, guid2)
+
+    def register_set(self, name, value, gguid1, time, after_status, gguid2):
+        if isinstance(gguid1, int):
+            gguid1 = list[gguid1]
+        if isinstance(group2, int):
+            gguid2 = list[gguid2]
+
+        for guid1 in gguid1:
+            for guid2 in gguid2:
+                rm = self._resources(guid1)
+                rm.set_after(name, value, time, after_status, guid2)
+
+    def get(self, guid, name):
+        rm = self._resources(guid)
+        return rm.get(name)
+
+    def set(self, guid, name, value):
+        rm = self._resources(guid)
+        return rm.set(name, value)
+
+    def status(self, guid):
+        rm = self._resources(guid)
+        return rm.status()
+
+    def stop(self, guid):
+        rm = self._resources(guid)
+        return rm.stop()
 
index ae9ab79..86d930e 100644 (file)
@@ -1,14 +1,45 @@
+import copy
 import logging
 import weakref
 
 class Resource(object):
+    # static template for resource filters
+    _filters = dict()
+    
+    # static template for resource attributes
+    _attributes = dict()
+
+    @classmethod
+    def _register_filter(cls, attr):
+        """ Resource subclasses will invoke this method to add a 
+        filter attribute"""
+        cls._filters[attr.name] = attr
+
+    @classmethod
+    def _register_attributes(cls, attr):
+        """ Resource subclasses will invoke this method to add a 
+        resource attribute"""
+        cls._attributes[attr.name] = attr
+
+    @classmethod
+    def get_filters(cls):
+        return copy.deepcopy(cls._filters.values())
+
+    @classmethod
+    def get_attributes(cls):
+        return copy.deepcopy(cls._attributes.values())
+
     def __init__(self, ec, guid):
         self._guid = guid
         self._ec = weakref.ref(ec)
+        self._connections = set()
+        # the resource instance gets a copy of all attributes
+        # that can modify
+        self._attrs = copy.deepcopy(self._attributes)
 
         # Logging
         loglevel = "debug"
-        self._logger = logging.getLogger("neco.execution.Resource.%s" % 
+        self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % 
             self.guid)
         self._logger.setLevel(getattr(logging, loglevel.upper()))
 
@@ -20,4 +51,48 @@ class Resource(object):
     def ec(self):
         return self._ec()
 
+    def connect(self, guid):
+        if (self._validate_connection(guid)):
+            self._connections.add(guid)
+
+    def discover(self, filters):
+        pass
+
+    def provision(self, filters):
+        pass
+
+    def set(self, name, value):
+        attr = self._attrs[name]
+        attr.value = value
+
+    def get(self, name):
+        attr = self._attrs[name]
+        return attr.value
+
+    def start_after(self, time, after_status, guid):
+        pass
+
+    def stop_after(self, time, after_status, guid):
+        pass
+
+    def set_after(self, name, value, time, after_status, guid):
+        pass
+
+    def stop(self):
+        pass
+
+    def _validate_connection(self, guid):
+        # TODO: Validate!
+        return True
+
+class ResourceFactory(object):
+    def __init__(self):
+        self._resource_types = dict()
+
+    def register_type(self, rtype, rclass):
+        self._resource_types[rtype] = rclass
+
+    def create(self, rtype, ec, guid):
+        rclass = self._resource[rtype]
+        return rclass(ec, guid)