Debug the deploy part and start with condition
authorJulien Tribino <julien.tribino@inria.fr>
Mon, 22 Apr 2013 10:37:17 +0000 (12:37 +0200)
committerJulien Tribino <julien.tribino@inria.fr>
Mon, 22 Apr 2013 10:37:17 +0000 (12:37 +0200)
13 files changed:
examples/automated_vlc_experiment_plexus.py [new file with mode: 0644]
examples/manual_vlc_experiment_plexus.py [new file with mode: 0644]
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/resources/linux/application.py
src/neco/resources/omf/omf_api.py
src/neco/resources/omf/omf_application.py
src/neco/resources/omf/omf_channel.py
src/neco/resources/omf/omf_client.py
src/neco/resources/omf/omf_interface.py
src/neco/resources/omf/omf_messages_5_4.py
src/neco/resources/omf/omf_node.py
test/resources/omf/omf_vlc_exp.py

diff --git a/examples/automated_vlc_experiment_plexus.py b/examples/automated_vlc_experiment_plexus.py
new file mode 100644 (file)
index 0000000..2337531
--- /dev/null
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+from neco.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from neco.execution.ec import ExperimentController
+
+from neco.resources.omf.omf_node import OMFNode
+from neco.resources.omf.omf_application import OMFApplication
+from neco.resources.omf.omf_interface import OMFWifiInterface
+from neco.resources.omf.omf_channel import OMFChannel
+
+import logging
+import time
+
+logging.basicConfig()
+
+# Create the EC
+ec = ExperimentController()
+
+# Register the different RM that will be used
+ResourceFactory.register_type(OMFNode)
+ResourceFactory.register_type(OMFWifiInterface)
+ResourceFactory.register_type(OMFChannel)
+ResourceFactory.register_type(OMFApplication)
+
+# Create and Configure the Nodes
+node1 = ec.register_resource("OMFNode")
+ec.set(node1, 'hostname', 'omf.plexus.wlab17')
+ec.set(node1, 'xmppSlice', "nepi")
+ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(node1, 'xmppPort', "5222")
+ec.set(node1, 'xmppPassword', "1234")
+
+node2 = ec.register_resource("OMFNode")
+ec.set(node2, 'hostname', "omf.plexus.wlab37")
+ec.set(node2, 'xmppSlice', "nepi")
+ec.set(node2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(node2, 'xmppPort', "5222")
+ec.set(node2, 'xmppPassword', "1234")
+
+# Create and Configure the Interfaces
+iface1 = ec.register_resource("OMFWifiInterface")
+ec.set(iface1, 'alias', "w0")
+ec.set(iface1, 'mode', "adhoc")
+ec.set(iface1, 'type', "g")
+ec.set(iface1, 'essid', "vlcexp")
+#ec.set(iface1, 'ap', "11:22:33:44:55:66")
+ec.set(iface1, 'ip', "10.0.0.17")
+ec.set(iface1, 'xmppSlice', "nepi")
+ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(iface1, 'xmppPort', "5222")
+ec.set(iface1, 'xmppPassword', "1234")
+
+iface2 = ec.register_resource("OMFWifiInterface")
+ec.set(iface2, 'alias', "w0")
+ec.set(iface2, 'mode', "adhoc")
+ec.set(iface2, 'type', 'g')
+ec.set(iface2, 'essid', "vlcexp")
+#ec.set(iface2, 'ap', "11:22:33:44:55:66")
+ec.set(iface2, 'ip', "10.0.0.37")
+ec.set(iface2, 'xmppSlice', "nepi")
+ec.set(iface2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(iface2, 'xmppPort', "5222")
+ec.set(iface2, 'xmppPassword', "1234")
+
+# Create and Configure the Channel
+channel = ec.register_resource("OMFChannel")
+ec.set(channel, 'channel', "6")
+ec.set(channel, 'xmppSlice', "nepi")
+ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(channel, 'xmppPort', "5222")
+ec.set(channel, 'xmppPassword', "1234")
+
+# Create and Configure the Application
+app1 = ec.register_resource("OMFApplication")
+ec.set(app1, 'appid', 'Vlc#1')
+ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc")
+ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+ec.set(app1, 'xmppSlice', "nepi")
+ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(app1, 'xmppPort', "5222")
+ec.set(app1, 'xmppPassword', "1234")
+
+app2 = ec.register_resource("OMFApplication")
+ec.set(app2, 'appid', 'Vlc#2')
+ec.set(app2, 'path', "/opt/vlc-1.1.13/cvlc")
+ec.set(app2, 'args', "rtp://10.0.0.37:1234")
+ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+ec.set(app2, 'xmppSlice', "nepi")
+ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(app2, 'xmppPort', "5222")
+ec.set(app2, 'xmppPassword', "1234")
+
+app3 = ec.register_resource("OMFApplication")
+ec.set(app3, 'appid', 'Kill#2')
+ec.set(app3, 'path', "/usr/bin/killall")
+ec.set(app3, 'args', "vlc")
+ec.set(app3, 'env', " ")
+ec.set(app3, 'xmppSlice', "nepi")
+ec.set(app3, 'xmppHost', "xmpp-plexus.onelab.eu")
+ec.set(app3, 'xmppPort', "5222")
+ec.set(app3, 'xmppPassword', "1234")
+
+# Connection
+ec.register_connection(app3, node1)
+ec.register_connection(app1, node1)
+ec.register_connection(node1, iface1)
+ec.register_connection(iface1, channel)
+ec.register_connection(iface2, channel)
+ec.register_connection(node2, iface2)
+ec.register_connection(app2, node2)
+
+# Condition
+#      Topology behaviour : It should not be done by the user, but ....
+ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
+ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
+ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+
+#      User Behaviour
+ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
+ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , 20)
+ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , 25)
+
+# Deploy
+ec.deploy()
+
+# Stop Experiment
+time.sleep(45)
+ec.shutdown()
diff --git a/examples/manual_vlc_experiment_plexus.py b/examples/manual_vlc_experiment_plexus.py
new file mode 100644 (file)
index 0000000..ebdd8e1
--- /dev/null
@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+from neco.execution.resource import ResourceFactory
+from neco.execution.ec import ExperimentController
+
+from neco.resources.omf.omf_node import OMFNode
+from neco.resources.omf.omf_application import OMFApplication
+from neco.resources.omf.omf_interface import OMFWifiInterface
+from neco.resources.omf.omf_channel import OMFChannel
+
+import logging
+import time
+
+logging.basicConfig()
+
+# Create the EC
+ec = ExperimentController()
+
+# Register the different RM that will be used
+ResourceFactory.register_type(OMFNode)
+ResourceFactory.register_type(OMFWifiInterface)
+ResourceFactory.register_type(OMFChannel)
+ResourceFactory.register_type(OMFApplication)
+
+# Create and Configure the Nodes
+guid = ec.register_resource("OMFNode")
+node1 = ec.get_resource(guid)
+node1.set('hostname', 'omf.plexus.wlab17')
+node1.set('xmppSlice', "nepi")
+node1.set('xmppHost', "xmpp-plexus.onelab.eu")
+node1.set('xmppPort', "5222")
+node1.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFNode")
+node2 = ec.get_resource(guid)
+node2.set('hostname', "omf.plexus.wlab37")
+node2.set('xmppSlice', "nepi")
+node2.set('xmppHost', "xmpp-plexus.onelab.eu")
+node2.set('xmppPort', "5222")
+node2.set('xmppPassword', "1234")
+
+# Create and Configure the Interfaces
+guid = ec.register_resource("OMFWifiInterface")
+iface1 = ec.get_resource(guid)
+iface1.set('alias', "w0")
+iface1.set('mode', "adhoc")
+iface1.set('type', "g")
+iface1.set('essid', "helloworld")
+iface1.set('ip', "10.0.0.17")
+iface1.set('xmppSlice', "nepi")
+iface1.set('xmppHost', "xmpp-plexus.onelab.eu")
+iface1.set('xmppPort', "5222")
+iface1.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFWifiInterface")
+iface2 = ec.get_resource(guid)
+iface2.set('alias', "w0")
+iface2.set('mode', "adhoc")
+iface2.set('type', 'g')
+iface2.set('essid', "helloworld")
+iface2.set('ip', "10.0.0.37")
+iface2.set('xmppSlice', "nepi")
+iface2.set('xmppHost', "xmpp-plexus.onelab.eu")
+iface2.set('xmppPort', "5222")
+iface2.set('xmppPassword', "1234")
+
+# Create and Configure the Channel
+guid = ec.register_resource("OMFChannel")
+channel = ec.get_resource(guid)
+channel.set('channel', "6")
+channel.set('xmppSlice', "nepi")
+channel.set('xmppHost', "xmpp-plexus.onelab.eu")
+channel.set('xmppPort', "5222")
+channel.set('xmppPassword', "1234")
+
+# Create and Configure the Application
+guid = ec.register_resource("OMFApplication")
+app1 = ec.get_resource(guid)
+app1.set('appid', 'Vlc#1')
+app1.set('path', "/opt/vlc-1.1.13/cvlc")
+app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+app1.set('xmppSlice', "nepi")
+app1.set('xmppHost', "xmpp-plexus.onelab.eu")
+app1.set('xmppPort', "5222")
+app1.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFApplication")
+app2 = ec.get_resource(guid)
+app2.set('appid', 'Vlc#2')
+app2.set('path', "/opt/vlc-1.1.13/cvlc")
+app2.set('args', "rtp://10.0.0.37:1234")
+app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+app2.set('xmppSlice', "nepi")
+app2.set('xmppHost', "xmpp-plexus.onelab.eu")
+app2.set('xmppPort', "5222")
+app2.set('xmppPassword', "1234")
+
+guid = ec.register_resource("OMFApplication")
+app3 = ec.get_resource(guid)
+app3.set('appid', 'Kill#2')
+app3.set('path', "/usr/bin/killall")
+app3.set('args', "vlc")
+app3.set('env', " ")
+app3.set('xmppSlice', "nepi")
+app3.set('xmppHost', "xmpp-plexus.onelab.eu")
+app3.set('xmppPort', "5222")
+app3.set('xmppPassword', "1234")
+
+# Connection
+app3.connect(node1.guid)
+node1.connect(app3.guid)
+
+app1.connect(node1.guid)
+node1.connect(app1.guid)
+
+node1.connect(iface1.guid)
+iface1.connect(node1.guid)
+
+iface1.connect(channel.guid)
+channel.connect(iface1.guid)
+
+channel.connect(iface2.guid)
+iface2.connect(channel.guid)
+
+iface2.connect(node2.guid)
+node2.connect(iface2.guid)
+
+node2.connect(app2.guid)
+app2.connect(node2.guid)
+
+# Local Deploy
+node1.deploy()
+node2.deploy()
+iface1.deploy()
+iface2.deploy()
+channel.deploy()
+app1.deploy()
+app2.deploy()
+app3.deploy()
+
+# Start the Nodes
+node1.start()
+node2.start()
+time.sleep(2)
+
+# Start the Interfaces
+iface1.start()
+iface2.start()
+
+# Start the Channel
+time.sleep(2)
+channel.start()
+time.sleep(2)
+
+# Start the Application
+app1.start()
+time.sleep(2)
+app2.start()
+
+time.sleep(20)
+
+# Stop the Application
+app1.stop()
+app2.stop()
+time.sleep(1)
+app3.start()
+time.sleep(2)
+
+# Stop Experiment
+ec.shutdown()
index 1b7d4be..a7a33d8 100644 (file)
@@ -25,6 +25,9 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
+        # Resource managers
+        self._group = dict()
+
         # Scheduler
         self._scheduler = HeapScheduler()
 
@@ -51,18 +54,28 @@ class ExperimentController(object):
     def resources(self):
         return self._resources.keys()
 
-    def register_resource(self, rtype, guid = None, creds = None):
+    def register_resource(self, rtype, guid = None):
         # Get next available guid
         guid = self._guid_generator.next(guid)
         
         # Instantiate RM
-        rm = ResourceFactory.create(rtype, self, guid, creds)
+        rm = ResourceFactory.create(rtype, self, guid)
 
         # Store RM
         self._resources[guid] = rm
 
         return guid
 
+    def create_group(self, *args):
+        guid = self._guid_generator.next(guid)
+
+        grp = [arg for arg in args]
+
+        self._resources[guid] = grp
+
+        return guid
+
     def get_attributes(self, guid):
         rm = self.get_resource(guid)
         return rm.get_attributes()
@@ -100,12 +113,12 @@ class ExperimentController(object):
 
         """
         if isinstance(group1, int):
-            group1 = list[group1]
+            group1 = [group1]
         if isinstance(group2, int):
-            group2 = list[group2]
+            group2 = [group2]
 
         for guid1 in group1:
-            rm = self.get_resource(guid)
+            rm = self.get_resource(guid1)
             rm.register_condition(action, group2, state, time)
 
     def discover(self, guid, filters):
@@ -165,9 +178,9 @@ class ExperimentController(object):
 
         """
         if isinstance(group1, int):
-            group1 = list[group1]
+            group1 = [group1]
         if isinstance(group2, int):
-            group2 = list[group2]
+            group2 = [group2]
 
         for guid1 in group1:
             rm = self.get_resource(guid)
@@ -181,13 +194,13 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start_with_condition()
 
-    def deploy(self, group = None, wait_all_ready = True):
+    def deploy(self, group = None, wait_all_deployed = True):
         """ Deploy all resource manager in group
 
         :param group: List of guids of RMs to deploy
         :type group: list
 
-        :param wait_all_ready: Wait until all RMs are deployed in
+        :param wait_all_deployed: Wait until all RMs are deployed in
             order to start the RMs
         :type guid: int
 
@@ -208,13 +221,13 @@ class ExperimentController(object):
         for guid in group:
             rm = self.get_resource(guid)
 
-            if wait_all_ready:
+            if wait_all_deployed:
                 towait = list(group)
                 towait.remove(guid)
                 self.register_condition(guid, ResourceAction.START, 
                         towait, ResourceState.DEPLOYED)
 
-            thread = threading.Thread(target = steps, args = (rm))
+            thread = threading.Thread(target = steps, args = (rm,))
             threads.append(thread)
             thread.start()
 
index deaa063..ce77568 100644 (file)
@@ -5,6 +5,7 @@ import copy
 import functools
 import logging
 import weakref
+import time as TIME
 
 _reschedule_delay = "1s"
 
@@ -34,29 +35,43 @@ class ResourceManager(object):
     @classmethod
     def _register_filter(cls, attr):
         """ Resource subclasses will invoke this method to add a 
-        filter attribute"""
+        filter attribute
+
+        """
         cls._filters[attr.name] = attr
 
     @classmethod
     def _register_attribute(cls, attr):
         """ Resource subclasses will invoke this method to add a 
-        resource attribute"""
+        resource attribute
+
+        """
         cls._attributes[attr.name] = attr
 
     @classmethod
     def _register_filters(cls):
         """ Resource subclasses will invoke this method to add a 
-        filter attribute"""
+        filter attribute
+
+        """
         pass
 
     @classmethod
     def _register_attributes(cls):
         """ Resource subclasses will invoke this method to add a 
-        resource attribute"""
+        resource attribute
+
+        """
         pass
 
     @classmethod
     def _clsinit(cls):
+        """ Create a new dictionnary instance of the dictionnary 
+        with the same template.
+        Each ressource should have the same registration dictionary
+        template with different instances.
+        """
         # static template for resource filters
         cls._filters = dict()
         cls._register_filters()
@@ -71,10 +86,16 @@ class ResourceManager(object):
 
     @classmethod
     def get_filters(cls):
+        """ Returns a copy of the filters
+
+        """
         return copy.deepcopy(cls._filters.values())
 
     @classmethod
     def get_attributes(cls):
+        """ Returns a copy of the attributes
+
+        """
         return copy.deepcopy(cls._attributes.values())
 
     def __init__(self, ec, guid):
@@ -113,7 +134,7 @@ class ResourceManager(object):
         return self._connections
 
     @property
-    def conditons(self):
+    def conditions(self):
         return self._conditions
 
     @property
@@ -140,6 +161,9 @@ class ResourceManager(object):
         pass
 
     def start(self):
+        """ Start the Resource Manager
+
+        """
         if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]:
             self.logger.error("Wrong state %s for start" % self.state)
 
@@ -147,6 +171,9 @@ class ResourceManager(object):
         self._state = ResourceState.STARTED
 
     def stop(self):
+        """ Start the Resource Manager
+
+        """
         if not self._state in [ResourceState.STARTED]:
             self.logger.error("Wrong state %s for stop" % self.state)
 
@@ -154,21 +181,63 @@ class ResourceManager(object):
         self._state = ResourceState.STOPPED
 
     def set(self, name, value):
+        """ Set the value of the attribute
+
+        :param name: Name of the attribute
+        :type name: str
+        :param name: Value of the attribute
+        :type name: str
+        :rtype:  Boolean
+        """
         attr = self._attrs[name]
         attr.value = value
 
     def get(self, name):
+        """ Start the Resource Manager
+
+        :param name: Name of the attribute
+        :type name: str
+        :rtype: str
+        """
         attr = self._attrs[name]
         return attr.value
 
     def register_condition(self, action, group, state, 
             time = None):
+        """ Do the 'action' after 'time' on the current RM when 'group' 
+         reach the state 'state'
+
+        :param action: Action to do. Either 'START' or 'STOP'
+        :type action: str
+        :param group: group of RM
+        :type group: str
+        :param state: RM that are part of the condition
+        :type state: list
+        :param time: Time to wait after the state is reached (ex : '2s' )
+        :type time: str
+
+        """
         if action not in self.conditions:
             self._conditions[action] = set()
 
-        self.conditions.get(action).add((group, state, time))
+        # We need to use only sequence inside a set and not a list. 
+        # As group is a list, we need to change it.
+        #print (tuple(group), state, time)
+        self.conditions.get(action).add((tuple(group), state, time))
 
     def _needs_reschedule(self, group, state, time):
+        """ Internal method that verify if 'time' has elapsed since 
+        all elements in 'group' have reached state 'state'.
+
+        :param group: RM that are part of the condition
+        :type group: list
+        :param state: State that group need to reach for the condtion
+        :type state: str
+        :param time: time to wait after the state
+        :type time: str
+
+
+        """
         reschedule = False
         delay = _reschedule_delay 
 
@@ -182,29 +251,46 @@ class ResourceManager(object):
                 break
 
             if time:
-                if state == ResourceAction.START:
+                if state == ResourceState.STARTED:
                     t = rm.start_time
-                elif state == ResourceAction.STOP:
+                elif state == ResourceState.STOPPED:
                     t = rm.stop_time
                 else:
                     # Only keep time information for START and STOP
                     break
 
-                delay = strfdiff(t, strnow()
-                if delay < time:
+                d = strfdiff(strfnow(), t
+                if d < time:
                     reschedule = True
+                    delay = "%ds" % (int(time - d) +1)
                     break
-
         return reschedule, delay
 
     def set_with_conditions(self, name, value, group, state, time):
+        """ Set value 'value' on attribute with name 'name' when 'time' 
+            has elapsed since all elements in 'group' have reached state
+           'state'.
+
+        :param name: Name of the attribute
+        :type name: str
+        :param name: Value of the attribute
+        :type name: str
+        :param group: RM that are part of the condition
+        :type group: list
+        :param state: State that group need to reach before set
+        :type state: str
+        :param time: Time to wait after the state is reached (ex : '2s' )
+        :type time: str
+
+        """
+
         reschedule = False
         delay = _reschedule_delay 
 
         ## evaluate if set conditions are met
 
         # only can set with conditions after the RM is started
-        if self.status != ResourceStatus.STARTED:
+        if self.state != ResourceState.STARTED:
             reschedule = True
         else:
             reschedule, delay = self._needs_reschedule(group, state, time)
@@ -217,60 +303,93 @@ class ResourceManager(object):
             self.set(name, value)
 
     def start_with_conditions(self):
+        """ Starts when all the conditions are reached
+
+        """
         reschedule = False
         delay = _reschedule_delay 
 
         ## evaluate if set conditions are met
 
         # only can start when RM is either STOPPED or DEPLOYED
-        if self.status not in [ResourceStatus.STOPPED, ResourceStatus.DEPLOYED]:
+        if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]:
             reschedule = True
         else:
-            for action, (group, state, time) in self.conditions.iteritems():
-                if action == ResourceAction.START:
-                    reschedule, delay = self._needs_reschedule(group, state, time)   
+            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  start condition : " + str(self.conditions.items())
+            # Need to separate because it could have more that tuple of condition 
+            # for the same action.
+            if self.conditions.get(ResourceAction.START): 
+                for (group, state, time) in self.conditions.get(ResourceAction.START):
+                    reschedule, delay = self._needs_reschedule(group, state, time)
                     if reschedule:
                         break
 
         if reschedule:
-            callback = functools.partial(self.start_with_conditions, 
-                    group, state, time)
+            callback = functools.partial(self.start_with_conditions)
             self.ec.schedule(delay, callback)
         else:
+            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
+------------------------------------------------------------------------------\
+----------------------------------------------------------------  STARTING -- "
             self.start()
 
     def stop_with_conditions(self):
+        """ Starts when all the conditions are reached
+
+        """
         reschedule = False
         delay = _reschedule_delay 
 
         ## evaluate if set conditions are met
 
-        # only can start when RM is either STOPPED or DEPLOYED
-        if self.status != ResourceStatus.STARTED:
+        # only can stop when RM is STARTED
+        if self.state != ResourceState.STARTED:
             reschedule = True
         else:
-            for action, (group, state, time) in self.conditions.iteritems():
-                if action == ResourceAction.STOP:
-                    reschedule, delay = self._needs_reschedule(group, state, time)   
-                    if reschedule:
-                        break
+            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +")  ----  stop condition : " + str(self.conditions.items())
+            # Need to separate because it could have more that tuple of condition 
+            # for the same action.
+            conditions =  self.conditions.get(ResourceAction.STOP, []) 
+            for (group, state, time) in conditions:
+                reschedule, delay = self._needs_reschedule(group, state, time)
+                if reschedule:
+                    break
+
+        #else:
+        #    for action, (group, state, time) in self.conditions.iteritems():
+        #        if action == ResourceAction.STOP:
+        #            reschedule, delay = self._needs_reschedule(group, state, time)   
+        #            if reschedule:
+        #                break
 
         if reschedule:
-            callback = functools.partial(self.stop_with_conditions, 
-                    group, state, time)
+            callback = functools.partial(self.stop_with_conditions)
             self.ec.schedule(delay, callback)
         else:
             self.stop()
 
     def deploy(self):
+        """Execute all the differents steps required to reach the state DEPLOYED
+
+        """
         self.discover()
         self.provision()
         self._state = ResourceState.DEPLOYED
 
     def release(self):
+        """Clean the resource at the end of the Experiment and change the status
+
+        """
         self._state = ResourceState.RELEASED
 
     def _validate_connection(self, guid):
+        """Check if the connection is available.
+
+        :param guid: Guid of the current Resource Manager
+        :type guid: int
+        :rtype:  Boolean
+
+        """
         # TODO: Validate!
         return True
 
index de4f850..2005617 100644 (file)
@@ -147,7 +147,7 @@ class LinuxApplication(ResourceManager):
         return True
         # XXX: What if it is connected to more than one node?
         resources = self.find_resources(exact_tags = [tags.NODE])
-        self._node = resources[0] is len(resources) == 1 else None
+        self._node = resources[0] if len(resources) == 1 else None
         return self._node
 
 
index 38aa72a..7c71090 100644 (file)
@@ -3,6 +3,9 @@ import logging
 import ssl
 import sys
 import time
+import hashlib
+import neco
+import threading
 
 from neco.resources.omf.omf_client import OMFClient
 from neco.resources.omf.omf_messages_5_4 import MessageHandler
@@ -28,6 +31,20 @@ class OMFAPI(object):
 
     """
     def __init__(self, slice, host, port, password, xmpp_root = None):
+        """
+    
+        :param slice: Xmpp Slice
+        :type slice: Str
+        :param host: Xmpp Server
+        :type host: Str
+        :param port: Xmpp Port
+        :type port: Str
+        :param password: Xmpp password
+        :type password: Str
+        :param xmpp_root: Root of the Xmpp Topic Architecture
+        :type xmpp_root: Str
+
+        """
         date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
         tz = -time.altzone if time.daylight != 0 else -time.timezone
         date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
@@ -39,7 +56,8 @@ class OMFAPI(object):
         self._hostnames = []
         self._xmpp_root = xmpp_root or "OMF_5.4"
 
-        self._logger = logging.getLogger("neco.resources.omf")
+        self._logger = logging.getLogger("neco.omf.omfApi    ")
+        self._logger.setLevel(neco.LOGLEVEL)
 
         # OMF xmpp client
         self._client = None
@@ -70,7 +88,7 @@ class OMFAPI(object):
         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
 
         if xmpp.connect((self._host, self._port)):
-            xmpp.process(threaded=True)
+            xmpp.process(block=False)
             while not xmpp.ready:
                 time.sleep(1)
             self._client = xmpp
@@ -96,8 +114,8 @@ class OMFAPI(object):
 
         """
         address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
-        print address
-        payload = self._message.newexpfunction(self._user, address)
+        #print address
+        payload = self._message.newexp_function(self._user, address)
         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
         self._client.publish(payload, slice_sid)
 
@@ -109,7 +127,7 @@ class OMFAPI(object):
         self._client.create(xmpp_node)
         self._client.subscribe(xmpp_node)
 
-        payload = self._message.logfunction("2", 
+        payload = self._message.log_function("2", 
                 "nodeHandler::NodeHandler", 
                 "INFO", 
                 "OMF Experiment Controller 5.4 (git 529a626)")
@@ -181,7 +199,7 @@ class OMFAPI(object):
         xmpp_node =  self._host_resource_id(hostname)
         self._client.subscribe(xmpp_node)
 
-        payload = self._message.enrollfunction("1", "*", "1", hostname)
+        payload = self._message.enroll_function("1", "*", "1", hostname)
         self._client.publish(payload, xmpp_node)
 
     def configure(self, hostname, attribute, value):
@@ -195,7 +213,7 @@ class OMFAPI(object):
         :type value: str
 
         """
-        payload = self._message.configurefunction(hostname, value, attribute)
+        payload = self._message.configure_function(hostname, value, attribute)
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
@@ -214,7 +232,7 @@ class OMFAPI(object):
         :type env: str
 
         """
-        payload = self._message.executefunction(hostname, app_id, arguments, path, env)
+        payload = self._message.execute_function(hostname, app_id, arguments, path, env)
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
@@ -227,34 +245,42 @@ class OMFAPI(object):
         :type app_id: str
 
         """
-        payload = self._message.exitfunction(hostname, app_id)
+        payload = self._message.exit_function(hostname, app_id)
         xmpp_node =  self._host_session_id(hostname)
         self._client.publish(payload, xmpp_node)
 
-    def disconnect(self):
-        """ Delete the sesion and logger topic and disconnect 
+    def release(self, hostname):
+        """ Delete the session and logger topics. Then disconnect 
+
+        """
+        if hostname in self._hostnames:
+            self.delete(hostname)
+
+    def disconnect(self) :
+        """ Delete the session and logger topics. Then disconnect 
 
         """
         self._client.delete(self._exp_session_id)
         self._client.delete(self._logger_session_id)
 
-        for hostname in self._hostnames[:]:
-            self.delete(hostname)
-
         time.sleep(1)
-        self._client.disconnect()
+        
+        # Wait the send queue to be empty before disconnect
+        self._client.disconnect(wait=True)
+        self._logger.debug(" Disconnected from XMPP Server")
 
 
 class OMFAPIFactory(object):
     """ 
     .. note::
 
-        It allows the different RM to use the same xmpp client if they use the same credentials. For the moment, it is focused on Xmpp.
+        It allows the different RM to use the same xmpp client if they use the same credentials. 
+        For the moment, it is focused on Xmpp.
 
     """
-
-    # XXX: put '_apis' instead of '_Api'
-    _Api = dict()
+    # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
+    lock = threading.Lock()
+    _apis = dict()
 
     @classmethod 
     def get_api(cls, slice, host, port, password):
@@ -271,11 +297,16 @@ class OMFAPIFactory(object):
 
         """
         if slice and host and port and password:
-            key = cls._hash_api(slice, host, port)
-            if key in cls._Api:
-                return cls._Api[key]
+            key = cls._make_key(slice, host, port, password)
+            cls.lock.acquire()
+            if key in cls._apis:
+                cls._apis[key]['cnt'] += 1
+                cls.lock.release()
+                return cls._apis[key]['api']
             else :
-                return cls.create_api(slice, host, port, password)
+                omf_api = cls.create_api(slice, host, port, password)
+                cls.lock.release()
+                return omf_api
         return None
 
     @classmethod 
@@ -292,24 +323,16 @@ class OMFAPIFactory(object):
         :type password: str
 
         """
-        OmfApi = OMFAPI(slice, host, port, password)
-        key = cls._hash_api(slice, host, port)      
-        cls._Api[key] = OmfApi
-        return OmfApi
-
-    # XXX: this is not a hash :)
-    # From wikipedia: "A hash function is any algorithm or subroutine that maps large data 
-    # sets of variable length to smaller data sets of a fixed length."
-    # The idea is to apply a function to get a smaller string. Use hashlib instead.
-    # e.g:
-    # import hashlib
-    # res = slice + "_" + host + "_" + port
-    # hashlib.md5(res).hexdigest()
-    #
-    # XXX: change method name for 'make_key'
+        omf_api = OMFAPI(slice, host, port, password)
+        key = cls._make_key(slice, host, port, password)
+        cls._apis[key] = {}
+        cls._apis[key]['api'] = omf_api
+        cls._apis[key]['cnt'] = 1
+        return omf_api
+
     @classmethod 
-    def _hash_api(cls, slice, host, port):
-        """ Hash the credentials in order to create a key
+    def release_api(cls, slice, host, port, password):
+        """ Release an API with this credentials
 
         :param slice: Xmpp Slice Name
         :type slice: str
@@ -317,12 +340,30 @@ class OMFAPIFactory(object):
         :type host: str
         :param port: Xmpp Port (Default : 5222)
         :type port: str
+        :param password: Xmpp Password
+        :type password: str
 
         """
-        res = slice + "_" + host + "_" + port
-        return res
+        if slice and host and port and password:
+            key = cls._make_key(slice, host, port, password)
+            if key in cls._apis:
+                cls._apis[key]['cnt'] -= 1
+                #print "Api Counter : " + str(cls._apis[key]['cnt'])
+                if cls._apis[key]['cnt'] == 0:
+                    omf_api = cls._apis[key]['api']
+                    omf_api.disconnect()
 
 
+    @classmethod 
+    def _make_key(cls, *args):
+        """ Hash the credentials in order to create a key
+
+        :param args: list of arguments used to create the hash (user, host, port, ...)
+        :type args: list of args
+
+        """
+        skey = "".join(map(str, args))
+        return hashlib.md5(skey).hexdigest()
 
 
 
index 2ffe809..1e64974 100644 (file)
@@ -1,14 +1,14 @@
 #!/usr/bin/env python
 
-from neco.execution.resource import Resource, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.resource import ResourceManager, clsinit
+from neco.execution.attribute import Attribute, Flags 
 from neco.resources.omf.omf_api import OMFAPIFactory
 
 import neco
 import logging
 
 @clsinit
-class OMFApplication(Resource):
+class OMFApplication(ResourceManager):
     """
     .. class:: Class Args :
       
@@ -16,7 +16,7 @@ class OMFApplication(Resource):
         :type ec: ExperimentController
         :param guid: guid of the RM
         :type guid: int
-        :param creds: Credentials to communicate with the rm (XmppClient for OMF)
+        :param creds: Credentials to communicate with the rm (XmppClient)
         :type creds: dict
 
     .. note::
@@ -36,10 +36,10 @@ class OMFApplication(Resource):
         path = Attribute("path", "Path of the application")
         args = Attribute("args", "Argument of the application")
         env = Attribute("env", "Environnement variable of the application")
-        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
-        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
-        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
-        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
         cls._register_attribute(appid)
         cls._register_attribute(path)
         cls._register_attribute(args)
@@ -50,7 +50,7 @@ class OMFApplication(Resource):
         cls._register_attribute(xmppPassword)
 
 
-    def __init__(self, ec, guid, creds):
+    def __init__(self, ec, guid):
         """
         :param ec: The Experiment controller
         :type ec: ExperimentController
@@ -62,10 +62,6 @@ class OMFApplication(Resource):
         """
         
         super(OMFApplication, self).__init__(ec, guid)
-        self.set('xmppSlice', creds['xmppSlice'])
-        self.set('xmppHost', creds['xmppHost'])
-        self.set('xmppPort', creds['xmppPort'])
-        self.set('xmppPassword', creds['xmppPassword'])
 
         self.set('appid', "")
         self.set('path', "")
@@ -74,7 +70,7 @@ class OMFApplication(Resource):
 
         self._node = None
 
-        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        self._omf_api = None
 
         self._logger = logging.getLogger("neco.omf.omfApp    ")
         self._logger.setLevel(neco.LOGLEVEL)
@@ -88,7 +84,7 @@ class OMFApplication(Resource):
         :rtype:  Boolean
 
         """
-        rm = self.ec.resource(guid)
+        rm = self.ec.get_resource(guid)
         if rm.rtype() not in self._authorized_connections:
             self._logger.debug("Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid))
             return False
@@ -108,17 +104,26 @@ class OMFApplication(Resource):
         """
 
         for elt in conn_set:
-            rm = self.ec.resource(elt)
+            rm = self.ec.get_resource(elt)
             if rm.rtype() == "OMFNode":
                 return rm
         return None
 
+    def deploy(self):
+        """Deploy the RM
+
+        """
+        super(OMFApplication, self).deploy()
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
     def start(self):
         """Send Xmpp Message Using OMF protocol to execute the application
 
         """
+        super(OMFApplication, self).start()
         self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env'))
-        #try:
+
         if self.get('appid') and self.get('path') and self.get('args') and self.get('env'):
             rm_node = self._get_nodes(self._connections)
             self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
@@ -127,8 +132,15 @@ class OMFApplication(Resource):
         """Send Xmpp Message Using OMF protocol to kill the application
 
         """
+
         rm_node = self._get_nodes(self._connections)
         self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
+        super(OMFApplication, self).stop()
 
+    def release(self):
+        """Clean the RM at the end of the experiment
 
+        """
+        OMFAPIFactory.release_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
 
index d9da3ed..192de27 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.resource import ResourceManager, clsinit
+from neco.execution.attribute import Attribute, Flags 
 
 from neco.resources.omf.omf_api import OMFAPIFactory
 
@@ -8,7 +8,7 @@ import neco
 import logging
 
 @clsinit
-class OMFChannel(Resource):
+class OMFChannel(ResourceManager):
     """
     .. class:: Class Args :
       
@@ -32,17 +32,17 @@ class OMFChannel(Resource):
         """Register the attributes of an OMF channel
         """
         channel = Attribute("channel", "Name of the application")
-        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
-        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
-        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
-        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
         cls._register_attribute(channel)
         cls._register_attribute(xmppSlice)
         cls._register_attribute(xmppHost)
         cls._register_attribute(xmppPort)
         cls._register_attribute(xmppPassword)
 
-    def __init__(self, ec, guid, creds):
+    def __init__(self, ec, guid):
         """
         :param ec: The Experiment controller
         :type ec: ExperimentController
@@ -53,14 +53,10 @@ class OMFChannel(Resource):
 
         """
         super(OMFChannel, self).__init__(ec, guid)
-        self.set('xmppSlice', creds['xmppSlice'])
-        self.set('xmppHost', creds['xmppHost'])
-        self.set('xmppPort', creds['xmppPort'])
-        self.set('xmppPassword', creds['xmppPassword'])
 
         self._nodes_guid = list()
 
-        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        self._omf_api = None
 
         self._logger = logging.getLogger("neco.omf.omfChannel")
         self._logger.setLevel(neco.LOGLEVEL)
@@ -73,9 +69,10 @@ class OMFChannel(Resource):
         :rtype:  Boolean
 
         """
-        rm = self.ec.resource(guid)
+        rm = self.ec.get_resource(guid)
         if rm.rtype() in self._authorized_connections:
-            self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+            self._logger.debug("Connection between %s %s and %s %s accepted" %
+                (self.rtype(), self._guid, rm.rtype(), guid))
             return True
         self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
         return False
@@ -88,24 +85,33 @@ class OMFChannel(Resource):
         :type conn_set: set
         :rtype: list
         :return: self._nodes_guid
+
         """
         for elt in conn_set:
-            rm_iface = self.ec.resource(elt)
+            rm_iface = self.ec.get_resource(elt)
             for conn in rm_iface._connections:
-                rm_node = self.ec.resource(conn)
+                rm_node = self.ec.get_resource(conn)
                 if rm_node.rtype() == "OMFNode":
                     couple = [rm_node.get('hostname'), rm_iface.get('alias')]
                     #print couple
                     self._nodes_guid.append(couple)
         return self._nodes_guid
 
+    def deploy(self):
+        """Deploy the RM
+
+        """
+        super(OMFChannel, self).deploy()
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
     def discover(self):
         """ Discover the availables channels
 
         """
         pass
      
-    def provision(self, credential):
+    def provision(self):
         """ Provision some availables channels
 
         """
@@ -117,23 +123,25 @@ class OMFChannel(Resource):
         """
         if self.get('channel'):
             set_nodes = self._get_target(self._connections) 
-            #print set_nodes
+            print set_nodes
             for couple in set_nodes:
                 #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
                 attrval = self.get('channel')
                 attrname = "net/%s/%s" % (couple[1], 'channel')
                 #print "Send the configure message"
                 self._omf_api.configure(couple[0], attrname, attrval)
+        super(OMFChannel, self).start()
 
-    def xstart(self):
-        try:
-            if self.get('channel'):
-                node = self.tc.elements.get(self._node_guid)    
-                attrval = self.get('channel')
-                attrname = "net/%s/%s" % (self._alias, 'channel')
-                self._omf_api.configure('omf.plexus.wlab17', attrname, attrval)
-        except AttributeError:
-            # If the attribute is not yet defined, ignore the error
-            pass
+    def stop(self):
+        """Send Xmpp Message Using OMF protocol to put down the interface
 
+        """
+        super(OMFChannel, self).stop()
+
+    def release(self):
+        """Clean the RM at the end of the experiment
+
+        """
+        OMFAPIFactory.release_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
 
index 3764f65..53dcb39 100644 (file)
@@ -6,7 +6,8 @@ import xml.etree.ElementTree as ET
 
 import neco
 
-class OMFClient(sleekxmpp.ClientXMPP):
+# inherit from BaseXmpp and XMLStream classes
+class OMFClient(sleekxmpp.ClientXMPP): 
     """
     .. class:: Class Args :
       
@@ -22,6 +23,15 @@ class OMFClient(sleekxmpp.ClientXMPP):
     """
 
     def __init__(self, jid, password):
+        """
+
+        :param jid: Jabber Id (= Xmpp Slice + Date)
+        :type jid: Str
+        :param password: Jabber Password (= Xmpp Password)
+        :type password: Str
+
+
+        """
         sleekxmpp.ClientXMPP.__init__(self, jid, password)
         self._ready = False
         self._registered = False
@@ -41,16 +51,25 @@ class OMFClient(sleekxmpp.ClientXMPP):
 
     @property
     def ready(self):
+        """ Check if the client is ready
+
+        """
         return self._ready
 
     def start(self, event):
+        """ Send presence to the Xmppp Server. This function is called directly by the sleekXmpp library
+
+        """
         self.send_presence()
         self._ready = True
         self._server = "pubsub.%s" % self.boundjid.domain
 
     def register(self, iq):
+        """  Register to the Xmppp Server. This function is called directly by the sleekXmpp library
+
+        """
         if self._registered:
-            self._logger.info("%s already registered!" % self.boundjid)
+            self._logger.info(" %s already registered!" % self.boundjid)
             return 
 
         resp = self.Iq()
@@ -60,26 +79,32 @@ class OMFClient(sleekxmpp.ClientXMPP):
 
         try:
             resp.send(now=True)
-            self._logger.info("Account created for %s!" % self.boundjid)
+            self._logger.info(" Account created for %s!" % self.boundjid)
             self._registered = True
         except IqError as e:
-            self._logger.error("Could not register account: %s" %
+            self._logger.error(" Could not register account: %s" %
                     e.iq['error']['text'])
         except IqTimeout:
-            self._logger.error("No response from server.")
+            self._logger.error(" No response from server.")
 
     def unregister(self):
+        """  Unregister from the Xmppp Server.
+
+        """
         try:
             self.plugin['xep_0077'].cancel_registration(
                 ifrom=self.boundjid.full)
-            self._logger.info("Account unregistered for %s!" % self.boundjid)
+            self._logger.info(" Account unregistered for %s!" % self.boundjid)
         except IqError as e:
-            self._logger.error("Could not unregister account: %s" %
+            self._logger.error(" Could not unregister account: %s" %
                     e.iq['error']['text'])
         except IqTimeout:
-            self._logger.error("No response from server.")
+            self._logger.error(" No response from server.")
 
     def nodes(self):
+        """  Get all the nodes of the Xmppp Server.
+
+        """
         try:
             result = self['xep_0060'].get_nodes(self._server)
             for item in result['disco_items']['items']:
@@ -87,9 +112,12 @@ class OMFClient(sleekxmpp.ClientXMPP):
             return result
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error)
+            self._logger.error(' Could not retrieve node list.\ntraceback:\n%s', error)
 
     def subscriptions(self):
+        """  Get all the subscriptions of the Xmppp Server.
+
+        """
         try:
             result = self['xep_0060'].get_subscriptions(self._server)
                 #self.boundjid.full)
@@ -98,9 +126,15 @@ class OMFClient(sleekxmpp.ClientXMPP):
             return result
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error)
+            self._logger.error(' Could not retrieve subscriptions.\ntraceback:\n%s', error)
 
     def create(self, node):
+        """  Create the topic corresponding to the node
+
+        :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
+        :type node: str
+
+        """
         self._logger.debug(" Create Topic : " + node)
    
         config = self['xep_0004'].makeForm('submit')
@@ -115,28 +149,53 @@ class OMFClient(sleekxmpp.ClientXMPP):
             self['xep_0060'].create_node(self._server, node, config = config)
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not create topic: %s\ntraceback:\n%s' % (node, error))
+            self._logger.error(' Could not create topic: %s\ntraceback:\n%s' % (node, error))
 
     def delete(self, node):
+        """  Delete the topic corresponding to the node
+
+        :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
+        :type node: str
+
+        """
+        # To check if the queue are well empty at the end
+        #print " length of the queue : " + str(self.send_queue.qsize())
+        #print " length of the queue : " + str(self.event_queue.qsize())
         try:
             self['xep_0060'].delete_node(self._server, node)
-            self._logger.info('Deleted node: %s' % node)
+            self._logger.info(' Deleted node: %s' % node)
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not delete topic: %s\ntraceback:\n%s' % (node, error))
+            self._logger.error(' Could not delete topic: %s\ntraceback:\n%s' % (node, error))
     
     def publish(self, data, node):
-        self._logger.debug(" Publish to Topic :" + node)
+        """  Publish the data to the corresponding topic
+
+        :param data: Data that will be published
+        :type data: str
+        :param node: Name of the topic
+        :type node: str
+
+        """ 
+
+        self._logger.debug(" Publish to Topic : " + node)
         try:
             result = self['xep_0060'].publish(self._server,node,payload=data)
             # id = result['pubsub']['publish']['item']['id']
             # print('Published at item id: %s' % id)
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not publish to: %s\ntraceback:\n%s' \
+            self._logger.error(' Could not publish to: %s\ntraceback:\n%s' \
                     % (node, error))
 
     def get(self, data):
+        """  Get the item
+
+        :param data: data from which the items will be get back
+        :type data: str
+
+
+        """
         try:
             result = self['xep_0060'].get_item(self._server, self.boundjid,
                 data)
@@ -145,28 +204,43 @@ class OMFClient(sleekxmpp.ClientXMPP):
                     tostring(item['payload'])))
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not retrieve item %s from topic %s\ntraceback:\n%s' \
+            self._logger.error(' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
                     % (data, self.boundjid, error))
 
     def retract(self, data):
+        """  Retract the item
+
+        :param data: data from which the item will be retracted
+        :type data: str
+
+        """
         try:
             result = self['xep_0060'].retract(self._server, self.boundjid, data)
-            self._logger.info('Retracted item %s from topic %s' % (data, self.boundjid))
+            self._logger.info(' Retracted item %s from topic %s' % (data, self.boundjid))
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not retract item %s from topic %s\ntraceback:\n%s' \
+            self._logger.error(' Could not retract item %s from topic %s\ntraceback:\n%s' \
                     % (data, self.boundjid, error))
 
     def purge(self):
+        """  Purge the information in the server
+
+        """
         try:
             result = self['xep_0060'].purge(self._server, self.boundjid)
-            self._logger.info('Purged all items from topic %s' % self.boundjid)
+            self._logger.info(' Purged all items from topic %s' % self.boundjid)
         except:
             error = traceback.format_exc()
-            self._logger.error('Could not purge items from topic %s\ntraceback:\n%s' \
+            self._logger.error(' Could not purge items from topic %s\ntraceback:\n%s' \
                     % (self.boundjid, error))
 
     def subscribe(self, node):
+        """ Subscribe to a topic
+
+        :param node: Name of the topic
+        :type node: str
+
+        """
         try:
             result = self['xep_0060'].subscribe(self._server, node)
             #self._logger.debug('Subscribed %s to node %s' \
@@ -179,6 +253,12 @@ class OMFClient(sleekxmpp.ClientXMPP):
                     % (self.boundjid.bare, node, error))
 
     def unsubscribe(self, node):
+        """ Unsubscribe to a topic
+
+        :param node: Name of the topic
+        :type node: str
+
+        """
         try:
             result = self['xep_0060'].unsubscribe(self._server, node)
             self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node))
@@ -187,27 +267,51 @@ class OMFClient(sleekxmpp.ClientXMPP):
             self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
                     % (self.boundjid.bare, node, error))
 
-    def _check_for_tag(self, treeroot, namespaces, tag):
-        for element in treeroot.iter(namespaces+tag):
+    def _check_for_tag(self, root, namespaces, tag):
+        """  Check if an element markup is in the ElementTree
+
+        :param root: Root of the tree
+        :type root: ElementTree Element
+        :param namespaces: Namespaces of the element
+        :type namespaces: str
+        :param tag: Tag that will search in the tree
+        :type tag: str
+
+        """
+        for element in root.iter(namespaces+tag):
             if element.text:
                 return element
             else : 
                 return None    
 
-    def _check_output(self, treeroot, namespaces):
-        output_param = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
+    def _check_output(self, root, namespaces):
+        """ Check the significative element in the answer and display it
+
+        :param root: Root of the tree
+        :type root: ElementTree Element
+        :param namespaces: Namespaces of the tree
+        :type namespaces: str
+
+        """
+        fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
         response = ""
-        for elt in output_param:
-            msg = self._check_for_tag(treeroot, namespaces, elt)
+        for elt in fields:
+            msg = self._check_for_tag(root, namespaces, elt)
             if msg is not None:
                 response = response + " " + msg.text + " :"
-        deb = self._check_for_tag(treeroot, namespaces, "MESSAGE")
+        deb = self._check_for_tag(root, namespaces, "MESSAGE")
         if deb is not None:
             self._logger.debug(response + " " + deb.text)
         else :
             self._logger.info(response)
 
     def handle_omf_message(self, iq):
+        """ Handle published/received message 
+
+        :param iq: Stanzas that is currently published/received
+        :type iq: Iq Stanza
+
+        """
         namespaces = "{http://jabber.org/protocol/pubsub}"
         for i in iq['pubsub_event']['items']:
             root = ET.fromstring(str(i))
index 982d8cd..bbfc045 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
-from neco.execution.resource import Resource, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.resource import ResourceManager, clsinit
+from neco.execution.attribute import Attribute, Flags 
 
 from neco.resources.omf.omf_api import OMFAPIFactory
 
@@ -8,7 +8,7 @@ import neco
 import logging
 
 @clsinit
-class OMFWifiInterface(Resource):
+class OMFWifiInterface(ResourceManager):
     """
     .. class:: Class Args :
       
@@ -32,16 +32,17 @@ class OMFWifiInterface(Resource):
     @classmethod
     def _register_attributes(cls):
         """Register the attributes of an OMF interface 
+
         """
-        alias = Attribute("alias","Alias of the interface", default_value = "w0")  
+        alias = Attribute("alias","Alias of the interface", default_value = "w0")
         mode = Attribute("mode","Mode of the interface")
         type = Attribute("type","Type of the interface")
         essid = Attribute("essid","Essid of the interface")
         ip = Attribute("ip","IP of the interface")
-        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
-        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
-        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
-        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
         cls._register_attribute(alias)
         cls._register_attribute(xmppSlice)
         cls._register_attribute(xmppHost)
@@ -52,7 +53,7 @@ class OMFWifiInterface(Resource):
         cls._register_attribute(essid)
         cls._register_attribute(ip)
 
-    def __init__(self, ec, guid, creds):
+    def __init__(self, ec, guid):
         """
         :param ec: The Experiment controller
         :type ec: ExperimentController
@@ -63,52 +64,60 @@ class OMFWifiInterface(Resource):
 
         """
         super(OMFWifiInterface, self).__init__(ec, guid)
-        self.set('xmppSlice', creds['xmppSlice'])
-        self.set('xmppHost', creds['xmppHost'])
-        self.set('xmppPort', creds['xmppPort'])
-        self.set('xmppPassword', creds['xmppPassword'])
 
-        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        self._omf_api = None
         self._alias = self.get('alias')
 
         self._logger = logging.getLogger("neco.omf.omfIface  ")
         self._logger.setLevel(neco.LOGLEVEL)
 
     def _validate_connection(self, guid):
-        """Check if the connection is available.
+        """ Check if the connection is available.
 
         :param guid: Guid of the current RM
         :type guid: int
         :rtype:  Boolean
 
         """
-        rm = self.ec.resource(guid)
+        rm = self.ec.get_resource(guid)
         if rm.rtype() in self._authorized_connections:
-            self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+            self._logger.debug("Connection between %s %s and %s %s accepted" %
+                (self.rtype(), self._guid, rm.rtype(), guid))
             return True
-        self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+        self._logger.debug("Connection between %s %s and %s %s refused" % 
+            (self.rtype(), self._guid, rm.rtype(), guid))
         return False
 
     def _get_nodes(self, conn_set):
-        """
-        Get the RM of the node to which the application is connected
+        """ Get the RM of the node to which the application is connected
 
         :param conn_set: Connections of the current Guid
         :type conn_set: set
         :rtype: ResourceManager
+
         """
         for elt in conn_set:
-            rm = self.ec.resource(elt)
+            rm = self.ec.get_resource(elt)
             if rm.rtype() == "OMFNode":
                 return rm
         return None
 
+    def deploy(self):
+        """Deploy the RM
+
+        """
+        super(OMFWifiInterface, self).deploy()
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
 
     def start(self):
         """Send Xmpp Messages Using OMF protocol to configure Interface
 
         """
-        self._logger.debug(self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip'))
+        self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " +
+            self.get('mode') + " : " + self.get('type') + " : " +
+            self.get('essid') + " : " + self.get('ip'))
         #try:
         if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'):
             rm_node = self._get_nodes(self._connections)    
@@ -117,12 +126,19 @@ class OMFWifiInterface(Resource):
                 attrname = "net/%s/%s" % (self._alias, attrname)
                 #print "Send the configure message"
                 self._omf_api.configure(rm_node.get('hostname'), attrname, attrval)
+        super(OMFWifiInterface, self).start()
 
     def stop(self):
         """Send Xmpp Message Using OMF protocol to put down the interface
 
         """
-        self._omf_api.disconnect()
+        super(OMFWifiInterface, self).stop()
 
+    def release(self):
+        """Clean the RM at the end of the experiment
+
+        """
+        OMFAPIFactory.release_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
 
 
index 4b07265..c5857c2 100644 (file)
@@ -1,26 +1,5 @@
 from xml.etree import cElementTree as ET
 
-EXECUTE = "EXECUTE"
-KILL = "KILL"
-STDIN = "STDIN"
-NOOP = "NOOP"
-PM_INSTALL = "PM_INSTALL"
-APT_INSTALL = "APT_INSTALL"
-RPM_INSTALL = "RPM_INSTALL"
-RESET = "RESET"
-REBOOT = "REBOOT"
-MODPROBE = "MODPROBE"
-CONFIGURE = "CONFIGURE"
-LOAD_IMAGE = "LOAD_IMAGE"
-SAVE_IMAGE = "SAVE_IMAGE"
-LOAD_DATA = "LOAD_DATA"
-SET_LINK = "SET_LINK"
-ALIAS = "ALIAS"
-SET_DISCONNECTION = "SET_DISCONNECTION"
-RESTART = "RESTART"
-ENROLL = "ENROLL"
-EXIT = "EXIT" 
-
 class MessageHandler():
     """
     .. class:: Class Args :
@@ -36,102 +15,201 @@ class MessageHandler():
 
     """
 
-
     def __init__(self, sliceid, expid ):
+        """
+
+        :param sliceid: Slice Name (= Xmpp Slice)
+        :type expid: Str
+        :param expid: Experiment ID (= Xmpp User)
+        :type expid: Str
+
+        """
         self._slice_id = sliceid
         self._exp_id = expid
-        print "init" + self._exp_id +"  "+ self._slice_id
-        pass
-
-    def Mid(self, parent, keyword):
-        mid = ET.SubElement(parent, keyword)
-        mid.set("id", "\'omf-payload\'")
-        return mid
 
-    def Mtext(self, parent, keyword, text):
-        mtext = ET.SubElement(parent, keyword)
-        mtext.text = text
-        return mtext
 
-    def executefunction(self, target, appid, cmdlineargs, path, env):
+    def _id_element(self, parent, markup):
+        """ Insert a markup element with an id
+
+        :param parent: Parent element in an XML point of view
+        :type parent: ElementTree Element
+        :param markup: Name of the markup
+        :type markup: str
+
+        """
+        elt = ET.SubElement(parent, markup)
+        elt.set("id", "\'omf-payload\'")
+        return elt
+
+    def _attr_element(self, parent, markup, text):
+        """ Insert a markup element with a text (value)
+
+        :param parent: Parent element in an XML point of view
+        :type parent: ElementTree Element
+        :param markup: Name of the markup
+        :type markup: str
+        :param text: Value of the markup element
+        :type text: str
+
+        """
+        elt = ET.SubElement(parent, markup)
+        elt.text = text
+        return elt
+
+    def execute_function(self, target, appid, cmdlineargs, path, env):
+        """ Build an Execute Message
+
+        :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+        :type target: str
+        :param appid: Application id
+        :type appid: str
+        :param cmdlineargs: Arguments of the application
+        :type cmdlineargs: str
+        :param path: Path of the application
+        :type path: str
+        :param env: Environment variables
+        :type env: str
+
+        """
         payload = ET.Element("omf-message")
-        execute = self.Mid(payload,"EXECUTE")
-        env = self.Mtext(execute, "ENV", env)
-        sliceid = self.Mtext(execute,"SLICEID",self._slice_id)
-        expid = self.Mtext(execute,"EXPID",self._exp_id)
-        target = self.Mtext(execute,"TARGET",target)
-        appid = self.Mtext(execute,"APPID",appid)
-        cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs)
-        path = self.Mtext(execute,"PATH",path)
+        execute = self._id_element(payload,"EXECUTE")
+        env = self._attr_element(execute, "ENV", env)
+        sliceid = self._attr_element(execute,"SLICEID",self._slice_id)
+        expid = self._attr_element(execute,"EXPID",self._exp_id)
+        target = self._attr_element(execute,"TARGET",target)
+        appid = self._attr_element(execute,"APPID",appid)
+        cmdlineargs = self._attr_element(execute,"CMDLINEARGS",cmdlineargs)
+        path = self._attr_element(execute,"PATH",path)
         return payload
 
-    def exitfunction(self, target, appid):
+    def exit_function(self, target, appid):
+        """ Build an Exit Message
+
+        :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+        :type target: str
+        :param appid: Application id (ex : vlc#1)
+        :type appid: str
+
+        """
         payload = ET.Element("omf-message")
-        execute = self.Mid(payload,"EXIT")
-        sliceid = self.Mtext(execute,"SLICEID",self._slice_id)
-        expid = self.Mtext(execute,"EXPID",self._exp_id)
-        target = self.Mtext(execute,"TARGET",target)
-        appid = self.Mtext(execute,"APPID",appid)
+        execute = self._id_element(payload,"EXIT")
+        sliceid = self._attr_element(execute,"SLICEID",self._slice_id)
+        expid = self._attr_element(execute,"EXPID",self._exp_id)
+        target = self._attr_element(execute,"TARGET",target)
+        appid = self._attr_element(execute,"APPID",appid)
         return payload
 
-    def configurefunction(self, target, value, path):
+    def configure_function(self, target, value, path):
+        """ Build a Configure Message
+
+        :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+        :type target: str
+        :param value: guid of the RM
+        :type value: int
+        :param path: Path of the element to configure (ex : net/w0/channel)
+        :type path: dict
+
+        """
         payload = ET.Element("omf-message")
-        config = self.Mid(payload, "CONFIGURE")
-        sliceid = self.Mtext(config,"SLICEID",self._slice_id)
-        expid = self.Mtext(config,"EXPID",self._exp_id)
-        target = self.Mtext(config,"TARGET",target)
-        value = self.Mtext(config,"VALUE",value)
-        path = self.Mtext(config,"PATH",path)
+        config = self._id_element(payload, "CONFIGURE")
+        sliceid = self._attr_element(config,"SLICEID",self._slice_id)
+        expid = self._attr_element(config,"EXPID",self._exp_id)
+        target = self._attr_element(config,"TARGET",target)
+        value = self._attr_element(config,"VALUE",value)
+        path = self._attr_element(config,"PATH",path)
         return payload
 
-    def logfunction(self,level, logger, level_name, data):
+    def log_function(self,level, logger, level_name, data):
+        """ Build a Log Message
+
+        :param level: Level of logging
+        :type level: str
+        :param logger: Element publishing the log
+        :type logger: str
+        :param level_name: Name of the level (ex : INFO)
+        :type level_name: str
+        :param data: Content to publish
+        :type data: str
+
+        """
         payload = ET.Element("omf-message")
-        log = self.Mid(payload, "LOGGING")
-        level = self.Mtext(log,"LEVEL",level)
-        sliceid = self.Mtext(log,"SLICEID",self._slice_id)
-        logger = self.Mtext(log,"LOGGER",logger)
-        expid = self.Mtext(log,"EXPID",self._exp_id)
-        level_name = self.Mtext(log,"LEVEL_NAME",level_name)
-        data = self.Mtext(log,"DATA",data)
+        log = self._id_element(payload, "LOGGING")
+        level = self._attr_element(log,"LEVEL",level)
+        sliceid = self._attr_element(log,"SLICEID",self._slice_id)
+        logger = self._attr_element(log,"LOGGER",logger)
+        expid = self._attr_element(log,"EXPID",self._exp_id)
+        level_name = self._attr_element(log,"LEVEL_NAME",level_name)
+        data = self._attr_element(log,"DATA",data)
         return payload
 
-    def aliasfunction(self, name, target):
+    def alias_function(self, name, target):
+        """ Build a Alias Message
+
+        :param name: Name of the new alias
+        :type name: str
+        :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+        :type target: str
+
+        """
         payload = ET.Element("omf-message")
-        alias = self.Mid(payload,"ALIAS")
-        sliceid = self.Mtext(alias,"SLICEID",self._slice_id)
-        expid = self.Mtext(alias,"EXPID",self._exp_id)
-        name = self.Mtext(alias,"NAME",name)
-        target = self.Mtext(alias,"TARGET",target)
+        alias = self._id_element(payload,"ALIAS")
+        sliceid = self._attr_element(alias,"SLICEID",self._slice_id)
+        expid = self._attr_element(alias,"EXPID",self._exp_id)
+        name = self._attr_element(alias,"NAME",name)
+        target = self._attr_element(alias,"TARGET",target)
         return payload
 
-    def enrollfunction(self, enrollkey, image, index, target ):
+    def enroll_function(self, enrollkey, image, index, target ):
+        """ Build an Enroll Message
+
+        :param enrollkey: Type of enrollment (= 1)
+        :type enrollkey: str
+        :param image: Image (= * when all the nodes are concerned)
+        :type image: str
+        :param index: Index (= 1 in general)
+        :type index: str
+        :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+        :type target: str
+
+        """
         payload = ET.Element("omf-message")
-        enroll = self.Mid(payload,"ENROLL")
-        enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
-        sliceid = self.Mtext(enroll,"SLICEID",self._slice_id)
-        image = self.Mtext(enroll,"IMAGE",image)
-        expid = self.Mtext(enroll,"EXPID",self._exp_id)
-        index = self.Mtext(enroll,"INDEX",index)
-        target = self.Mtext(enroll,"TARGET",target)
+        enroll = self._id_element(payload,"ENROLL")
+        enrollkey = self._attr_element(enroll,"ENROLLKEY",enrollkey)
+        sliceid = self._attr_element(enroll,"SLICEID",self._slice_id)
+        image = self._attr_element(enroll,"IMAGE",image)
+        expid = self._attr_element(enroll,"EXPID",self._exp_id)
+        index = self._attr_element(enroll,"INDEX",index)
+        target = self._attr_element(enroll,"TARGET",target)
         return payload
 
-    def noopfunction(self,target):
+    def noop_function(self,target):
+        """ Build a Noop Message
+
+        :param target: Hrn of the target node (ex : omf.plexus.wlab17)
+        :type target: str
+
+        """
         payload = ET.Element("omf-message")
-        noop = self.Mid(payload,"NOOP")
-        sliceid = self.Mtext(noop,"SLICEID",self._slice_id)
-        expid = self.Mtext(noop,"EXPID",self._exp_id)
-        target = self.Mtext(noop,"TARGET",target)
+        noop = self._id_element(payload,"NOOP")
+        sliceid = self._attr_element(noop,"SLICEID",self._slice_id)
+        expid = self._attr_element(noop,"EXPID",self._exp_id)
+        target = self._attr_element(noop,"TARGET",target)
         return payload
 
-    def newexpfunction(self, experimentid, address):
+    def newexp_function(self, experimentid, address):
+        """ Build a NewExp Message
+
+        :param experimentid: Id of the new experiment
+        :type experimentid: str
+        :param address: Adress of the destination set of nodes
+        :type address: str
+
+        """
         payload = ET.Element("omf-message")
-        newexp = self.Mid(payload,"EXPERIMENT_NEW")
-        experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid)
-        sliceid = self.Mtext(newexp,"SLICEID",self._slice_id)
-        expid = self.Mtext(newexp,"EXPID",self._exp_id)
-        address = self.Mtext(newexp,"ADDRESS",address)
+        newexp = self._id_element(payload,"EXPERIMENT_NEW")
+        experimentid = self._attr_element(newexp,"EXPERIMENT_ID",experimentid)
+        sliceid = self._attr_element(newexp,"SLICEID",self._slice_id)
+        expid = self._attr_element(newexp,"EXPID",self._exp_id)
+        address = self._attr_element(newexp,"ADDRESS",address)
         return payload
 
-    def handle_message(self, msg):
-        # Do something!!!
-        return msg
index becc400..5bcdcdf 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 from neco.execution.resource import ResourceManager, clsinit
-from neco.execution.attribute import Attribute
+from neco.execution.attribute import Attribute, Flags 
 
 from neco.resources.omf.omf_api import OMFAPIFactory
 
@@ -35,21 +35,17 @@ class OMFNode(ResourceManager):
         hostname = Attribute("hostname", "Hostname of the machine")
         cpu = Attribute("cpu", "CPU of the node")
         ram = Attribute("ram", "RAM of the node")
-        # XXX: flags = "0x02" is not human readable.
-        # instead:
-        # from neco.execution.attribute import Attribute, Flags 
-        # xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
-        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
-        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
-        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
-        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential)
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential)
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential)
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential)
         cls._register_attribute(hostname)
         cls._register_attribute(ram)
         cls._register_attribute(cpu)
         cls._register_attribute(xmppSlice)
         cls._register_attribute(xmppHost)
         cls._register_attribute(xmppPort)
-        ls._register_attribute(xmppPassword)
+        cls._register_attribute(xmppPassword)
 
     @classmethod
     def _register_filters(cls):
@@ -67,9 +63,7 @@ class OMFNode(ResourceManager):
 
     # XXX: We don't necessary need to have the credentials at the 
     # moment we create the RM
-    # THE OMF API SHOULD BE CREATED ON THE DEPLOY METHOD, NOT NOW
-    # THIS FORCES MORE CONSTRAINES ON THE WAY WE WILL AUTHOMATE DEPLOYMENT!
-    def __init__(self, ec, guid, creds):
+    def __init__(self, ec, guid):
         """
         :param ec: The Experiment controller
         :type ec: ExperimentController
@@ -80,13 +74,8 @@ class OMFNode(ResourceManager):
 
         """
         super(OMFNode, self).__init__(ec, guid)
-        self.set('xmppSlice', creds['xmppSlice'])
-        self.set('xmppHost', creds['xmppHost'])
-        self.set('xmppPort', creds['xmppPort'])
-        self.set('xmppPassword', creds['xmppPassword'])
 
-        # XXX: Lines should not be more than 80 characters!
-        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        self._omf_api = None 
 
         self._logger = logging.getLogger("neco.omf.omfNode   ")
 
@@ -101,20 +90,30 @@ class OMFNode(ResourceManager):
         :rtype:  Boolean
 
         """
-        rm = self.ec.resource(guid)
+        rm = self.ec.get_resource(guid)
         if rm.rtype() in self._authorized_connections:
-            self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+            self._logger.debug("Connection between %s %s and %s %s accepted" %
+                (self.rtype(), self._guid, rm.rtype(), guid))
             return True
-        self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+        self._logger.debug("Connection between %s %s and %s %s refused" %
+            (self.rtype(), self._guid, rm.rtype(), guid))
         return False
 
+    def deploy(self):
+        """Deploy the RM
+
+        """ 
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        super(OMFNode, self).deploy()
+
     def discover(self):
         """ Discover the availables nodes
 
         """
         pass
      
-    def provision(self, credential):
+    def provision(self):
         """ Provision some availables nodes
 
         """
@@ -124,13 +123,23 @@ class OMFNode(ResourceManager):
         """Send Xmpp Message Using OMF protocol to enroll the node into the experiment
 
         """
+        super(OMFNode, self).start()
         self._omf_api.enroll_host(self.get('hostname'))
 
     def stop(self):
         """Send Xmpp Message Using OMF protocol to disconnect the node
 
         """
-        self._omf_api.disconnect()
+        super(OMFNode, self).stop()
+
+    def release(self):
+        """Clean the RM at the end of the experiment
+
+        """
+        self._omf_api.release(self.get('hostname'))
+        OMFAPIFactory.release_api(self.get('xmppSlice'), 
+            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
 
     def configure(self):
         #routes = self.tc._add_route.get(self.guid, [])
index 316e584..9c990c4 100755 (executable)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-from neco.execution.resource import ResourceFactory
+from neco.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
 from neco.execution.ec import ExperimentController
 
 from neco.resources.omf.omf_node import OMFNode
@@ -20,18 +20,13 @@ logging.basicConfig()
 class DummyEC(ExperimentController):
     pass
 
-class OMFVLCTestCase(unittest.TestCase):
+class DummyRM(ResourceManager):
+    pass
 
-    def setUp(self):
-        #self.guid_generator = guid.GuidGenerator()
-        self._creds = {'xmppSlice' : 'nepi' , 'xmppHost' : 'xmpp-plexus.onelab.eu' , 'xmppPort' : '5222', 'xmppPassword' : '1234'  }
 
-    def tearDown(self):
-        pass
+class OMFResourceFactoryTestCase(unittest.TestCase):
 
     def test_creation_phase(self):
-        ec = DummyEC()
-
         ResourceFactory.register_type(OMFNode)
         ResourceFactory.register_type(OMFWifiInterface)
         ResourceFactory.register_type(OMFChannel)
@@ -51,104 +46,149 @@ class OMFVLCTestCase(unittest.TestCase):
 
         self.assertEquals(len(ResourceFactory.resource_types()), 4)
 
-    #def xtest_creation_and_configuration_node(self):
-        guid = ec.register_resource("OMFNode", creds =  self._creds)
-        node1 = ec._resources[guid]
-        node1.set('hostname', 'omf.plexus.wlab17')
-
-        guid = ec.register_resource("OMFNode", creds =  self._creds)
-        node2 = ec._resources[guid]
-        node2.set('hostname', "omf.plexus.wlab37")
-
-    #def xtest_creation_and_configuration_interface(self):
-        guid = ec.register_resource("OMFWifiInterface", creds =  self._creds)
-        iface1 = ec._resources[guid]
-        iface1.set('alias', "w0")
-        iface1.set('mode', "adhoc")
-        iface1.set('type', "g")
-        iface1.set('essid', "helloworld")
-        iface1.set('ip', "10.0.0.17")
-
-        guid = ec.register_resource("OMFWifiInterface", creds =  self._creds)
-        iface2 = ec._resources[guid]
-        iface2.set('alias', "w0")
-        iface2.set('mode', "adhoc")
-        iface2.set('type', 'g')
-        iface2.set('essid', "helloworld")
-        iface2.set('ip', "10.0.0.37")  
-
-    #def xtest_creation_and_configuration_channel(self):
-        guid = ec.register_resource("OMFChannel", creds =  self._creds)
-        channel = ec._resources[guid]
-        channel.set('channel', "6")
-
-    #def xtest_creation_and_configuration_application(self):
-        guid = ec.register_resource("OMFApplication", creds =  self._creds)
-        app1 = ec._resources[guid]
-        app1.set('appid', 'Vlc#1')
-        app1.set('path', "/opt/vlc-1.1.13/cvlc")
-        app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
-        app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-
-        guid = ec.register_resource("OMFApplication", creds =  self._creds)
-        app2 = ec._resources[guid]
-        app2.set('appid', 'Vlc#2')
-        app2.set('path', "/opt/vlc-1.1.13/cvlc")
-        app2.set('args', "rtp://10.0.0.37:1234")
-        app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
-        self.assertEquals(len(OMFAPIFactory._Api), 1)   
-
-    #def test_connection(self):
-        app1.connect(node1._guid)
-        node1.connect(app1._guid)
-
-        node1.connect(iface1._guid)
-        iface1.connect(node1._guid)
-
-        iface1.connect(channel._guid)
-        channel.connect(iface1._guid)
-
-        channel.connect(iface2._guid)
-        iface2.connect(channel._guid)
-
-        iface2.connect(node2._guid)
-        node2.connect(iface2._guid)
-
-        node2.connect(app2._guid)
-        app2.connect(node2._guid)
-
-    #def test_start_node(self):
-        node1.start()
-        node2.start()
-        time.sleep(1)
-        #pass
-
-    #def test_start_interface(self):
-        iface1.start()
-        iface2.start()
-
-    #def test_start_channel(self):
-        channel.start()
-        time.sleep(1)
-
-    #def test_start_application(self):
-        app1.start()
-        time.sleep(2)
-        app2.start()
-
-        time.sleep(10)
-    
-    #def test_stop_application(self):
-        app1.stop()
-        app2.stop()
-        time.sleep(2)
-
-
-    #def test_stop_nodes(self):
-        node1.stop()
-        #node2.stop()
+
+class OMFVLCTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.ec = DummyEC()
+        ResourceFactory.register_type(OMFNode)
+        ResourceFactory.register_type(OMFWifiInterface)
+        ResourceFactory.register_type(OMFChannel)
+        ResourceFactory.register_type(OMFApplication)
+
+    def tearDown(self):
+        self.ec.shutdown()
+
+    def test_creation_and_configuration_node(self):
+
+        node1 = self.ec.register_resource("OMFNode")
+        self.ec.set(node1, 'hostname', 'omf.plexus.wlab17')
+        self.ec.set(node1, 'xmppSlice', "nepi")
+        self.ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(node1, 'xmppPort', "5222")
+        self.ec.set(node1, 'xmppPassword', "1234")
+
+        self.assertEquals(self.ec.get(node1, 'hostname'), 'omf.plexus.wlab17')
+        self.assertEquals(self.ec.get(node1, 'xmppSlice'), 'nepi')
+        self.assertEquals(self.ec.get(node1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+        self.assertEquals(self.ec.get(node1, 'xmppPort'), '5222')
+        self.assertEquals(self.ec.get(node1, 'xmppPassword'), '1234')
+
+    def test_creation_and_configuration_interface(self):
+
+        iface1 = self.ec.register_resource("OMFWifiInterface")
+        self.ec.set(iface1, 'alias', "w0")
+        self.ec.set(iface1, 'mode', "adhoc")
+        self.ec.set(iface1, 'type', "g")
+        self.ec.set(iface1, 'essid', "vlcexp")
+        self.ec.set(iface1, 'ip', "10.0.0.17")
+        self.ec.set(iface1, 'xmppSlice', "nepi")
+        self.ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(iface1, 'xmppPort', "5222")
+        self.ec.set(iface1, 'xmppPassword', "1234")
+
+        self.assertEquals(self.ec.get(iface1, 'alias'), 'w0')
+        self.assertEquals(self.ec.get(iface1, 'mode'), 'adhoc')
+        self.assertEquals(self.ec.get(iface1, 'type'), 'g')
+        self.assertEquals(self.ec.get(iface1, 'essid'), 'vlcexp')
+        self.assertEquals(self.ec.get(iface1, 'ip'), '10.0.0.17')
+        self.assertEquals(self.ec.get(iface1, 'xmppSlice'), 'nepi')
+        self.assertEquals(self.ec.get(iface1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+        self.assertEquals(self.ec.get(iface1, 'xmppPort'), '5222')
+        self.assertEquals(self.ec.get(iface1, 'xmppPassword'), '1234')
+
+    def test_creation_and_configuration_channel(self):
+
+        channel = self.ec.register_resource("OMFChannel")
+        self.ec.set(channel, 'channel', "6")
+        self.ec.set(channel, 'xmppSlice', "nepi")
+        self.ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(channel, 'xmppPort', "5222")
+        self.ec.set(channel, 'xmppPassword', "1234")
+
+        self.assertEquals(self.ec.get(channel, 'channel'), '6')
+        self.assertEquals(self.ec.get(channel, 'xmppSlice'), 'nepi')
+        self.assertEquals(self.ec.get(channel, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+        self.assertEquals(self.ec.get(channel, 'xmppPort'), '5222')
+        self.assertEquals(self.ec.get(channel, 'xmppPassword'), '1234')
+
+    def test_creation_and_configuration_application(self):
+
+        app1 = self.ec.register_resource("OMFApplication")
+        self.ec.set(app1, 'appid', 'Vlc#1')
+        self.ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc")
+        self.ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+        self.ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+        self.ec.set(app1, 'xmppSlice', "nepi")
+        self.ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(app1, 'xmppPort', "5222")
+        self.ec.set(app1, 'xmppPassword', "1234")
+
+        self.assertEquals(self.ec.get(app1, 'appid'), 'Vlc#1')
+        self.assertEquals(self.ec.get(app1, 'path'), '/opt/vlc-1.1.13/cvlc')
+        self.assertEquals(self.ec.get(app1, 'args'), "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+        self.assertEquals(self.ec.get(app1, 'env'), 'DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority')
+        self.assertEquals(self.ec.get(app1, 'xmppSlice'), 'nepi')
+        self.assertEquals(self.ec.get(app1, 'xmppHost'), 'xmpp-plexus.onelab.eu')
+        self.assertEquals(self.ec.get(app1, 'xmppPort'), '5222')
+        self.assertEquals(self.ec.get(app1, 'xmppPassword'), '1234')
+
+    def test_connection(self):
+
+        node1 = self.ec.register_resource("OMFNode")
+        iface1 = self.ec.register_resource("OMFWifiInterface")
+        channel = self.ec.register_resource("OMFChannel")
+        app1 = self.ec.register_resource("OMFApplication")
+        app2 = self.ec.register_resource("OMFApplication")
+
+        self.ec.register_connection(app1, node1)
+        self.ec.register_connection(app2, node1)
+        self.ec.register_connection(node1, iface1)
+        self.ec.register_connection(iface1, channel)
+
+        self.assertEquals(len(self.ec.get_resource(node1).connections), 3)
+        self.assertEquals(len(self.ec.get_resource(iface1).connections), 2)
+        self.assertEquals(len(self.ec.get_resource(channel).connections), 1)
+        self.assertEquals(len(self.ec.get_resource(app1).connections), 1)
+        self.assertEquals(len(self.ec.get_resource(app2).connections), 1)
+
+    def test_condition(self):
+
+        node1 = self.ec.register_resource("OMFNode")
+        iface1 = self.ec.register_resource("OMFWifiInterface")
+        channel = self.ec.register_resource("OMFChannel")
+        app1 = self.ec.register_resource("OMFApplication")
+        app2 = self.ec.register_resource("OMFApplication")
+
+        self.ec.register_connection(app1, node1)
+        self.ec.register_connection(app2, node1)
+        self.ec.register_connection(node1, iface1)
+        self.ec.register_connection(iface1, channel)
+
+        # For the moment
+        self.ec.register_condition([iface1, channel], ResourceAction.START, node1, ResourceState.STARTED , 2)
+        self.ec.register_condition(channel, ResourceAction.START, iface1, ResourceState.STARTED , 1)
+        self.ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+
+        # Real test
+        self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
+
+        self.assertEquals(len(self.ec.get_resource(node1).conditions), 0)
+        self.assertEquals(len(self.ec.get_resource(iface1).conditions), 1)
+        self.assertEquals(len(self.ec.get_resource(channel).conditions), 1)
+        self.assertEquals(len(self.ec.get_resource(app1).conditions), 1)
+
+
+    def xtest_deploy(self):
+        ec.deploy()
+
+    #In order to release everythings
+        time.sleep(45)
+        ec.shutdown()
 
 
 if __name__ == '__main__':
     unittest.main()
 
+
+