little changes
authorJulien Tribino <julien.tribino@inria.fr>
Wed, 24 Apr 2013 13:05:31 +0000 (15:05 +0200)
committerJulien Tribino <julien.tribino@inria.fr>
Wed, 24 Apr 2013 13:05:31 +0000 (15:05 +0200)
examples/automated_vlc_experiment_plexus.py
src/neco/execution/resource.py
src/neco/resources/omf/omf_application.py
src/neco/resources/omf/omf_channel.py
src/neco/resources/omf/omf_interface.py
src/neco/resources/omf/omf_node.py
src/neco/util/timefuncs.py
test/resources/omf/omf_vlc_exp.py

index 2337531..1cae09a 100644 (file)
@@ -111,18 +111,18 @@ ec.register_connection(app2, node2)
 
 # Condition
 #      Topology behaviour : It should not be done by the user, but ....
-ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
-ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
-ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+#ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
+#ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
+#ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
 
 #      User Behaviour
-ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
-ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , 20)
-ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , 25)
+ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s")
+ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "20s")
+ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "25s")
 
 # Deploy
 ec.deploy()
 
 # Stop Experiment
-time.sleep(45)
+time.sleep(50)
 ec.shutdown()
index 8d0ce3e..ad9a5c5 100644 (file)
@@ -1,5 +1,5 @@
 
-from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid
 
 import copy
 import functools
@@ -10,8 +10,9 @@ import time as TIME
 _reschedule_delay = "1s"
 
 class ResourceAction:
-    START = 0
-    STOP = 1
+    DEPLOYED = 0
+    START = 1
+    STOP = 2
 
 class ResourceState:
     NEW = 0
@@ -31,6 +32,7 @@ class ResourceManager(object):
     _rtype = "Resource"
     _filters = None
     _attributes = None
+    _waiters = []
 
     @classmethod
     def _register_filter(cls, attr):
@@ -84,6 +86,10 @@ class ResourceManager(object):
     def rtype(cls):
         return cls._rtype
 
+    @classmethod
+    def waiters(cls):
+        return cls._waiters
+
     @classmethod
     def get_filters(cls):
         """ Returns a copy of the filters
@@ -146,6 +152,10 @@ class ResourceManager(object):
     def stop_time(self):
         return self._stop_time
 
+    @property
+    def deploy_time(self):
+        return self._deploy_time
+
     @property
     def state(self):
         return self._state
@@ -236,6 +246,9 @@ class ResourceManager(object):
         :param time: time to wait after the state
         :type time: str
 
+        .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ...
+        If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m".
+        For the moment, 2m30s is not a correct syntax.
 
         """
         reschedule = False
@@ -251,7 +264,9 @@ class ResourceManager(object):
                 break
 
             if time:
-                if state == ResourceState.STARTED:
+                if state == ResourceState.DEPLOYED:
+                    t = rm.deploy_time
+                elif state == ResourceState.STARTED:
                     t = rm.start_time
                 elif state == ResourceState.STOPPED:
                     t = rm.stop_time
@@ -259,10 +274,12 @@ class ResourceManager(object):
                     # Only keep time information for START and STOP
                     break
 
-                d = strfdiff(strfnow(), t) 
-                if d < time:
+                d = strfdiff(strfnow(), t)
+                #print "This is the value of d : " + str(d) + " // With the value of t : " + str(t) + " // With the value of time : " + str(time)
+                wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s"))
+                if wait > 0.001:
                     reschedule = True
-                    delay = "%ds" % (int(time - d) +1)
+                    delay = "%fs" % wait
                     break
         return reschedule, delay
 
@@ -318,7 +335,7 @@ class ResourceManager(object):
             print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  start condition : " + str(self.conditions.items())
             # Need to separate because it could have more that tuple of condition 
             # for the same action.
-            conditions_start = self.conditions.get(ResourceAction.START, [])
+            conditions_start = self.conditions.get(ResourceAction.START, [])
             for (group, state, time) in conditions_start:
                 reschedule, delay = self._needs_reschedule(group, state, time)
                 if reschedule:
@@ -346,7 +363,7 @@ class ResourceManager(object):
         if self.state != ResourceState.STARTED:
             reschedule = True
         else:
-            #print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
+            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\
  (Guid : "+ str(self.guid) +")  ----  stop condition : " + str(self.conditions.items())
             conditions_stop = self.conditions.get(ResourceAction.STOP, []) 
             for (group, state, time) in conditions_stop:
@@ -357,16 +374,66 @@ class ResourceManager(object):
             callback = functools.partial(self.stop_with_conditions)
             self.ec.schedule(delay, callback)
         else:
+            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
+------------------------------------------------------------------------------\
+----------------------------------------------------------------  STOPPING -- "
             self.stop()
 
     def deploy(self):
         """Execute all the differents steps required to reach the state DEPLOYED
 
         """
+        self.deploy_restriction()
         self.discover()
         self.provision()
+        self.deploy_with_conditions()
+
+    def deploy_restriction(self):
+        dep = set()
+        for guid in self.connections:
+            if self.ec.get_resource(guid).rtype() in self.__class__._waiters:
+                dep.add(guid)
+        self.register_condition(ResourceAction.DEPLOYED, dep, ResourceState.DEPLOYED)
+
+
+    def deploy_with_conditions(self):
+        """ Starts when all the conditions are reached
+
+        """
+        reschedule = False
+        delay = _reschedule_delay 
+
+        ## evaluate if set conditions are met
+
+        # only can deploy when RM is NEW
+        if not self._state in [ResourceState.NEW]:
+            self.logger.error("Wrong state %s for stop" % self.state)
+            return
+        else:
+            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") -----  deploy condition : " + str(self.conditions.items())
+            # Need to separate because it could have more that tuple of condition 
+            # for the same action.
+            conditions_deployed = self.conditions.get(ResourceAction.DEPLOYED, [])
+            for (group, state, time) in conditions_deployed:
+                reschedule, delay = self._needs_reschedule(group, state, time)
+                if reschedule:
+                    break
+
+        if reschedule:
+            callback = functools.partial(self.deploy_with_conditions)
+            self.ec.schedule(delay, callback)
+        else:
+            print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\
+------------------------------------------------------------------------------\
+----------------------------------------------------------------  DEPLOY -- "
+            self.deploy_action()
+
+    def deploy_action(self):
+
+        self._deploy_time = strfnow()
         self._state = ResourceState.DEPLOYED
 
+
     def release(self):
         """Clean the resource at the end of the Experiment and change the status
 
index 1e64974..8653fa3 100644 (file)
@@ -26,6 +26,7 @@ class OMFApplication(ResourceManager):
     """
     _rtype = "OMFApplication"
     _authorized_connections = ["OMFNode"]
+    _waiters = ["OMFNode", "OMFChannel", "OMFWifiInterface"]
 
     @classmethod
     def _register_attributes(cls):
@@ -109,13 +110,13 @@ class OMFApplication(ResourceManager):
                 return rm
         return None
 
-    def deploy(self):
+    def deploy_action(self):
         """Deploy the RM
 
         """
-        super(OMFApplication, self).deploy()
         self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
             self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        super(OMFApplication, self).deploy_action()
 
     def start(self):
         """Send Xmpp Message Using OMF protocol to execute the application
index 192de27..111af67 100644 (file)
@@ -25,7 +25,9 @@ class OMFChannel(ResourceManager):
 
     """
     _rtype = "OMFChannel"
-    _authorized_connections = ["OMFWifiInterface"]
+    _authorized_connections = ["OMFWifiInterface", "OMFNode"]
+    _waiters = ["OMFNode", "OMFWifiInterface"]
+
 
     @classmethod
     def _register_attributes(cls):
@@ -89,7 +91,7 @@ class OMFChannel(ResourceManager):
         """
         for elt in conn_set:
             rm_iface = self.ec.get_resource(elt)
-            for conn in rm_iface._connections:
+            for conn in rm_iface.connections:
                 rm_node = self.ec.get_resource(conn)
                 if rm_node.rtype() == "OMFNode":
                     couple = [rm_node.get('hostname'), rm_iface.get('alias')]
@@ -97,14 +99,25 @@ class OMFChannel(ResourceManager):
                     self._nodes_guid.append(couple)
         return self._nodes_guid
 
-    def deploy(self):
+    def deploy_action(self):
         """Deploy the RM
 
         """
-        super(OMFChannel, self).deploy()
         self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
             self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
 
+        if self.get('channel'):
+            set_nodes = self._get_target(self._connections) 
+            print set_nodes
+            for couple in set_nodes:
+                #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
+                attrval = self.get('channel')
+                attrname = "net/%s/%s" % (couple[1], 'channel')
+                #print "Send the configure message"
+                self._omf_api.configure(couple[0], attrname, attrval)
+
+        super(OMFChannel, self).deploy_action()
+
     def discover(self):
         """ Discover the availables channels
 
@@ -121,15 +134,7 @@ class OMFChannel(ResourceManager):
         """Send Xmpp Message Using OMF protocol to configure Channel
 
         """
-        if self.get('channel'):
-            set_nodes = self._get_target(self._connections) 
-            print set_nodes
-            for couple in set_nodes:
-                #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
-                attrval = self.get('channel')
-                attrname = "net/%s/%s" % (couple[1], 'channel')
-                #print "Send the configure message"
-                self._omf_api.configure(couple[0], attrname, attrval)
+
         super(OMFChannel, self).start()
 
     def stop(self):
index bbfc045..d436635 100644 (file)
@@ -26,6 +26,7 @@ class OMFWifiInterface(ResourceManager):
     """
     _rtype = "OMFWifiInterface"
     _authorized_connections = ["OMFNode" , "OMFChannel"]
+    _waiters = ["OMFNode"]
 
     #alias2name = dict({'w0':'wlan0', 'w1':'wlan1'})
 
@@ -102,19 +103,13 @@ class OMFWifiInterface(ResourceManager):
                 return rm
         return None
 
-    def deploy(self):
+    def deploy_action(self):
         """Deploy the RM
 
         """
-        super(OMFWifiInterface, self).deploy()
         self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
             self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
 
-
-    def start(self):
-        """Send Xmpp Messages Using OMF protocol to configure Interface
-
-        """
         self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " +
             self.get('mode') + " : " + self.get('type') + " : " +
             self.get('essid') + " : " + self.get('ip'))
@@ -126,6 +121,15 @@ class OMFWifiInterface(ResourceManager):
                 attrname = "net/%s/%s" % (self._alias, attrname)
                 #print "Send the configure message"
                 self._omf_api.configure(rm_node.get('hostname'), attrname, attrval)
+
+        super(OMFWifiInterface, self).deploy_action()
+
+
+    def start(self):
+        """Send Xmpp Messages Using OMF protocol to configure Interface
+
+        """
+
         super(OMFWifiInterface, self).start()
 
     def stop(self):
index 5bcdcdf..d5025a0 100644 (file)
@@ -6,6 +6,7 @@ from neco.resources.omf.omf_api import OMFAPIFactory
 
 import neco
 import logging
+import time
 
 @clsinit
 class OMFNode(ResourceManager):
@@ -26,6 +27,7 @@ class OMFNode(ResourceManager):
     """
     _rtype = "OMFNode"
     _authorized_connections = ["OMFApplication" , "OMFWifiInterface"]
+    _waiters = []
 
     @classmethod
     def _register_attributes(cls):
@@ -99,13 +101,15 @@ class OMFNode(ResourceManager):
             (self.rtype(), self._guid, rm.rtype(), guid))
         return False
 
-    def deploy(self):
+    def deploy_action(self):
         """Deploy the RM
 
         """ 
         self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
             self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
-        super(OMFNode, self).deploy()
+        self._omf_api.enroll_host(self.get('hostname'))
+
+        super(OMFNode, self).deploy_action()
 
     def discover(self):
         """ Discover the availables nodes
@@ -124,7 +128,7 @@ class OMFNode(ResourceManager):
 
         """
         super(OMFNode, self).start()
-        self._omf_api.enroll_host(self.get('hostname'))
+
 
     def stop(self):
         """Send Xmpp Message Using OMF protocol to disconnect the node
index b62c640..0cf4fcd 100644 (file)
@@ -24,7 +24,12 @@ def strfdiff(str1, str2):
     return (ret or 0.001)
 
 def strfvalid(date):
-    """ User defined date to scheduler date """
+    """ User defined date to scheduler date 
+    
+    :param date : user define date matchin the pattern _strf 
+    :type date : date 
+
+    """
     if not date:
         return strfnow()
     if _reabs.match(date):
index 9c990c4..5c37547 100755 (executable)
@@ -9,6 +9,7 @@ from neco.resources.omf.omf_channel import OMFChannel
 from neco.resources.omf.omf_api import OMFAPIFactory
 
 from neco.util import guid
+from neco.util.timefuncs import *
 
 import time
 import unittest
@@ -165,26 +166,93 @@ class OMFVLCTestCase(unittest.TestCase):
         self.ec.register_connection(node1, iface1)
         self.ec.register_connection(iface1, channel)
 
-        # For the moment
-        self.ec.register_condition([iface1, channel], ResourceAction.START, node1, ResourceState.STARTED , 2)
-        self.ec.register_condition(channel, ResourceAction.START, iface1, ResourceState.STARTED , 1)
-        self.ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
+        self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s")
 
-        # Real test
-        self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4)
+        self.assertEquals(len(self.ec.get_resource(app2).conditions), 1)
 
-        self.assertEquals(len(self.ec.get_resource(node1).conditions), 0)
-        self.assertEquals(len(self.ec.get_resource(iface1).conditions), 1)
-        self.assertEquals(len(self.ec.get_resource(channel).conditions), 1)
-        self.assertEquals(len(self.ec.get_resource(app1).conditions), 1)
+    def test_deploy(self):
+        node1 = self.ec.register_resource("OMFNode")
+        self.ec.set(node1, 'hostname', 'omf.plexus.wlab17')
+        self.ec.set(node1, 'xmppSlice', "nepi")
+        self.ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(node1, 'xmppPort', "5222")
+        self.ec.set(node1, 'xmppPassword', "1234")
+        
+        iface1 = self.ec.register_resource("OMFWifiInterface")
+        self.ec.set(iface1, 'alias', "w0")
+        self.ec.set(iface1, 'mode', "adhoc")
+        self.ec.set(iface1, 'type', "g")
+        self.ec.set(iface1, 'essid', "vlcexp")
+        self.ec.set(iface1, 'ip', "10.0.0.17")
+        self.ec.set(iface1, 'xmppSlice', "nepi")
+        self.ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(iface1, 'xmppPort', "5222")
+        self.ec.set(iface1, 'xmppPassword', "1234")
+        
+        channel = self.ec.register_resource("OMFChannel")
+        self.ec.set(channel, 'channel', "6")
+        self.ec.set(channel, 'xmppSlice', "nepi")
+        self.ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(channel, 'xmppPort', "5222")
+        self.ec.set(channel, 'xmppPassword', "1234")
+        
+        app1 = self.ec.register_resource("OMFApplication")
+        self.ec.set(app1, 'xmppSlice', "nepi")
+        self.ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(app1, 'xmppPort', "5222")
+        self.ec.set(app1, 'xmppPassword', "1234")
+
+        app2 = self.ec.register_resource("OMFApplication")
+        self.ec.set(app2, 'xmppSlice', "nepi")
+        self.ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(app2, 'xmppPort', "5222")
+        self.ec.set(app2, 'xmppPassword', "1234")
+
+        app3 = self.ec.register_resource("OMFApplication")
+        self.ec.set(app3, 'xmppSlice', "nepi")
+        self.ec.set(app3, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(app3, 'xmppPort', "5222")
+        self.ec.set(app3, 'xmppPassword', "1234")
+
+        app4 = self.ec.register_resource("OMFApplication")
+        self.ec.set(app4, 'xmppSlice', "nepi")
+        self.ec.set(app4, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(app4, 'xmppPort', "5222")
+        self.ec.set(app4, 'xmppPassword', "1234")
+
+        app5 = self.ec.register_resource("OMFApplication")
+        self.ec.set(app5, 'xmppSlice', "nepi")
+        self.ec.set(app5, 'xmppHost', "xmpp-plexus.onelab.eu")
+        self.ec.set(app5, 'xmppPort', "5222")
+        self.ec.set(app5, 'xmppPassword', "1234")
+
+        self.ec.register_connection(app1, node1)
+        self.ec.register_connection(app2, node1)
+        self.ec.register_connection(app3, node1)
+        self.ec.register_connection(app4, node1)
+        self.ec.register_connection(app5, node1)
+        self.ec.register_connection(node1, iface1)
+        self.ec.register_connection(iface1, channel)
+
+        self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "3s")
+        self.ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "2s")
+        self.ec.register_condition(app4, ResourceAction.START, app3, ResourceState.STARTED , "3s")
+        self.ec.register_condition(app5, ResourceAction.START, [app3, app2], ResourceState.STARTED , "2s")
+        self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "1m20s")
 
+        self.ec.deploy()
+        time.sleep(150)
 
-    def xtest_deploy(self):
-        ec.deploy()
+        self.assertEquals(round(strfdiff(self.ec.get_resource(app2).start_time, self.ec.get_resource(app1).start_time),1), 3.0)
+        self.assertEquals(round(strfdiff(self.ec.get_resource(app3).start_time, self.ec.get_resource(app2).start_time),1), 2.0)
+        self.assertEquals(round(strfdiff(self.ec.get_resource(app4).start_time, self.ec.get_resource(app3).start_time),1), 3.0)
+        self.assertEquals(round(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app3).start_time),1), 2.0)
+        self.assertEquals(round(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app1).start_time),1), 7.0)
 
+        # Precision is at 1/10. So this one returns an error 7.03 != 7.0
+        #self.assertEquals(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app1).start_time), 7)
     #In order to release everythings
-        time.sleep(45)
-        ec.shutdown()
+        time.sleep(5)
 
 
 if __name__ == '__main__':