debug the omf part. Test are still missing
authorJulien Tribino <julien.tribino@inria.fr>
Wed, 29 May 2013 13:06:44 +0000 (15:06 +0200)
committerJulien Tribino <julien.tribino@inria.fr>
Wed, 29 May 2013 13:06:44 +0000 (15:06 +0200)
examples/omf/automated_vlc_experiment_plexus.py
src/nepi/execution/resource.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf_api.py
src/nepi/resources/omf/omf_client.py
test/resources/omf/vlc.py

index 1b65e37..43750a6 100644 (file)
@@ -18,7 +18,7 @@
 """
 
 #!/usr/bin/env python
-from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
+from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState, populate_factory
 from nepi.execution.ec import ExperimentController
 
 from nepi.resources.omf.node import OMFNode
@@ -35,10 +35,7 @@ logging.basicConfig()
 ec = ExperimentController()
 
 # Register the different RM that will be used
-ResourceFactory.register_type(OMFNode)
-ResourceFactory.register_type(OMFWifiInterface)
-ResourceFactory.register_type(OMFChannel)
-ResourceFactory.register_type(OMFApplication)
+populate_factory()
 
 # Create and Configure the Nodes
 node1 = ec.register_resource("OMFNode")
@@ -101,6 +98,7 @@ app1 = ec.register_resource("OMFApplication")
 ec.set(app1, 'appid', 'Vlc#1')
 ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc")
 ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+#ec.set(app1, 'args', "--quiet /opt/big_buck_bunny_240p_mpeg4.ts --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts} '")
 ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
 ec.set(app1, 'xmppSlice', "nepi")
 ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu")
@@ -110,7 +108,7 @@ ec.set(app1, 'xmppPassword', "1234")
 app2 = ec.register_resource("OMFApplication")
 ec.set(app2, 'appid', 'Vlc#2')
 ec.set(app2, 'path', "/opt/vlc-1.1.13/cvlc")
-ec.set(app2, 'args', "rtp://10.0.0.37:1234")
+ec.set(app2, 'args', "--quiet rtp://10.0.0.37:1234")
 ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
 ec.set(app2, 'xmppSlice', "nepi")
 ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu")
@@ -136,20 +134,17 @@ ec.register_connection(iface2, channel)
 ec.register_connection(node2, iface2)
 ec.register_connection(app2, node2)
 
-# Condition
-#      Topology behaviour : It should not be done by the user, but ....
-#ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2)
-#ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1)
-#ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1)
-
 #      User Behaviour
 ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s")
-ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "20s")
+ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "22s")
 ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "25s")
+ec.register_condition(app3, ResourceAction.STOP, app3, ResourceState.STARTED , "1s")
 
 # Deploy
 ec.deploy()
 
+ec.wait_finished([app1, app2, app3])
+
 # Stop Experiment
-time.sleep(50)
+#time.sleep(55)
 ec.shutdown()
index a76bbc6..f41b070 100644 (file)
@@ -531,7 +531,7 @@ class ResourceManager(Logger):
             callback = functools.partial(self.stop_with_conditions)
             self.ec.schedule(delay, callback)
         else:
-            self.logger.debug(" ----- STOPPING ---- ") 
+            self.debug(" ----- STOPPING ---- ") 
             self.stop()
 
     def deploy(self):
index f65b354..9d2aef1 100644 (file)
 
 """
 
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
+reschedule_delay = "0.5s"
+
 @clsinit
 class OMFApplication(ResourceManager):
     """
@@ -40,7 +42,6 @@ class OMFApplication(ResourceManager):
     """
     _rtype = "OMFApplication"
     _authorized_connections = ["OMFNode"]
-    _waiters = ["OMFNode", "OMFChannel", "OMFWifiInterface"]
 
     @classmethod
     def _register_attributes(cls):
@@ -98,31 +99,17 @@ class OMFApplication(ResourceManager):
         rm = self.ec.get_resource(guid)
         if rm.rtype() not in self._authorized_connections:
             msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid)
-            self._logger.debug(msg)
+            self.debug(msg)
             return False
         elif len(self.connections) != 0 :
             msg = "Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid)
-            self._logger.debug(msg)
+            self.debug(msg)
             return False
         else :
             msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)
-            self._logger.debug(msg)
+            self.debug(msg)
             return True
 
-    def _get_nodes(self, conn_set):
-        """Get the RM of the node to which the application is connected
-
-        :param conn_set: Connections of the current Guid
-        :type conn_set: set
-        :rtype: ResourceManager
-        """
-
-        for elt in conn_set:
-            rm = self.ec.get_resource(elt)
-            if rm.rtype() == "OMFNode":
-                return rm
-        return None
-
     def deploy(self):
         """Deploy the RM
 
@@ -137,20 +124,27 @@ class OMFApplication(ResourceManager):
         """
         super(OMFApplication, self).start()
         msg = " " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env')
-        self.debug(msg)
+        self.info(msg)
 
         if self.get('appid') and self.get('path') and self.get('args') and self.get('env'):
-            rm_node = self._get_nodes(self._connections)
-            self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
+            rm_list = self.get_connected("OMFNode")
+            for rm_node in rm_list:
+                self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
+        else :
+            msg = "Credentials are not initialized"
+            self.error(msg)
 
     def stop(self):
         """Send Xmpp Message Using OMF protocol to kill the application
 
         """
 
-        rm_node = self._get_nodes(self._connections)
-        self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
+        rm_list = self.get_connected("OMFNode")
+        for rm_node in rm_list :
+            self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
         super(OMFApplication, self).stop()
+        self._state = ResourceState.FINISHED
+        
 
     def release(self):
         """Clean the RM at the end of the experiment
index d81cf6c..f1173c9 100644 (file)
 
 """
 
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
-import nepi
-import logging
+reschedule_delay = "0.5s"
 
 @clsinit
 class OMFChannel(ResourceManager):
@@ -44,7 +43,6 @@ class OMFChannel(ResourceManager):
     """
     _rtype = "OMFChannel"
     _authorized_connections = ["OMFWifiInterface", "OMFNode"]
-    _waiters = ["OMFNode", "OMFWifiInterface"]
 
 
     @classmethod
@@ -78,9 +76,6 @@ class OMFChannel(ResourceManager):
 
         self._omf_api = None
 
-        self._logger = logging.getLogger("nepi.omf.omfChannel")
-        self._logger.setLevel(nepi.LOGLEVEL)
-
     def _validate_connection(self, guid):
         """Check if the connection is available.
 
@@ -113,6 +108,8 @@ class OMFChannel(ResourceManager):
             for conn in rm_iface.connections:
                 rm_node = self.ec.get_resource(conn)
                 if rm_node.rtype() == "OMFNode":
+                    if rm_iface.state < ResourceState.READY or  rm_node.state < ResourceState.READY:
+                        return "reschedule"
                     couple = [rm_node.get('hostname'), rm_iface.get('alias')]
                     #print couple
                     self._nodes_guid.append(couple)
@@ -122,17 +119,20 @@ class OMFChannel(ResourceManager):
         """Deploy the RM
 
         """
-        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        if not self._omf_api :
+            self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
+                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
 
         if self.get('channel'):
             set_nodes = self._get_target(self._connections) 
+            if set_nodes == "reschedule" :
+                self.ec.schedule(reschedule_delay, self.deploy)
+                return
             print set_nodes
             for couple in set_nodes:
                 #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
                 attrval = self.get('channel')
                 attrname = "net/%s/%s" % (couple[1], 'channel')
-                #print "Send the configure message"
                 self._omf_api.configure(couple[0], attrname, attrval)
 
         super(OMFChannel, self).deploy()
index 1bb539c..a512c81 100644 (file)
 
 """
 
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
+reschedule_delay = "0.5s"
 
 @clsinit
 class OMFWifiInterface(ResourceManager):
@@ -42,7 +43,6 @@ class OMFWifiInterface(ResourceManager):
     """
     _rtype = "OMFWifiInterface"
     _authorized_connections = ["OMFNode" , "OMFChannel"]
-    _waiters = ["OMFNode"]
 
     #alias2name = dict({'w0':'wlan0', 'w1':'wlan1'})
 
@@ -120,15 +120,20 @@ class OMFWifiInterface(ResourceManager):
         """Deploy the RM
 
         """
-        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        if not self._omf_api :
+            self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
+                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
 
         self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " +
             self.get('mode') + " : " + self.get('type') + " : " +
             self.get('essid') + " : " + self.get('ip'))
         #try:
         if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'):
-            rm_node = self._get_nodes(self._connections)    
+            rm_list = self.get_connected("OMFNode") 
+            for rm_node in rm_list:
+                if rm_node.state < ResourceState.READY:
+                    self.ec.schedule(reschedule_delay, self.deploy)
+                    return 
             for attrname in ["mode", "type", "essid", "ip"]:
                 attrval = self.get(attrname)
                 attrname = "net/%s/%s" % (self._alias, attrname)
index d4da2c1..bd88c39 100644 (file)
 
 """
 
-from nepi.execution.resource import ResourceManager, clsinit
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
 import time
 
+reschedule_delay = "0.5s"
+
 @clsinit
 class OMFNode(ResourceManager):
     """
@@ -43,7 +45,6 @@ class OMFNode(ResourceManager):
     """
     _rtype = "OMFNode"
     _authorized_connections = ["OMFApplication" , "OMFWifiInterface"]
-    _waiters = []
 
     @classmethod
     def _register_attributes(cls):
@@ -123,8 +124,9 @@ class OMFNode(ResourceManager):
         """Deploy the RM
 
         """ 
-        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
-            self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        if not self._omf_api :
+            self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
+                self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
         self._omf_api.enroll_host(self.get('hostname'))
 
         super(OMFNode, self).deploy()
index df4fe03..04d7839 100644 (file)
@@ -319,6 +319,7 @@ class OMFAPIFactory(object):
             key = cls._make_key(slice, host, port, password)
             cls.lock.acquire()
             if key in cls._apis:
+                #print "Api Counter : " + str(cls._apis[key]['cnt'])
                 cls._apis[key]['cnt'] += 1
                 cls.lock.release()
                 return cls._apis[key]['api']
index f20132d..829603a 100644 (file)
@@ -102,7 +102,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
             self._registered = True
         except IqError as e:
             msg = " Could not register account: %s" % e.iq['error']['text']
-            selferror(msg)
+            self.error(msg)
         except IqTimeout:
             msg = " No response from server."
             self.error(msg)
@@ -131,7 +131,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
             result = self['xep_0060'].get_nodes(self._server)
             for item in result['disco_items']['items']:
                 msg = ' - %s' % str(item)
-                self.info(msg)
+                self.debug(msg)
             return result
         except:
             error = traceback.format_exc()
@@ -147,7 +147,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
                 #self.boundjid.full)
             for node in result['node']:
                 msg = ' - %s' % str(node)
-                self.info(msg)
+                self.debug(msg)
             return result
         except:
             error = traceback.format_exc()
@@ -161,7 +161,8 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         :type node: str
 
         """
-        self.debug(" Create Topic : " + node)
+        msg = " Create Topic : " + node
+        self.info(msg)
    
         config = self['xep_0004'].makeForm('submit')
         config.add_field(var='pubsub#node_type', value='leaf')
@@ -208,7 +209,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         """ 
 
         msg = " Publish to Topic : " + node
-        self.debug(msg)
+        self.info(msg)
         try:
             result = self['xep_0060'].publish(self._server,node,payload=data)
             # id = result['pubsub']['publish']['item']['id']
@@ -231,7 +232,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
                 data)
             for item in result['pubsub']['items']['substanzas']:
                 msg = 'Retrieved item %s: %s' % (item['id'], tostring(item['payload']))
-                self.info(msg)
+                self.debug(msg)
         except:
             error = traceback.format_exc()
             msg = ' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
@@ -248,7 +249,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         try:
             result = self['xep_0060'].retract(self._server, self.boundjid, data)
             msg = ' Retracted item %s from topic %s' % (data, self.boundjid)
-            self.info(msg)
+            self.debug(msg)
         except:
             error = traceback.format_exc()
             msg = 'Could not retract item %s from topic %s\ntraceback:\n%s' \
@@ -262,7 +263,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         try:
             result = self['xep_0060'].purge(self._server, self.boundjid)
             msg = ' Purged all items from topic %s' % self.boundjid
-            self.info(msg)
+            self.debug(msg)
         except:
             error = traceback.format_exc()
             msg = ' Could not purge items from topic %s\ntraceback:\n%s' \
@@ -280,8 +281,8 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
             result = self['xep_0060'].subscribe(self._server, node)
             msg = ' Subscribed %s to topic %s' \
                     % (self.boundjid.user, node)
-            self.info(msg)
-            #self.debug(msg)
+            #self.info(msg)
+            self.debug(msg)
         except:
             error = traceback.format_exc()
             msg = ' Could not subscribe %s to topic %s\ntraceback:\n%s' \
@@ -298,7 +299,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         try:
             result = self['xep_0060'].unsubscribe(self._server, node)
             msg = ' Unsubscribed %s from topic %s' % (self.boundjid.bare, node)
-            self.info(msg)
+            self.debug(msg)
         except:
             error = traceback.format_exc()
             msg = ' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
index df6af6c..e1ed3e8 100755 (executable)
 from nepi.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState
 from nepi.execution.ec import ExperimentController
 
-from nepi.resources.omf.omf_node import OMFNode
-from nepi.resources.omf.omf_application import OMFApplication
-from nepi.resources.omf.omf_interface import OMFWifiInterface
-from nepi.resources.omf.omf_channel import OMFChannel
+from nepi.resources.omf.node import OMFNode
+from nepi.resources.omf.application import OMFApplication
+from nepi.resources.omf.interface import OMFWifiInterface
+from nepi.resources.omf.channel import OMFChannel
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
 from nepi.util import guid
@@ -257,10 +257,10 @@ class OMFVLCTestCase(unittest.TestCase):
         self.ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "2s")
         self.ec.register_condition(app4, ResourceAction.START, app3, ResourceState.STARTED , "3s")
         self.ec.register_condition(app5, ResourceAction.START, [app3, app2], ResourceState.STARTED , "2s")
-        self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "1m20s")
+        self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "25s")
 
         self.ec.deploy()
-        time.sleep(150)
+        time.sleep(60)
 
         self.assertEquals(round(strfdiff(self.ec.get_resource(app2).start_time, self.ec.get_resource(app1).start_time),1), 3.0)
         self.assertEquals(round(strfdiff(self.ec.get_resource(app3).start_time, self.ec.get_resource(app2).start_time),1), 2.0)