From 48491bfa520c658632d72dee0ad938cb7409e4e5 Mon Sep 17 00:00:00 2001 From: Julien Tribino Date: Wed, 29 May 2013 15:06:44 +0200 Subject: [PATCH] debug the omf part. Test are still missing --- .../omf/automated_vlc_experiment_plexus.py | 23 ++++------ src/nepi/execution/resource.py | 2 +- src/nepi/resources/omf/application.py | 42 ++++++++----------- src/nepi/resources/omf/channel.py | 20 ++++----- src/nepi/resources/omf/interface.py | 15 ++++--- src/nepi/resources/omf/node.py | 10 +++-- src/nepi/resources/omf/omf_api.py | 1 + src/nepi/resources/omf/omf_client.py | 23 +++++----- test/resources/omf/vlc.py | 12 +++--- 9 files changed, 73 insertions(+), 75 deletions(-) diff --git a/examples/omf/automated_vlc_experiment_plexus.py b/examples/omf/automated_vlc_experiment_plexus.py index 1b65e37e..43750a6f 100644 --- a/examples/omf/automated_vlc_experiment_plexus.py +++ b/examples/omf/automated_vlc_experiment_plexus.py @@ -18,7 +18,7 @@ """ #!/usr/bin/env python -from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState +from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState, populate_factory from nepi.execution.ec import ExperimentController from nepi.resources.omf.node import OMFNode @@ -35,10 +35,7 @@ logging.basicConfig() ec = ExperimentController() # Register the different RM that will be used -ResourceFactory.register_type(OMFNode) -ResourceFactory.register_type(OMFWifiInterface) -ResourceFactory.register_type(OMFChannel) -ResourceFactory.register_type(OMFApplication) +populate_factory() # Create and Configure the Nodes node1 = ec.register_resource("OMFNode") @@ -101,6 +98,7 @@ app1 = ec.register_resource("OMFApplication") ec.set(app1, 'appid', 'Vlc#1') ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc") ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'") +#ec.set(app1, 'args', "--quiet /opt/big_buck_bunny_240p_mpeg4.ts --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts} '") ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") ec.set(app1, 'xmppSlice', "nepi") ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu") @@ -110,7 +108,7 @@ ec.set(app1, 'xmppPassword', "1234") app2 = ec.register_resource("OMFApplication") ec.set(app2, 'appid', 'Vlc#2') ec.set(app2, 'path', "/opt/vlc-1.1.13/cvlc") -ec.set(app2, 'args', "rtp://10.0.0.37:1234") +ec.set(app2, 'args', "--quiet rtp://10.0.0.37:1234") ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") ec.set(app2, 'xmppSlice', "nepi") ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu") @@ -136,20 +134,17 @@ ec.register_connection(iface2, channel) ec.register_connection(node2, iface2) ec.register_connection(app2, node2) -# Condition -# Topology behaviour : It should not be done by the user, but .... -#ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2) -#ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1) -#ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1) - # User Behaviour ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s") -ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "20s") +ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "22s") ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "25s") +ec.register_condition(app3, ResourceAction.STOP, app3, ResourceState.STARTED , "1s") # Deploy ec.deploy() +ec.wait_finished([app1, app2, app3]) + # Stop Experiment -time.sleep(50) +#time.sleep(55) ec.shutdown() diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index a76bbc66..f41b0709 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -531,7 +531,7 @@ class ResourceManager(Logger): callback = functools.partial(self.stop_with_conditions) self.ec.schedule(delay, callback) else: - self.logger.debug(" ----- STOPPING ---- ") + self.debug(" ----- STOPPING ---- ") self.stop() def deploy(self): diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py index f65b3547..9d2aef1a 100644 --- a/src/nepi/resources/omf/application.py +++ b/src/nepi/resources/omf/application.py @@ -17,10 +17,12 @@ """ -from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory +reschedule_delay = "0.5s" + @clsinit class OMFApplication(ResourceManager): """ @@ -40,7 +42,6 @@ class OMFApplication(ResourceManager): """ _rtype = "OMFApplication" _authorized_connections = ["OMFNode"] - _waiters = ["OMFNode", "OMFChannel", "OMFWifiInterface"] @classmethod def _register_attributes(cls): @@ -98,31 +99,17 @@ class OMFApplication(ResourceManager): rm = self.ec.get_resource(guid) if rm.rtype() not in self._authorized_connections: msg = "Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid) - self._logger.debug(msg) + self.debug(msg) return False elif len(self.connections) != 0 : msg = "Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid) - self._logger.debug(msg) + self.debug(msg) return False else : msg = "Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid) - self._logger.debug(msg) + self.debug(msg) return True - def _get_nodes(self, conn_set): - """Get the RM of the node to which the application is connected - - :param conn_set: Connections of the current Guid - :type conn_set: set - :rtype: ResourceManager - """ - - for elt in conn_set: - rm = self.ec.get_resource(elt) - if rm.rtype() == "OMFNode": - return rm - return None - def deploy(self): """Deploy the RM @@ -137,20 +124,27 @@ class OMFApplication(ResourceManager): """ super(OMFApplication, self).start() msg = " " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env') - self.debug(msg) + self.info(msg) if self.get('appid') and self.get('path') and self.get('args') and self.get('env'): - rm_node = self._get_nodes(self._connections) - self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env')) + rm_list = self.get_connected("OMFNode") + for rm_node in rm_list: + self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env')) + else : + msg = "Credentials are not initialized" + self.error(msg) def stop(self): """Send Xmpp Message Using OMF protocol to kill the application """ - rm_node = self._get_nodes(self._connections) - self._omf_api.exit(rm_node.get('hostname'),self.get('appid')) + rm_list = self.get_connected("OMFNode") + for rm_node in rm_list : + self._omf_api.exit(rm_node.get('hostname'),self.get('appid')) super(OMFApplication, self).stop() + self._state = ResourceState.FINISHED + def release(self): """Clean the RM at the end of the experiment diff --git a/src/nepi/resources/omf/channel.py b/src/nepi/resources/omf/channel.py index d81cf6c7..f1173c9f 100644 --- a/src/nepi/resources/omf/channel.py +++ b/src/nepi/resources/omf/channel.py @@ -17,13 +17,12 @@ """ -from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory -import nepi -import logging +reschedule_delay = "0.5s" @clsinit class OMFChannel(ResourceManager): @@ -44,7 +43,6 @@ class OMFChannel(ResourceManager): """ _rtype = "OMFChannel" _authorized_connections = ["OMFWifiInterface", "OMFNode"] - _waiters = ["OMFNode", "OMFWifiInterface"] @classmethod @@ -78,9 +76,6 @@ class OMFChannel(ResourceManager): self._omf_api = None - self._logger = logging.getLogger("nepi.omf.omfChannel") - self._logger.setLevel(nepi.LOGLEVEL) - def _validate_connection(self, guid): """Check if the connection is available. @@ -113,6 +108,8 @@ class OMFChannel(ResourceManager): for conn in rm_iface.connections: rm_node = self.ec.get_resource(conn) if rm_node.rtype() == "OMFNode": + if rm_iface.state < ResourceState.READY or rm_node.state < ResourceState.READY: + return "reschedule" couple = [rm_node.get('hostname'), rm_iface.get('alias')] #print couple self._nodes_guid.append(couple) @@ -122,17 +119,20 @@ class OMFChannel(ResourceManager): """Deploy the RM """ - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + if not self._omf_api : + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) if self.get('channel'): set_nodes = self._get_target(self._connections) + if set_nodes == "reschedule" : + self.ec.schedule(reschedule_delay, self.deploy) + return print set_nodes for couple in set_nodes: #print "Couple node/alias : " + couple[0] + " , " + couple[1] attrval = self.get('channel') attrname = "net/%s/%s" % (couple[1], 'channel') - #print "Send the configure message" self._omf_api.configure(couple[0], attrname, attrval) super(OMFChannel, self).deploy() diff --git a/src/nepi/resources/omf/interface.py b/src/nepi/resources/omf/interface.py index 1bb539c2..a512c81b 100644 --- a/src/nepi/resources/omf/interface.py +++ b/src/nepi/resources/omf/interface.py @@ -17,11 +17,12 @@ """ -from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory +reschedule_delay = "0.5s" @clsinit class OMFWifiInterface(ResourceManager): @@ -42,7 +43,6 @@ class OMFWifiInterface(ResourceManager): """ _rtype = "OMFWifiInterface" _authorized_connections = ["OMFNode" , "OMFChannel"] - _waiters = ["OMFNode"] #alias2name = dict({'w0':'wlan0', 'w1':'wlan1'}) @@ -120,15 +120,20 @@ class OMFWifiInterface(ResourceManager): """Deploy the RM """ - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + if not self._omf_api : + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip')) #try: if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'): - rm_node = self._get_nodes(self._connections) + rm_list = self.get_connected("OMFNode") + for rm_node in rm_list: + if rm_node.state < ResourceState.READY: + self.ec.schedule(reschedule_delay, self.deploy) + return for attrname in ["mode", "type", "essid", "ip"]: attrval = self.get(attrname) attrname = "net/%s/%s" % (self._alias, attrname) diff --git a/src/nepi/resources/omf/node.py b/src/nepi/resources/omf/node.py index d4da2c1e..bd88c397 100644 --- a/src/nepi/resources/omf/node.py +++ b/src/nepi/resources/omf/node.py @@ -17,13 +17,15 @@ """ -from nepi.execution.resource import ResourceManager, clsinit +from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_api import OMFAPIFactory import time +reschedule_delay = "0.5s" + @clsinit class OMFNode(ResourceManager): """ @@ -43,7 +45,6 @@ class OMFNode(ResourceManager): """ _rtype = "OMFNode" _authorized_connections = ["OMFApplication" , "OMFWifiInterface"] - _waiters = [] @classmethod def _register_attributes(cls): @@ -123,8 +124,9 @@ class OMFNode(ResourceManager): """Deploy the RM """ - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + if not self._omf_api : + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) self._omf_api.enroll_host(self.get('hostname')) super(OMFNode, self).deploy() diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf_api.py index df4fe034..04d78395 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf_api.py @@ -319,6 +319,7 @@ class OMFAPIFactory(object): key = cls._make_key(slice, host, port, password) cls.lock.acquire() if key in cls._apis: + #print "Api Counter : " + str(cls._apis[key]['cnt']) cls._apis[key]['cnt'] += 1 cls.lock.release() return cls._apis[key]['api'] diff --git a/src/nepi/resources/omf/omf_client.py b/src/nepi/resources/omf/omf_client.py index f20132d0..829603a3 100644 --- a/src/nepi/resources/omf/omf_client.py +++ b/src/nepi/resources/omf/omf_client.py @@ -102,7 +102,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): self._registered = True except IqError as e: msg = " Could not register account: %s" % e.iq['error']['text'] - selferror(msg) + self.error(msg) except IqTimeout: msg = " No response from server." self.error(msg) @@ -131,7 +131,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): result = self['xep_0060'].get_nodes(self._server) for item in result['disco_items']['items']: msg = ' - %s' % str(item) - self.info(msg) + self.debug(msg) return result except: error = traceback.format_exc() @@ -147,7 +147,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): #self.boundjid.full) for node in result['node']: msg = ' - %s' % str(node) - self.info(msg) + self.debug(msg) return result except: error = traceback.format_exc() @@ -161,7 +161,8 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): :type node: str """ - self.debug(" Create Topic : " + node) + msg = " Create Topic : " + node + self.info(msg) config = self['xep_0004'].makeForm('submit') config.add_field(var='pubsub#node_type', value='leaf') @@ -208,7 +209,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): """ msg = " Publish to Topic : " + node - self.debug(msg) + self.info(msg) try: result = self['xep_0060'].publish(self._server,node,payload=data) # id = result['pubsub']['publish']['item']['id'] @@ -231,7 +232,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): data) for item in result['pubsub']['items']['substanzas']: msg = 'Retrieved item %s: %s' % (item['id'], tostring(item['payload'])) - self.info(msg) + self.debug(msg) except: error = traceback.format_exc() msg = ' Could not retrieve item %s from topic %s\ntraceback:\n%s' \ @@ -248,7 +249,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): try: result = self['xep_0060'].retract(self._server, self.boundjid, data) msg = ' Retracted item %s from topic %s' % (data, self.boundjid) - self.info(msg) + self.debug(msg) except: error = traceback.format_exc() msg = 'Could not retract item %s from topic %s\ntraceback:\n%s' \ @@ -262,7 +263,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): try: result = self['xep_0060'].purge(self._server, self.boundjid) msg = ' Purged all items from topic %s' % self.boundjid - self.info(msg) + self.debug(msg) except: error = traceback.format_exc() msg = ' Could not purge items from topic %s\ntraceback:\n%s' \ @@ -280,8 +281,8 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): result = self['xep_0060'].subscribe(self._server, node) msg = ' Subscribed %s to topic %s' \ % (self.boundjid.user, node) - self.info(msg) - #self.debug(msg) + #self.info(msg) + self.debug(msg) except: error = traceback.format_exc() msg = ' Could not subscribe %s to topic %s\ntraceback:\n%s' \ @@ -298,7 +299,7 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): try: result = self['xep_0060'].unsubscribe(self._server, node) msg = ' Unsubscribed %s from topic %s' % (self.boundjid.bare, node) - self.info(msg) + self.debug(msg) except: error = traceback.format_exc() msg = ' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \ diff --git a/test/resources/omf/vlc.py b/test/resources/omf/vlc.py index df6af6ce..e1ed3e88 100755 --- a/test/resources/omf/vlc.py +++ b/test/resources/omf/vlc.py @@ -21,10 +21,10 @@ from nepi.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState from nepi.execution.ec import ExperimentController -from nepi.resources.omf.omf_node import OMFNode -from nepi.resources.omf.omf_application import OMFApplication -from nepi.resources.omf.omf_interface import OMFWifiInterface -from nepi.resources.omf.omf_channel import OMFChannel +from nepi.resources.omf.node import OMFNode +from nepi.resources.omf.application import OMFApplication +from nepi.resources.omf.interface import OMFWifiInterface +from nepi.resources.omf.channel import OMFChannel from nepi.resources.omf.omf_api import OMFAPIFactory from nepi.util import guid @@ -257,10 +257,10 @@ class OMFVLCTestCase(unittest.TestCase): self.ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "2s") self.ec.register_condition(app4, ResourceAction.START, app3, ResourceState.STARTED , "3s") self.ec.register_condition(app5, ResourceAction.START, [app3, app2], ResourceState.STARTED , "2s") - self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "1m20s") + self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "25s") self.ec.deploy() - time.sleep(150) + time.sleep(60) self.assertEquals(round(strfdiff(self.ec.get_resource(app2).start_time, self.ec.get_resource(app1).start_time),1), 3.0) self.assertEquals(round(strfdiff(self.ec.get_resource(app3).start_time, self.ec.get_resource(app2).start_time),1), 2.0) -- 2.47.0