From: Julien Tribino Date: Wed, 24 Apr 2013 13:05:31 +0000 (+0200) Subject: little changes X-Git-Tag: nepi-3.0.0~122^2~14 X-Git-Url: http://git.onelab.eu/?p=nepi.git;a=commitdiff_plain;h=b12a961a7225625c8112069e6e523e06cc4a181a little changes --- diff --git a/examples/automated_vlc_experiment_plexus.py b/examples/automated_vlc_experiment_plexus.py index 23375311..1cae09a6 100644 --- a/examples/automated_vlc_experiment_plexus.py +++ b/examples/automated_vlc_experiment_plexus.py @@ -111,18 +111,18 @@ ec.register_connection(app2, node2) # Condition # Topology behaviour : It should not be done by the user, but .... -ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2) -ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1) -ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1) +#ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2) +#ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1) +#ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1) # User Behaviour -ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4) -ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , 20) -ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , 25) +ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s") +ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , "20s") +ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "25s") # Deploy ec.deploy() # Stop Experiment -time.sleep(45) +time.sleep(50) ec.shutdown() diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index 8d0ce3e9..ad9a5c56 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -1,5 +1,5 @@ -from neco.util.timefuncs import strfnow, strfdiff, strfvalid +from neco.util.timefuncs import strfnow, strfdiff, strfvalid import copy import functools @@ -10,8 +10,9 @@ import time as TIME _reschedule_delay = "1s" class ResourceAction: - START = 0 - STOP = 1 + DEPLOYED = 0 + START = 1 + STOP = 2 class ResourceState: NEW = 0 @@ -31,6 +32,7 @@ class ResourceManager(object): _rtype = "Resource" _filters = None _attributes = None + _waiters = [] @classmethod def _register_filter(cls, attr): @@ -84,6 +86,10 @@ class ResourceManager(object): def rtype(cls): return cls._rtype + @classmethod + def waiters(cls): + return cls._waiters + @classmethod def get_filters(cls): """ Returns a copy of the filters @@ -146,6 +152,10 @@ class ResourceManager(object): def stop_time(self): return self._stop_time + @property + def deploy_time(self): + return self._deploy_time + @property def state(self): return self._state @@ -236,6 +246,9 @@ class ResourceManager(object): :param time: time to wait after the state :type time: str + .. note : time should be written like "2s" or "3m" with s for seconds, m for minutes, h for hours, ... + If for example, you need to wait 2min 30sec, time could be "150s" or "2.5m". + For the moment, 2m30s is not a correct syntax. """ reschedule = False @@ -251,7 +264,9 @@ class ResourceManager(object): break if time: - if state == ResourceState.STARTED: + if state == ResourceState.DEPLOYED: + t = rm.deploy_time + elif state == ResourceState.STARTED: t = rm.start_time elif state == ResourceState.STOPPED: t = rm.stop_time @@ -259,10 +274,12 @@ class ResourceManager(object): # Only keep time information for START and STOP break - d = strfdiff(strfnow(), t) - if d < time: + d = strfdiff(strfnow(), t) + #print "This is the value of d : " + str(d) + " // With the value of t : " + str(t) + " // With the value of time : " + str(time) + wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s")) + if wait > 0.001: reschedule = True - delay = "%ds" % (int(time - d) +1) + delay = "%fs" % wait break return reschedule, delay @@ -318,7 +335,7 @@ class ResourceManager(object): print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- start condition : " + str(self.conditions.items()) # Need to separate because it could have more that tuple of condition # for the same action. - conditions_start = self.conditions.get(ResourceAction.START, []): + conditions_start = self.conditions.get(ResourceAction.START, []) for (group, state, time) in conditions_start: reschedule, delay = self._needs_reschedule(group, state, time) if reschedule: @@ -346,7 +363,7 @@ class ResourceManager(object): if self.state != ResourceState.STARTED: reschedule = True else: - #print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\ + print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + "\ (Guid : "+ str(self.guid) +") ---- stop condition : " + str(self.conditions.items()) conditions_stop = self.conditions.get(ResourceAction.STOP, []) for (group, state, time) in conditions_stop: @@ -357,16 +374,66 @@ class ResourceManager(object): callback = functools.partial(self.stop_with_conditions) self.ec.schedule(delay, callback) else: + print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\ +------------------------------------------------------------------------------\ +---------------------------------------------------------------- STOPPING -- " self.stop() def deploy(self): """Execute all the differents steps required to reach the state DEPLOYED """ + self.deploy_restriction() self.discover() self.provision() + self.deploy_with_conditions() + + def deploy_restriction(self): + dep = set() + for guid in self.connections: + if self.ec.get_resource(guid).rtype() in self.__class__._waiters: + dep.add(guid) + self.register_condition(ResourceAction.DEPLOYED, dep, ResourceState.DEPLOYED) + + + def deploy_with_conditions(self): + """ Starts when all the conditions are reached + + """ + reschedule = False + delay = _reschedule_delay + + ## evaluate if set conditions are met + + # only can deploy when RM is NEW + if not self._state in [ResourceState.NEW]: + self.logger.error("Wrong state %s for stop" % self.state) + return + else: + print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- deploy condition : " + str(self.conditions.items()) + # Need to separate because it could have more that tuple of condition + # for the same action. + conditions_deployed = self.conditions.get(ResourceAction.DEPLOYED, []) + for (group, state, time) in conditions_deployed: + reschedule, delay = self._needs_reschedule(group, state, time) + if reschedule: + break + + if reschedule: + callback = functools.partial(self.deploy_with_conditions) + self.ec.schedule(delay, callback) + else: + print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\ +------------------------------------------------------------------------------\ +---------------------------------------------------------------- DEPLOY -- " + self.deploy_action() + + def deploy_action(self): + + self._deploy_time = strfnow() self._state = ResourceState.DEPLOYED + def release(self): """Clean the resource at the end of the Experiment and change the status diff --git a/src/neco/resources/omf/omf_application.py b/src/neco/resources/omf/omf_application.py index 1e64974b..8653fa3b 100644 --- a/src/neco/resources/omf/omf_application.py +++ b/src/neco/resources/omf/omf_application.py @@ -26,6 +26,7 @@ class OMFApplication(ResourceManager): """ _rtype = "OMFApplication" _authorized_connections = ["OMFNode"] + _waiters = ["OMFNode", "OMFChannel", "OMFWifiInterface"] @classmethod def _register_attributes(cls): @@ -109,13 +110,13 @@ class OMFApplication(ResourceManager): return rm return None - def deploy(self): + def deploy_action(self): """Deploy the RM """ - super(OMFApplication, self).deploy() self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + super(OMFApplication, self).deploy_action() def start(self): """Send Xmpp Message Using OMF protocol to execute the application diff --git a/src/neco/resources/omf/omf_channel.py b/src/neco/resources/omf/omf_channel.py index 192de276..111af67a 100644 --- a/src/neco/resources/omf/omf_channel.py +++ b/src/neco/resources/omf/omf_channel.py @@ -25,7 +25,9 @@ class OMFChannel(ResourceManager): """ _rtype = "OMFChannel" - _authorized_connections = ["OMFWifiInterface"] + _authorized_connections = ["OMFWifiInterface", "OMFNode"] + _waiters = ["OMFNode", "OMFWifiInterface"] + @classmethod def _register_attributes(cls): @@ -89,7 +91,7 @@ class OMFChannel(ResourceManager): """ for elt in conn_set: rm_iface = self.ec.get_resource(elt) - for conn in rm_iface._connections: + for conn in rm_iface.connections: rm_node = self.ec.get_resource(conn) if rm_node.rtype() == "OMFNode": couple = [rm_node.get('hostname'), rm_iface.get('alias')] @@ -97,14 +99,25 @@ class OMFChannel(ResourceManager): self._nodes_guid.append(couple) return self._nodes_guid - def deploy(self): + def deploy_action(self): """Deploy the RM """ - super(OMFChannel, self).deploy() self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + if self.get('channel'): + set_nodes = self._get_target(self._connections) + print set_nodes + for couple in set_nodes: + #print "Couple node/alias : " + couple[0] + " , " + couple[1] + attrval = self.get('channel') + attrname = "net/%s/%s" % (couple[1], 'channel') + #print "Send the configure message" + self._omf_api.configure(couple[0], attrname, attrval) + + super(OMFChannel, self).deploy_action() + def discover(self): """ Discover the availables channels @@ -121,15 +134,7 @@ class OMFChannel(ResourceManager): """Send Xmpp Message Using OMF protocol to configure Channel """ - if self.get('channel'): - set_nodes = self._get_target(self._connections) - print set_nodes - for couple in set_nodes: - #print "Couple node/alias : " + couple[0] + " , " + couple[1] - attrval = self.get('channel') - attrname = "net/%s/%s" % (couple[1], 'channel') - #print "Send the configure message" - self._omf_api.configure(couple[0], attrname, attrval) + super(OMFChannel, self).start() def stop(self): diff --git a/src/neco/resources/omf/omf_interface.py b/src/neco/resources/omf/omf_interface.py index bbfc0452..d4366352 100644 --- a/src/neco/resources/omf/omf_interface.py +++ b/src/neco/resources/omf/omf_interface.py @@ -26,6 +26,7 @@ class OMFWifiInterface(ResourceManager): """ _rtype = "OMFWifiInterface" _authorized_connections = ["OMFNode" , "OMFChannel"] + _waiters = ["OMFNode"] #alias2name = dict({'w0':'wlan0', 'w1':'wlan1'}) @@ -102,19 +103,13 @@ class OMFWifiInterface(ResourceManager): return rm return None - def deploy(self): + def deploy_action(self): """Deploy the RM """ - super(OMFWifiInterface, self).deploy() self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) - - def start(self): - """Send Xmpp Messages Using OMF protocol to configure Interface - - """ self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip')) @@ -126,6 +121,15 @@ class OMFWifiInterface(ResourceManager): attrname = "net/%s/%s" % (self._alias, attrname) #print "Send the configure message" self._omf_api.configure(rm_node.get('hostname'), attrname, attrval) + + super(OMFWifiInterface, self).deploy_action() + + + def start(self): + """Send Xmpp Messages Using OMF protocol to configure Interface + + """ + super(OMFWifiInterface, self).start() def stop(self): diff --git a/src/neco/resources/omf/omf_node.py b/src/neco/resources/omf/omf_node.py index 5bcdcdf1..d5025a00 100644 --- a/src/neco/resources/omf/omf_node.py +++ b/src/neco/resources/omf/omf_node.py @@ -6,6 +6,7 @@ from neco.resources.omf.omf_api import OMFAPIFactory import neco import logging +import time @clsinit class OMFNode(ResourceManager): @@ -26,6 +27,7 @@ class OMFNode(ResourceManager): """ _rtype = "OMFNode" _authorized_connections = ["OMFApplication" , "OMFWifiInterface"] + _waiters = [] @classmethod def _register_attributes(cls): @@ -99,13 +101,15 @@ class OMFNode(ResourceManager): (self.rtype(), self._guid, rm.rtype(), guid)) return False - def deploy(self): + def deploy_action(self): """Deploy the RM """ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) - super(OMFNode, self).deploy() + self._omf_api.enroll_host(self.get('hostname')) + + super(OMFNode, self).deploy_action() def discover(self): """ Discover the availables nodes @@ -124,7 +128,7 @@ class OMFNode(ResourceManager): """ super(OMFNode, self).start() - self._omf_api.enroll_host(self.get('hostname')) + def stop(self): """Send Xmpp Message Using OMF protocol to disconnect the node diff --git a/src/neco/util/timefuncs.py b/src/neco/util/timefuncs.py index b62c6406..0cf4fcd3 100644 --- a/src/neco/util/timefuncs.py +++ b/src/neco/util/timefuncs.py @@ -24,7 +24,12 @@ def strfdiff(str1, str2): return (ret or 0.001) def strfvalid(date): - """ User defined date to scheduler date """ + """ User defined date to scheduler date + + :param date : user define date matchin the pattern _strf + :type date : date + + """ if not date: return strfnow() if _reabs.match(date): diff --git a/test/resources/omf/omf_vlc_exp.py b/test/resources/omf/omf_vlc_exp.py index 9c990c4c..5c375473 100755 --- a/test/resources/omf/omf_vlc_exp.py +++ b/test/resources/omf/omf_vlc_exp.py @@ -9,6 +9,7 @@ from neco.resources.omf.omf_channel import OMFChannel from neco.resources.omf.omf_api import OMFAPIFactory from neco.util import guid +from neco.util.timefuncs import * import time import unittest @@ -165,26 +166,93 @@ class OMFVLCTestCase(unittest.TestCase): self.ec.register_connection(node1, iface1) self.ec.register_connection(iface1, channel) - # For the moment - self.ec.register_condition([iface1, channel], ResourceAction.START, node1, ResourceState.STARTED , 2) - self.ec.register_condition(channel, ResourceAction.START, iface1, ResourceState.STARTED , 1) - self.ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1) + self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "4s") - # Real test - self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4) + self.assertEquals(len(self.ec.get_resource(app2).conditions), 1) - self.assertEquals(len(self.ec.get_resource(node1).conditions), 0) - self.assertEquals(len(self.ec.get_resource(iface1).conditions), 1) - self.assertEquals(len(self.ec.get_resource(channel).conditions), 1) - self.assertEquals(len(self.ec.get_resource(app1).conditions), 1) + def test_deploy(self): + node1 = self.ec.register_resource("OMFNode") + self.ec.set(node1, 'hostname', 'omf.plexus.wlab17') + self.ec.set(node1, 'xmppSlice', "nepi") + self.ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(node1, 'xmppPort', "5222") + self.ec.set(node1, 'xmppPassword', "1234") + + iface1 = self.ec.register_resource("OMFWifiInterface") + self.ec.set(iface1, 'alias', "w0") + self.ec.set(iface1, 'mode', "adhoc") + self.ec.set(iface1, 'type', "g") + self.ec.set(iface1, 'essid', "vlcexp") + self.ec.set(iface1, 'ip', "10.0.0.17") + self.ec.set(iface1, 'xmppSlice', "nepi") + self.ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(iface1, 'xmppPort', "5222") + self.ec.set(iface1, 'xmppPassword', "1234") + + channel = self.ec.register_resource("OMFChannel") + self.ec.set(channel, 'channel', "6") + self.ec.set(channel, 'xmppSlice', "nepi") + self.ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(channel, 'xmppPort', "5222") + self.ec.set(channel, 'xmppPassword', "1234") + + app1 = self.ec.register_resource("OMFApplication") + self.ec.set(app1, 'xmppSlice', "nepi") + self.ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(app1, 'xmppPort', "5222") + self.ec.set(app1, 'xmppPassword', "1234") + + app2 = self.ec.register_resource("OMFApplication") + self.ec.set(app2, 'xmppSlice', "nepi") + self.ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(app2, 'xmppPort', "5222") + self.ec.set(app2, 'xmppPassword', "1234") + + app3 = self.ec.register_resource("OMFApplication") + self.ec.set(app3, 'xmppSlice', "nepi") + self.ec.set(app3, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(app3, 'xmppPort', "5222") + self.ec.set(app3, 'xmppPassword', "1234") + + app4 = self.ec.register_resource("OMFApplication") + self.ec.set(app4, 'xmppSlice', "nepi") + self.ec.set(app4, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(app4, 'xmppPort', "5222") + self.ec.set(app4, 'xmppPassword', "1234") + + app5 = self.ec.register_resource("OMFApplication") + self.ec.set(app5, 'xmppSlice', "nepi") + self.ec.set(app5, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(app5, 'xmppPort', "5222") + self.ec.set(app5, 'xmppPassword', "1234") + + self.ec.register_connection(app1, node1) + self.ec.register_connection(app2, node1) + self.ec.register_connection(app3, node1) + self.ec.register_connection(app4, node1) + self.ec.register_connection(app5, node1) + self.ec.register_connection(node1, iface1) + self.ec.register_connection(iface1, channel) + + self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , "3s") + self.ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , "2s") + self.ec.register_condition(app4, ResourceAction.START, app3, ResourceState.STARTED , "3s") + self.ec.register_condition(app5, ResourceAction.START, [app3, app2], ResourceState.STARTED , "2s") + self.ec.register_condition(app5, ResourceAction.START, app1, ResourceState.STARTED , "1m20s") + self.ec.deploy() + time.sleep(150) - def xtest_deploy(self): - ec.deploy() + self.assertEquals(round(strfdiff(self.ec.get_resource(app2).start_time, self.ec.get_resource(app1).start_time),1), 3.0) + self.assertEquals(round(strfdiff(self.ec.get_resource(app3).start_time, self.ec.get_resource(app2).start_time),1), 2.0) + self.assertEquals(round(strfdiff(self.ec.get_resource(app4).start_time, self.ec.get_resource(app3).start_time),1), 3.0) + self.assertEquals(round(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app3).start_time),1), 2.0) + self.assertEquals(round(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app1).start_time),1), 7.0) + # Precision is at 1/10. So this one returns an error 7.03 != 7.0 + #self.assertEquals(strfdiff(self.ec.get_resource(app5).start_time, self.ec.get_resource(app1).start_time), 7) #In order to release everythings - time.sleep(45) - ec.shutdown() + time.sleep(5) if __name__ == '__main__':