From 8f6fe89985578231158f590990794be52367c4e7 Mon Sep 17 00:00:00 2001 From: Julien Tribino Date: Mon, 22 Apr 2013 12:37:17 +0200 Subject: [PATCH] Debug the deploy part and start with condition --- examples/automated_vlc_experiment_plexus.py | 128 ++++++++++ examples/manual_vlc_experiment_plexus.py | 170 +++++++++++++ src/neco/execution/ec.py | 35 ++- src/neco/execution/resource.py | 173 +++++++++++-- src/neco/resources/linux/application.py | 2 +- src/neco/resources/omf/omf_api.py | 125 ++++++---- src/neco/resources/omf/omf_application.py | 46 ++-- src/neco/resources/omf/omf_channel.py | 66 ++--- src/neco/resources/omf/omf_client.py | 158 ++++++++++-- src/neco/resources/omf/omf_interface.py | 62 +++-- src/neco/resources/omf/omf_messages_5_4.py | 260 +++++++++++++------- src/neco/resources/omf/omf_node.py | 57 +++-- test/resources/omf/omf_vlc_exp.py | 250 +++++++++++-------- 13 files changed, 1135 insertions(+), 397 deletions(-) create mode 100644 examples/automated_vlc_experiment_plexus.py create mode 100644 examples/manual_vlc_experiment_plexus.py diff --git a/examples/automated_vlc_experiment_plexus.py b/examples/automated_vlc_experiment_plexus.py new file mode 100644 index 00000000..23375311 --- /dev/null +++ b/examples/automated_vlc_experiment_plexus.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python +from neco.execution.resource import ResourceFactory, ResourceAction, ResourceState +from neco.execution.ec import ExperimentController + +from neco.resources.omf.omf_node import OMFNode +from neco.resources.omf.omf_application import OMFApplication +from neco.resources.omf.omf_interface import OMFWifiInterface +from neco.resources.omf.omf_channel import OMFChannel + +import logging +import time + +logging.basicConfig() + +# Create the EC +ec = ExperimentController() + +# Register the different RM that will be used +ResourceFactory.register_type(OMFNode) +ResourceFactory.register_type(OMFWifiInterface) +ResourceFactory.register_type(OMFChannel) +ResourceFactory.register_type(OMFApplication) + +# Create and Configure the Nodes +node1 = ec.register_resource("OMFNode") +ec.set(node1, 'hostname', 'omf.plexus.wlab17') +ec.set(node1, 'xmppSlice', "nepi") +ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(node1, 'xmppPort', "5222") +ec.set(node1, 'xmppPassword', "1234") + +node2 = ec.register_resource("OMFNode") +ec.set(node2, 'hostname', "omf.plexus.wlab37") +ec.set(node2, 'xmppSlice', "nepi") +ec.set(node2, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(node2, 'xmppPort', "5222") +ec.set(node2, 'xmppPassword', "1234") + +# Create and Configure the Interfaces +iface1 = ec.register_resource("OMFWifiInterface") +ec.set(iface1, 'alias', "w0") +ec.set(iface1, 'mode', "adhoc") +ec.set(iface1, 'type', "g") +ec.set(iface1, 'essid', "vlcexp") +#ec.set(iface1, 'ap', "11:22:33:44:55:66") +ec.set(iface1, 'ip', "10.0.0.17") +ec.set(iface1, 'xmppSlice', "nepi") +ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(iface1, 'xmppPort', "5222") +ec.set(iface1, 'xmppPassword', "1234") + +iface2 = ec.register_resource("OMFWifiInterface") +ec.set(iface2, 'alias', "w0") +ec.set(iface2, 'mode', "adhoc") +ec.set(iface2, 'type', 'g') +ec.set(iface2, 'essid', "vlcexp") +#ec.set(iface2, 'ap', "11:22:33:44:55:66") +ec.set(iface2, 'ip', "10.0.0.37") +ec.set(iface2, 'xmppSlice', "nepi") +ec.set(iface2, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(iface2, 'xmppPort', "5222") +ec.set(iface2, 'xmppPassword', "1234") + +# Create and Configure the Channel +channel = ec.register_resource("OMFChannel") +ec.set(channel, 'channel', "6") +ec.set(channel, 'xmppSlice', "nepi") +ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(channel, 'xmppPort', "5222") +ec.set(channel, 'xmppPassword', "1234") + +# Create and Configure the Application +app1 = ec.register_resource("OMFApplication") +ec.set(app1, 'appid', 'Vlc#1') +ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc") +ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'") +ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") +ec.set(app1, 'xmppSlice', "nepi") +ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(app1, 'xmppPort', "5222") +ec.set(app1, 'xmppPassword', "1234") + +app2 = ec.register_resource("OMFApplication") +ec.set(app2, 'appid', 'Vlc#2') +ec.set(app2, 'path', "/opt/vlc-1.1.13/cvlc") +ec.set(app2, 'args', "rtp://10.0.0.37:1234") +ec.set(app2, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") +ec.set(app2, 'xmppSlice', "nepi") +ec.set(app2, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(app2, 'xmppPort', "5222") +ec.set(app2, 'xmppPassword', "1234") + +app3 = ec.register_resource("OMFApplication") +ec.set(app3, 'appid', 'Kill#2') +ec.set(app3, 'path', "/usr/bin/killall") +ec.set(app3, 'args', "vlc") +ec.set(app3, 'env', " ") +ec.set(app3, 'xmppSlice', "nepi") +ec.set(app3, 'xmppHost', "xmpp-plexus.onelab.eu") +ec.set(app3, 'xmppPort', "5222") +ec.set(app3, 'xmppPassword', "1234") + +# Connection +ec.register_connection(app3, node1) +ec.register_connection(app1, node1) +ec.register_connection(node1, iface1) +ec.register_connection(iface1, channel) +ec.register_connection(iface2, channel) +ec.register_connection(node2, iface2) +ec.register_connection(app2, node2) + +# Condition +# Topology behaviour : It should not be done by the user, but .... +ec.register_condition([iface1, iface2, channel], ResourceAction.START, [node1, node2], ResourceState.STARTED , 2) +ec.register_condition(channel, ResourceAction.START, [iface1, iface2], ResourceState.STARTED , 1) +ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1) + +# User Behaviour +ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4) +ec.register_condition([app1, app2], ResourceAction.STOP, app2, ResourceState.STARTED , 20) +ec.register_condition(app3, ResourceAction.START, app2, ResourceState.STARTED , 25) + +# Deploy +ec.deploy() + +# Stop Experiment +time.sleep(45) +ec.shutdown() diff --git a/examples/manual_vlc_experiment_plexus.py b/examples/manual_vlc_experiment_plexus.py new file mode 100644 index 00000000..ebdd8e11 --- /dev/null +++ b/examples/manual_vlc_experiment_plexus.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python +from neco.execution.resource import ResourceFactory +from neco.execution.ec import ExperimentController + +from neco.resources.omf.omf_node import OMFNode +from neco.resources.omf.omf_application import OMFApplication +from neco.resources.omf.omf_interface import OMFWifiInterface +from neco.resources.omf.omf_channel import OMFChannel + +import logging +import time + +logging.basicConfig() + +# Create the EC +ec = ExperimentController() + +# Register the different RM that will be used +ResourceFactory.register_type(OMFNode) +ResourceFactory.register_type(OMFWifiInterface) +ResourceFactory.register_type(OMFChannel) +ResourceFactory.register_type(OMFApplication) + +# Create and Configure the Nodes +guid = ec.register_resource("OMFNode") +node1 = ec.get_resource(guid) +node1.set('hostname', 'omf.plexus.wlab17') +node1.set('xmppSlice', "nepi") +node1.set('xmppHost', "xmpp-plexus.onelab.eu") +node1.set('xmppPort', "5222") +node1.set('xmppPassword', "1234") + +guid = ec.register_resource("OMFNode") +node2 = ec.get_resource(guid) +node2.set('hostname', "omf.plexus.wlab37") +node2.set('xmppSlice', "nepi") +node2.set('xmppHost', "xmpp-plexus.onelab.eu") +node2.set('xmppPort', "5222") +node2.set('xmppPassword', "1234") + +# Create and Configure the Interfaces +guid = ec.register_resource("OMFWifiInterface") +iface1 = ec.get_resource(guid) +iface1.set('alias', "w0") +iface1.set('mode', "adhoc") +iface1.set('type', "g") +iface1.set('essid', "helloworld") +iface1.set('ip', "10.0.0.17") +iface1.set('xmppSlice', "nepi") +iface1.set('xmppHost', "xmpp-plexus.onelab.eu") +iface1.set('xmppPort', "5222") +iface1.set('xmppPassword', "1234") + +guid = ec.register_resource("OMFWifiInterface") +iface2 = ec.get_resource(guid) +iface2.set('alias', "w0") +iface2.set('mode', "adhoc") +iface2.set('type', 'g') +iface2.set('essid', "helloworld") +iface2.set('ip', "10.0.0.37") +iface2.set('xmppSlice', "nepi") +iface2.set('xmppHost', "xmpp-plexus.onelab.eu") +iface2.set('xmppPort', "5222") +iface2.set('xmppPassword', "1234") + +# Create and Configure the Channel +guid = ec.register_resource("OMFChannel") +channel = ec.get_resource(guid) +channel.set('channel', "6") +channel.set('xmppSlice', "nepi") +channel.set('xmppHost', "xmpp-plexus.onelab.eu") +channel.set('xmppPort', "5222") +channel.set('xmppPassword', "1234") + +# Create and Configure the Application +guid = ec.register_resource("OMFApplication") +app1 = ec.get_resource(guid) +app1.set('appid', 'Vlc#1') +app1.set('path', "/opt/vlc-1.1.13/cvlc") +app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'") +app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") +app1.set('xmppSlice', "nepi") +app1.set('xmppHost', "xmpp-plexus.onelab.eu") +app1.set('xmppPort', "5222") +app1.set('xmppPassword', "1234") + +guid = ec.register_resource("OMFApplication") +app2 = ec.get_resource(guid) +app2.set('appid', 'Vlc#2') +app2.set('path', "/opt/vlc-1.1.13/cvlc") +app2.set('args', "rtp://10.0.0.37:1234") +app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") +app2.set('xmppSlice', "nepi") +app2.set('xmppHost', "xmpp-plexus.onelab.eu") +app2.set('xmppPort', "5222") +app2.set('xmppPassword', "1234") + +guid = ec.register_resource("OMFApplication") +app3 = ec.get_resource(guid) +app3.set('appid', 'Kill#2') +app3.set('path', "/usr/bin/killall") +app3.set('args', "vlc") +app3.set('env', " ") +app3.set('xmppSlice', "nepi") +app3.set('xmppHost', "xmpp-plexus.onelab.eu") +app3.set('xmppPort', "5222") +app3.set('xmppPassword', "1234") + +# Connection +app3.connect(node1.guid) +node1.connect(app3.guid) + +app1.connect(node1.guid) +node1.connect(app1.guid) + +node1.connect(iface1.guid) +iface1.connect(node1.guid) + +iface1.connect(channel.guid) +channel.connect(iface1.guid) + +channel.connect(iface2.guid) +iface2.connect(channel.guid) + +iface2.connect(node2.guid) +node2.connect(iface2.guid) + +node2.connect(app2.guid) +app2.connect(node2.guid) + +# Local Deploy +node1.deploy() +node2.deploy() +iface1.deploy() +iface2.deploy() +channel.deploy() +app1.deploy() +app2.deploy() +app3.deploy() + +# Start the Nodes +node1.start() +node2.start() +time.sleep(2) + +# Start the Interfaces +iface1.start() +iface2.start() + +# Start the Channel +time.sleep(2) +channel.start() +time.sleep(2) + +# Start the Application +app1.start() +time.sleep(2) +app2.start() + +time.sleep(20) + +# Stop the Application +app1.stop() +app2.stop() +time.sleep(1) +app3.start() +time.sleep(2) + +# Stop Experiment +ec.shutdown() diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 1b7d4be8..a7a33d83 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -25,6 +25,9 @@ class ExperimentController(object): # Resource managers self._resources = dict() + # Resource managers + self._group = dict() + # Scheduler self._scheduler = HeapScheduler() @@ -51,18 +54,28 @@ class ExperimentController(object): def resources(self): return self._resources.keys() - def register_resource(self, rtype, guid = None, creds = None): + def register_resource(self, rtype, guid = None): # Get next available guid guid = self._guid_generator.next(guid) # Instantiate RM - rm = ResourceFactory.create(rtype, self, guid, creds) + rm = ResourceFactory.create(rtype, self, guid) # Store RM self._resources[guid] = rm return guid + def create_group(self, *args): + guid = self._guid_generator.next(guid) + + grp = [arg for arg in args] + + self._resources[guid] = grp + + return guid + + def get_attributes(self, guid): rm = self.get_resource(guid) return rm.get_attributes() @@ -100,12 +113,12 @@ class ExperimentController(object): """ if isinstance(group1, int): - group1 = list[group1] + group1 = [group1] if isinstance(group2, int): - group2 = list[group2] + group2 = [group2] for guid1 in group1: - rm = self.get_resource(guid) + rm = self.get_resource(guid1) rm.register_condition(action, group2, state, time) def discover(self, guid, filters): @@ -165,9 +178,9 @@ class ExperimentController(object): """ if isinstance(group1, int): - group1 = list[group1] + group1 = [group1] if isinstance(group2, int): - group2 = list[group2] + group2 = [group2] for guid1 in group1: rm = self.get_resource(guid) @@ -181,13 +194,13 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.start_with_condition() - def deploy(self, group = None, wait_all_ready = True): + def deploy(self, group = None, wait_all_deployed = True): """ Deploy all resource manager in group :param group: List of guids of RMs to deploy :type group: list - :param wait_all_ready: Wait until all RMs are deployed in + :param wait_all_deployed: Wait until all RMs are deployed in order to start the RMs :type guid: int @@ -208,13 +221,13 @@ class ExperimentController(object): for guid in group: rm = self.get_resource(guid) - if wait_all_ready: + if wait_all_deployed: towait = list(group) towait.remove(guid) self.register_condition(guid, ResourceAction.START, towait, ResourceState.DEPLOYED) - thread = threading.Thread(target = steps, args = (rm)) + thread = threading.Thread(target = steps, args = (rm,)) threads.append(thread) thread.start() diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index deaa063e..ce775684 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -5,6 +5,7 @@ import copy import functools import logging import weakref +import time as TIME _reschedule_delay = "1s" @@ -34,29 +35,43 @@ class ResourceManager(object): @classmethod def _register_filter(cls, attr): """ Resource subclasses will invoke this method to add a - filter attribute""" + filter attribute + + """ cls._filters[attr.name] = attr @classmethod def _register_attribute(cls, attr): """ Resource subclasses will invoke this method to add a - resource attribute""" + resource attribute + + """ cls._attributes[attr.name] = attr @classmethod def _register_filters(cls): """ Resource subclasses will invoke this method to add a - filter attribute""" + filter attribute + + """ pass @classmethod def _register_attributes(cls): """ Resource subclasses will invoke this method to add a - resource attribute""" + resource attribute + + """ pass @classmethod def _clsinit(cls): + """ Create a new dictionnary instance of the dictionnary + with the same template. + + Each ressource should have the same registration dictionary + template with different instances. + """ # static template for resource filters cls._filters = dict() cls._register_filters() @@ -71,10 +86,16 @@ class ResourceManager(object): @classmethod def get_filters(cls): + """ Returns a copy of the filters + + """ return copy.deepcopy(cls._filters.values()) @classmethod def get_attributes(cls): + """ Returns a copy of the attributes + + """ return copy.deepcopy(cls._attributes.values()) def __init__(self, ec, guid): @@ -113,7 +134,7 @@ class ResourceManager(object): return self._connections @property - def conditons(self): + def conditions(self): return self._conditions @property @@ -140,6 +161,9 @@ class ResourceManager(object): pass def start(self): + """ Start the Resource Manager + + """ if not self._state in [ResourceState.DEPLOYED, ResourceState.STOPPED]: self.logger.error("Wrong state %s for start" % self.state) @@ -147,6 +171,9 @@ class ResourceManager(object): self._state = ResourceState.STARTED def stop(self): + """ Start the Resource Manager + + """ if not self._state in [ResourceState.STARTED]: self.logger.error("Wrong state %s for stop" % self.state) @@ -154,21 +181,63 @@ class ResourceManager(object): self._state = ResourceState.STOPPED def set(self, name, value): + """ Set the value of the attribute + + :param name: Name of the attribute + :type name: str + :param name: Value of the attribute + :type name: str + :rtype: Boolean + """ attr = self._attrs[name] attr.value = value def get(self, name): + """ Start the Resource Manager + + :param name: Name of the attribute + :type name: str + :rtype: str + """ attr = self._attrs[name] return attr.value def register_condition(self, action, group, state, time = None): + """ Do the 'action' after 'time' on the current RM when 'group' + reach the state 'state' + + :param action: Action to do. Either 'START' or 'STOP' + :type action: str + :param group: group of RM + :type group: str + :param state: RM that are part of the condition + :type state: list + :param time: Time to wait after the state is reached (ex : '2s' ) + :type time: str + + """ if action not in self.conditions: self._conditions[action] = set() - self.conditions.get(action).add((group, state, time)) + # We need to use only sequence inside a set and not a list. + # As group is a list, we need to change it. + #print (tuple(group), state, time) + self.conditions.get(action).add((tuple(group), state, time)) def _needs_reschedule(self, group, state, time): + """ Internal method that verify if 'time' has elapsed since + all elements in 'group' have reached state 'state'. + + :param group: RM that are part of the condition + :type group: list + :param state: State that group need to reach for the condtion + :type state: str + :param time: time to wait after the state + :type time: str + + + """ reschedule = False delay = _reschedule_delay @@ -182,29 +251,46 @@ class ResourceManager(object): break if time: - if state == ResourceAction.START: + if state == ResourceState.STARTED: t = rm.start_time - elif state == ResourceAction.STOP: + elif state == ResourceState.STOPPED: t = rm.stop_time else: # Only keep time information for START and STOP break - delay = strfdiff(t, strnow()) - if delay < time: + d = strfdiff(strfnow(), t) + if d < time: reschedule = True + delay = "%ds" % (int(time - d) +1) break - return reschedule, delay def set_with_conditions(self, name, value, group, state, time): + """ Set value 'value' on attribute with name 'name' when 'time' + has elapsed since all elements in 'group' have reached state + 'state'. + + :param name: Name of the attribute + :type name: str + :param name: Value of the attribute + :type name: str + :param group: RM that are part of the condition + :type group: list + :param state: State that group need to reach before set + :type state: str + :param time: Time to wait after the state is reached (ex : '2s' ) + :type time: str + + """ + reschedule = False delay = _reschedule_delay ## evaluate if set conditions are met # only can set with conditions after the RM is started - if self.status != ResourceStatus.STARTED: + if self.state != ResourceState.STARTED: reschedule = True else: reschedule, delay = self._needs_reschedule(group, state, time) @@ -217,60 +303,93 @@ class ResourceManager(object): self.set(name, value) def start_with_conditions(self): + """ Starts when all the conditions are reached + + """ reschedule = False delay = _reschedule_delay ## evaluate if set conditions are met # only can start when RM is either STOPPED or DEPLOYED - if self.status not in [ResourceStatus.STOPPED, ResourceStatus.DEPLOYED]: + if self.state not in [ResourceState.STOPPED, ResourceState.DEPLOYED]: reschedule = True else: - for action, (group, state, time) in self.conditions.iteritems(): - if action == ResourceAction.START: - reschedule, delay = self._needs_reschedule(group, state, time) + print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----- start condition : " + str(self.conditions.items()) + # Need to separate because it could have more that tuple of condition + # for the same action. + if self.conditions.get(ResourceAction.START): + for (group, state, time) in self.conditions.get(ResourceAction.START): + reschedule, delay = self._needs_reschedule(group, state, time) if reschedule: break if reschedule: - callback = functools.partial(self.start_with_conditions, - group, state, time) + callback = functools.partial(self.start_with_conditions) self.ec.schedule(delay, callback) else: + print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ----\ +------------------------------------------------------------------------------\ +---------------------------------------------------------------- STARTING -- " self.start() def stop_with_conditions(self): + """ Starts when all the conditions are reached + + """ reschedule = False delay = _reschedule_delay ## evaluate if set conditions are met - # only can start when RM is either STOPPED or DEPLOYED - if self.status != ResourceStatus.STARTED: + # only can stop when RM is STARTED + if self.state != ResourceState.STARTED: reschedule = True else: - for action, (group, state, time) in self.conditions.iteritems(): - if action == ResourceAction.STOP: - reschedule, delay = self._needs_reschedule(group, state, time) - if reschedule: - break + print TIME.strftime("%H:%M:%S", TIME.localtime()) + " RM : " + self._rtype + " (Guid : "+ str(self.guid) +") ---- stop condition : " + str(self.conditions.items()) + # Need to separate because it could have more that tuple of condition + # for the same action. + conditions = self.conditions.get(ResourceAction.STOP, []) + for (group, state, time) in conditions: + reschedule, delay = self._needs_reschedule(group, state, time) + if reschedule: + break + + #else: + # for action, (group, state, time) in self.conditions.iteritems(): + # if action == ResourceAction.STOP: + # reschedule, delay = self._needs_reschedule(group, state, time) + # if reschedule: + # break if reschedule: - callback = functools.partial(self.stop_with_conditions, - group, state, time) + callback = functools.partial(self.stop_with_conditions) self.ec.schedule(delay, callback) else: self.stop() def deploy(self): + """Execute all the differents steps required to reach the state DEPLOYED + + """ self.discover() self.provision() self._state = ResourceState.DEPLOYED def release(self): + """Clean the resource at the end of the Experiment and change the status + + """ self._state = ResourceState.RELEASED def _validate_connection(self, guid): + """Check if the connection is available. + + :param guid: Guid of the current Resource Manager + :type guid: int + :rtype: Boolean + + """ # TODO: Validate! return True diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index de4f8505..2005617e 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -147,7 +147,7 @@ class LinuxApplication(ResourceManager): return True # XXX: What if it is connected to more than one node? resources = self.find_resources(exact_tags = [tags.NODE]) - self._node = resources[0] is len(resources) == 1 else None + self._node = resources[0] if len(resources) == 1 else None return self._node diff --git a/src/neco/resources/omf/omf_api.py b/src/neco/resources/omf/omf_api.py index 38aa72a1..7c71090a 100644 --- a/src/neco/resources/omf/omf_api.py +++ b/src/neco/resources/omf/omf_api.py @@ -3,6 +3,9 @@ import logging import ssl import sys import time +import hashlib +import neco +import threading from neco.resources.omf.omf_client import OMFClient from neco.resources.omf.omf_messages_5_4 import MessageHandler @@ -28,6 +31,20 @@ class OMFAPI(object): """ def __init__(self, slice, host, port, password, xmpp_root = None): + """ + + :param slice: Xmpp Slice + :type slice: Str + :param host: Xmpp Server + :type host: Str + :param port: Xmpp Port + :type port: Str + :param password: Xmpp password + :type password: Str + :param xmpp_root: Root of the Xmpp Topic Architecture + :type xmpp_root: Str + + """ date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S") tz = -time.altzone if time.daylight != 0 else -time.timezone date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds @@ -39,7 +56,8 @@ class OMFAPI(object): self._hostnames = [] self._xmpp_root = xmpp_root or "OMF_5.4" - self._logger = logging.getLogger("neco.resources.omf") + self._logger = logging.getLogger("neco.omf.omfApi ") + self._logger.setLevel(neco.LOGLEVEL) # OMF xmpp client self._client = None @@ -70,7 +88,7 @@ class OMFAPI(object): xmpp.ssl_version = ssl.PROTOCOL_SSLv3 if xmpp.connect((self._host, self._port)): - xmpp.process(threaded=True) + xmpp.process(block=False) while not xmpp.ready: time.sleep(1) self._client = xmpp @@ -96,8 +114,8 @@ class OMFAPI(object): """ address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user) - print address - payload = self._message.newexpfunction(self._user, address) + #print address + payload = self._message.newexp_function(self._user, address) slice_sid = "/%s/%s" % (self._xmpp_root, self._slice) self._client.publish(payload, slice_sid) @@ -109,7 +127,7 @@ class OMFAPI(object): self._client.create(xmpp_node) self._client.subscribe(xmpp_node) - payload = self._message.logfunction("2", + payload = self._message.log_function("2", "nodeHandler::NodeHandler", "INFO", "OMF Experiment Controller 5.4 (git 529a626)") @@ -181,7 +199,7 @@ class OMFAPI(object): xmpp_node = self._host_resource_id(hostname) self._client.subscribe(xmpp_node) - payload = self._message.enrollfunction("1", "*", "1", hostname) + payload = self._message.enroll_function("1", "*", "1", hostname) self._client.publish(payload, xmpp_node) def configure(self, hostname, attribute, value): @@ -195,7 +213,7 @@ class OMFAPI(object): :type value: str """ - payload = self._message.configurefunction(hostname, value, attribute) + payload = self._message.configure_function(hostname, value, attribute) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) @@ -214,7 +232,7 @@ class OMFAPI(object): :type env: str """ - payload = self._message.executefunction(hostname, app_id, arguments, path, env) + payload = self._message.execute_function(hostname, app_id, arguments, path, env) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) @@ -227,34 +245,42 @@ class OMFAPI(object): :type app_id: str """ - payload = self._message.exitfunction(hostname, app_id) + payload = self._message.exit_function(hostname, app_id) xmpp_node = self._host_session_id(hostname) self._client.publish(payload, xmpp_node) - def disconnect(self): - """ Delete the sesion and logger topic and disconnect + def release(self, hostname): + """ Delete the session and logger topics. Then disconnect + + """ + if hostname in self._hostnames: + self.delete(hostname) + + def disconnect(self) : + """ Delete the session and logger topics. Then disconnect """ self._client.delete(self._exp_session_id) self._client.delete(self._logger_session_id) - for hostname in self._hostnames[:]: - self.delete(hostname) - time.sleep(1) - self._client.disconnect() + + # Wait the send queue to be empty before disconnect + self._client.disconnect(wait=True) + self._logger.debug(" Disconnected from XMPP Server") class OMFAPIFactory(object): """ .. note:: - It allows the different RM to use the same xmpp client if they use the same credentials. For the moment, it is focused on Xmpp. + It allows the different RM to use the same xmpp client if they use the same credentials. + For the moment, it is focused on Xmpp. """ - - # XXX: put '_apis' instead of '_Api' - _Api = dict() + # use lock to avoid concurrent access to the Api list at the same times by 2 different threads + lock = threading.Lock() + _apis = dict() @classmethod def get_api(cls, slice, host, port, password): @@ -271,11 +297,16 @@ class OMFAPIFactory(object): """ if slice and host and port and password: - key = cls._hash_api(slice, host, port) - if key in cls._Api: - return cls._Api[key] + key = cls._make_key(slice, host, port, password) + cls.lock.acquire() + if key in cls._apis: + cls._apis[key]['cnt'] += 1 + cls.lock.release() + return cls._apis[key]['api'] else : - return cls.create_api(slice, host, port, password) + omf_api = cls.create_api(slice, host, port, password) + cls.lock.release() + return omf_api return None @classmethod @@ -292,24 +323,16 @@ class OMFAPIFactory(object): :type password: str """ - OmfApi = OMFAPI(slice, host, port, password) - key = cls._hash_api(slice, host, port) - cls._Api[key] = OmfApi - return OmfApi - - # XXX: this is not a hash :) - # From wikipedia: "A hash function is any algorithm or subroutine that maps large data - # sets of variable length to smaller data sets of a fixed length." - # The idea is to apply a function to get a smaller string. Use hashlib instead. - # e.g: - # import hashlib - # res = slice + "_" + host + "_" + port - # hashlib.md5(res).hexdigest() - # - # XXX: change method name for 'make_key' + omf_api = OMFAPI(slice, host, port, password) + key = cls._make_key(slice, host, port, password) + cls._apis[key] = {} + cls._apis[key]['api'] = omf_api + cls._apis[key]['cnt'] = 1 + return omf_api + @classmethod - def _hash_api(cls, slice, host, port): - """ Hash the credentials in order to create a key + def release_api(cls, slice, host, port, password): + """ Release an API with this credentials :param slice: Xmpp Slice Name :type slice: str @@ -317,12 +340,30 @@ class OMFAPIFactory(object): :type host: str :param port: Xmpp Port (Default : 5222) :type port: str + :param password: Xmpp Password + :type password: str """ - res = slice + "_" + host + "_" + port - return res + if slice and host and port and password: + key = cls._make_key(slice, host, port, password) + if key in cls._apis: + cls._apis[key]['cnt'] -= 1 + #print "Api Counter : " + str(cls._apis[key]['cnt']) + if cls._apis[key]['cnt'] == 0: + omf_api = cls._apis[key]['api'] + omf_api.disconnect() + @classmethod + def _make_key(cls, *args): + """ Hash the credentials in order to create a key + + :param args: list of arguments used to create the hash (user, host, port, ...) + :type args: list of args + + """ + skey = "".join(map(str, args)) + return hashlib.md5(skey).hexdigest() diff --git a/src/neco/resources/omf/omf_application.py b/src/neco/resources/omf/omf_application.py index 2ffe809b..1e64974b 100644 --- a/src/neco/resources/omf/omf_application.py +++ b/src/neco/resources/omf/omf_application.py @@ -1,14 +1,14 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit -from neco.execution.attribute import Attribute +from neco.execution.resource import ResourceManager, clsinit +from neco.execution.attribute import Attribute, Flags from neco.resources.omf.omf_api import OMFAPIFactory import neco import logging @clsinit -class OMFApplication(Resource): +class OMFApplication(ResourceManager): """ .. class:: Class Args : @@ -16,7 +16,7 @@ class OMFApplication(Resource): :type ec: ExperimentController :param guid: guid of the RM :type guid: int - :param creds: Credentials to communicate with the rm (XmppClient for OMF) + :param creds: Credentials to communicate with the rm (XmppClient) :type creds: dict .. note:: @@ -36,10 +36,10 @@ class OMFApplication(Resource): path = Attribute("path", "Path of the application") args = Attribute("args", "Argument of the application") env = Attribute("env", "Environnement variable of the application") - xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") - xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") - xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") - xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential) + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential) + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential) + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential) cls._register_attribute(appid) cls._register_attribute(path) cls._register_attribute(args) @@ -50,7 +50,7 @@ class OMFApplication(Resource): cls._register_attribute(xmppPassword) - def __init__(self, ec, guid, creds): + def __init__(self, ec, guid): """ :param ec: The Experiment controller :type ec: ExperimentController @@ -62,10 +62,6 @@ class OMFApplication(Resource): """ super(OMFApplication, self).__init__(ec, guid) - self.set('xmppSlice', creds['xmppSlice']) - self.set('xmppHost', creds['xmppHost']) - self.set('xmppPort', creds['xmppPort']) - self.set('xmppPassword', creds['xmppPassword']) self.set('appid', "") self.set('path', "") @@ -74,7 +70,7 @@ class OMFApplication(Resource): self._node = None - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + self._omf_api = None self._logger = logging.getLogger("neco.omf.omfApp ") self._logger.setLevel(neco.LOGLEVEL) @@ -88,7 +84,7 @@ class OMFApplication(Resource): :rtype: Boolean """ - rm = self.ec.resource(guid) + rm = self.ec.get_resource(guid) if rm.rtype() not in self._authorized_connections: self._logger.debug("Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid)) return False @@ -108,17 +104,26 @@ class OMFApplication(Resource): """ for elt in conn_set: - rm = self.ec.resource(elt) + rm = self.ec.get_resource(elt) if rm.rtype() == "OMFNode": return rm return None + def deploy(self): + """Deploy the RM + + """ + super(OMFApplication, self).deploy() + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + def start(self): """Send Xmpp Message Using OMF protocol to execute the application """ + super(OMFApplication, self).start() self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env')) - #try: + if self.get('appid') and self.get('path') and self.get('args') and self.get('env'): rm_node = self._get_nodes(self._connections) self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env')) @@ -127,8 +132,15 @@ class OMFApplication(Resource): """Send Xmpp Message Using OMF protocol to kill the application """ + rm_node = self._get_nodes(self._connections) self._omf_api.exit(rm_node.get('hostname'),self.get('appid')) + super(OMFApplication, self).stop() + def release(self): + """Clean the RM at the end of the experiment + """ + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) diff --git a/src/neco/resources/omf/omf_channel.py b/src/neco/resources/omf/omf_channel.py index d9da3ed7..192de276 100644 --- a/src/neco/resources/omf/omf_channel.py +++ b/src/neco/resources/omf/omf_channel.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit -from neco.execution.attribute import Attribute +from neco.execution.resource import ResourceManager, clsinit +from neco.execution.attribute import Attribute, Flags from neco.resources.omf.omf_api import OMFAPIFactory @@ -8,7 +8,7 @@ import neco import logging @clsinit -class OMFChannel(Resource): +class OMFChannel(ResourceManager): """ .. class:: Class Args : @@ -32,17 +32,17 @@ class OMFChannel(Resource): """Register the attributes of an OMF channel """ channel = Attribute("channel", "Name of the application") - xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") - xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") - xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") - xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential) + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential) + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential) + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential) cls._register_attribute(channel) cls._register_attribute(xmppSlice) cls._register_attribute(xmppHost) cls._register_attribute(xmppPort) cls._register_attribute(xmppPassword) - def __init__(self, ec, guid, creds): + def __init__(self, ec, guid): """ :param ec: The Experiment controller :type ec: ExperimentController @@ -53,14 +53,10 @@ class OMFChannel(Resource): """ super(OMFChannel, self).__init__(ec, guid) - self.set('xmppSlice', creds['xmppSlice']) - self.set('xmppHost', creds['xmppHost']) - self.set('xmppPort', creds['xmppPort']) - self.set('xmppPassword', creds['xmppPassword']) self._nodes_guid = list() - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + self._omf_api = None self._logger = logging.getLogger("neco.omf.omfChannel") self._logger.setLevel(neco.LOGLEVEL) @@ -73,9 +69,10 @@ class OMFChannel(Resource): :rtype: Boolean """ - rm = self.ec.resource(guid) + rm = self.ec.get_resource(guid) if rm.rtype() in self._authorized_connections: - self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)) + self._logger.debug("Connection between %s %s and %s %s accepted" % + (self.rtype(), self._guid, rm.rtype(), guid)) return True self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)) return False @@ -88,24 +85,33 @@ class OMFChannel(Resource): :type conn_set: set :rtype: list :return: self._nodes_guid + """ for elt in conn_set: - rm_iface = self.ec.resource(elt) + rm_iface = self.ec.get_resource(elt) for conn in rm_iface._connections: - rm_node = self.ec.resource(conn) + rm_node = self.ec.get_resource(conn) if rm_node.rtype() == "OMFNode": couple = [rm_node.get('hostname'), rm_iface.get('alias')] #print couple self._nodes_guid.append(couple) return self._nodes_guid + def deploy(self): + """Deploy the RM + + """ + super(OMFChannel, self).deploy() + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + def discover(self): """ Discover the availables channels """ pass - def provision(self, credential): + def provision(self): """ Provision some availables channels """ @@ -117,23 +123,25 @@ class OMFChannel(Resource): """ if self.get('channel'): set_nodes = self._get_target(self._connections) - #print set_nodes + print set_nodes for couple in set_nodes: #print "Couple node/alias : " + couple[0] + " , " + couple[1] attrval = self.get('channel') attrname = "net/%s/%s" % (couple[1], 'channel') #print "Send the configure message" self._omf_api.configure(couple[0], attrname, attrval) + super(OMFChannel, self).start() - def xstart(self): - try: - if self.get('channel'): - node = self.tc.elements.get(self._node_guid) - attrval = self.get('channel') - attrname = "net/%s/%s" % (self._alias, 'channel') - self._omf_api.configure('omf.plexus.wlab17', attrname, attrval) - except AttributeError: - # If the attribute is not yet defined, ignore the error - pass + def stop(self): + """Send Xmpp Message Using OMF protocol to put down the interface + """ + super(OMFChannel, self).stop() + + def release(self): + """Clean the RM at the end of the experiment + + """ + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) diff --git a/src/neco/resources/omf/omf_client.py b/src/neco/resources/omf/omf_client.py index 3764f657..53dcb39c 100644 --- a/src/neco/resources/omf/omf_client.py +++ b/src/neco/resources/omf/omf_client.py @@ -6,7 +6,8 @@ import xml.etree.ElementTree as ET import neco -class OMFClient(sleekxmpp.ClientXMPP): +# inherit from BaseXmpp and XMLStream classes +class OMFClient(sleekxmpp.ClientXMPP): """ .. class:: Class Args : @@ -22,6 +23,15 @@ class OMFClient(sleekxmpp.ClientXMPP): """ def __init__(self, jid, password): + """ + + :param jid: Jabber Id (= Xmpp Slice + Date) + :type jid: Str + :param password: Jabber Password (= Xmpp Password) + :type password: Str + + + """ sleekxmpp.ClientXMPP.__init__(self, jid, password) self._ready = False self._registered = False @@ -41,16 +51,25 @@ class OMFClient(sleekxmpp.ClientXMPP): @property def ready(self): + """ Check if the client is ready + + """ return self._ready def start(self, event): + """ Send presence to the Xmppp Server. This function is called directly by the sleekXmpp library + + """ self.send_presence() self._ready = True self._server = "pubsub.%s" % self.boundjid.domain def register(self, iq): + """ Register to the Xmppp Server. This function is called directly by the sleekXmpp library + + """ if self._registered: - self._logger.info("%s already registered!" % self.boundjid) + self._logger.info(" %s already registered!" % self.boundjid) return resp = self.Iq() @@ -60,26 +79,32 @@ class OMFClient(sleekxmpp.ClientXMPP): try: resp.send(now=True) - self._logger.info("Account created for %s!" % self.boundjid) + self._logger.info(" Account created for %s!" % self.boundjid) self._registered = True except IqError as e: - self._logger.error("Could not register account: %s" % + self._logger.error(" Could not register account: %s" % e.iq['error']['text']) except IqTimeout: - self._logger.error("No response from server.") + self._logger.error(" No response from server.") def unregister(self): + """ Unregister from the Xmppp Server. + + """ try: self.plugin['xep_0077'].cancel_registration( ifrom=self.boundjid.full) - self._logger.info("Account unregistered for %s!" % self.boundjid) + self._logger.info(" Account unregistered for %s!" % self.boundjid) except IqError as e: - self._logger.error("Could not unregister account: %s" % + self._logger.error(" Could not unregister account: %s" % e.iq['error']['text']) except IqTimeout: - self._logger.error("No response from server.") + self._logger.error(" No response from server.") def nodes(self): + """ Get all the nodes of the Xmppp Server. + + """ try: result = self['xep_0060'].get_nodes(self._server) for item in result['disco_items']['items']: @@ -87,9 +112,12 @@ class OMFClient(sleekxmpp.ClientXMPP): return result except: error = traceback.format_exc() - self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error) + self._logger.error(' Could not retrieve node list.\ntraceback:\n%s', error) def subscriptions(self): + """ Get all the subscriptions of the Xmppp Server. + + """ try: result = self['xep_0060'].get_subscriptions(self._server) #self.boundjid.full) @@ -98,9 +126,15 @@ class OMFClient(sleekxmpp.ClientXMPP): return result except: error = traceback.format_exc() - self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error) + self._logger.error(' Could not retrieve subscriptions.\ntraceback:\n%s', error) def create(self, node): + """ Create the topic corresponding to the node + + :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17) + :type node: str + + """ self._logger.debug(" Create Topic : " + node) config = self['xep_0004'].makeForm('submit') @@ -115,28 +149,53 @@ class OMFClient(sleekxmpp.ClientXMPP): self['xep_0060'].create_node(self._server, node, config = config) except: error = traceback.format_exc() - self._logger.error('Could not create topic: %s\ntraceback:\n%s' % (node, error)) + self._logger.error(' Could not create topic: %s\ntraceback:\n%s' % (node, error)) def delete(self, node): + """ Delete the topic corresponding to the node + + :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17) + :type node: str + + """ + # To check if the queue are well empty at the end + #print " length of the queue : " + str(self.send_queue.qsize()) + #print " length of the queue : " + str(self.event_queue.qsize()) try: self['xep_0060'].delete_node(self._server, node) - self._logger.info('Deleted node: %s' % node) + self._logger.info(' Deleted node: %s' % node) except: error = traceback.format_exc() - self._logger.error('Could not delete topic: %s\ntraceback:\n%s' % (node, error)) + self._logger.error(' Could not delete topic: %s\ntraceback:\n%s' % (node, error)) def publish(self, data, node): - self._logger.debug(" Publish to Topic :" + node) + """ Publish the data to the corresponding topic + + :param data: Data that will be published + :type data: str + :param node: Name of the topic + :type node: str + + """ + + self._logger.debug(" Publish to Topic : " + node) try: result = self['xep_0060'].publish(self._server,node,payload=data) # id = result['pubsub']['publish']['item']['id'] # print('Published at item id: %s' % id) except: error = traceback.format_exc() - self._logger.error('Could not publish to: %s\ntraceback:\n%s' \ + self._logger.error(' Could not publish to: %s\ntraceback:\n%s' \ % (node, error)) def get(self, data): + """ Get the item + + :param data: data from which the items will be get back + :type data: str + + + """ try: result = self['xep_0060'].get_item(self._server, self.boundjid, data) @@ -145,28 +204,43 @@ class OMFClient(sleekxmpp.ClientXMPP): tostring(item['payload']))) except: error = traceback.format_exc() - self._logger.error('Could not retrieve item %s from topic %s\ntraceback:\n%s' \ + self._logger.error(' Could not retrieve item %s from topic %s\ntraceback:\n%s' \ % (data, self.boundjid, error)) def retract(self, data): + """ Retract the item + + :param data: data from which the item will be retracted + :type data: str + + """ try: result = self['xep_0060'].retract(self._server, self.boundjid, data) - self._logger.info('Retracted item %s from topic %s' % (data, self.boundjid)) + self._logger.info(' Retracted item %s from topic %s' % (data, self.boundjid)) except: error = traceback.format_exc() - self._logger.error('Could not retract item %s from topic %s\ntraceback:\n%s' \ + self._logger.error(' Could not retract item %s from topic %s\ntraceback:\n%s' \ % (data, self.boundjid, error)) def purge(self): + """ Purge the information in the server + + """ try: result = self['xep_0060'].purge(self._server, self.boundjid) - self._logger.info('Purged all items from topic %s' % self.boundjid) + self._logger.info(' Purged all items from topic %s' % self.boundjid) except: error = traceback.format_exc() - self._logger.error('Could not purge items from topic %s\ntraceback:\n%s' \ + self._logger.error(' Could not purge items from topic %s\ntraceback:\n%s' \ % (self.boundjid, error)) def subscribe(self, node): + """ Subscribe to a topic + + :param node: Name of the topic + :type node: str + + """ try: result = self['xep_0060'].subscribe(self._server, node) #self._logger.debug('Subscribed %s to node %s' \ @@ -179,6 +253,12 @@ class OMFClient(sleekxmpp.ClientXMPP): % (self.boundjid.bare, node, error)) def unsubscribe(self, node): + """ Unsubscribe to a topic + + :param node: Name of the topic + :type node: str + + """ try: result = self['xep_0060'].unsubscribe(self._server, node) self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node)) @@ -187,27 +267,51 @@ class OMFClient(sleekxmpp.ClientXMPP): self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \ % (self.boundjid.bare, node, error)) - def _check_for_tag(self, treeroot, namespaces, tag): - for element in treeroot.iter(namespaces+tag): + def _check_for_tag(self, root, namespaces, tag): + """ Check if an element markup is in the ElementTree + + :param root: Root of the tree + :type root: ElementTree Element + :param namespaces: Namespaces of the element + :type namespaces: str + :param tag: Tag that will search in the tree + :type tag: str + + """ + for element in root.iter(namespaces+tag): if element.text: return element else : return None - def _check_output(self, treeroot, namespaces): - output_param = ["TARGET", "REASON", "PATH", "APPID", "VALUE"] + def _check_output(self, root, namespaces): + """ Check the significative element in the answer and display it + + :param root: Root of the tree + :type root: ElementTree Element + :param namespaces: Namespaces of the tree + :type namespaces: str + + """ + fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"] response = "" - for elt in output_param: - msg = self._check_for_tag(treeroot, namespaces, elt) + for elt in fields: + msg = self._check_for_tag(root, namespaces, elt) if msg is not None: response = response + " " + msg.text + " :" - deb = self._check_for_tag(treeroot, namespaces, "MESSAGE") + deb = self._check_for_tag(root, namespaces, "MESSAGE") if deb is not None: self._logger.debug(response + " " + deb.text) else : self._logger.info(response) def handle_omf_message(self, iq): + """ Handle published/received message + + :param iq: Stanzas that is currently published/received + :type iq: Iq Stanza + + """ namespaces = "{http://jabber.org/protocol/pubsub}" for i in iq['pubsub_event']['items']: root = ET.fromstring(str(i)) diff --git a/src/neco/resources/omf/omf_interface.py b/src/neco/resources/omf/omf_interface.py index 982d8cdd..bbfc0452 100644 --- a/src/neco/resources/omf/omf_interface.py +++ b/src/neco/resources/omf/omf_interface.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -from neco.execution.resource import Resource, clsinit -from neco.execution.attribute import Attribute +from neco.execution.resource import ResourceManager, clsinit +from neco.execution.attribute import Attribute, Flags from neco.resources.omf.omf_api import OMFAPIFactory @@ -8,7 +8,7 @@ import neco import logging @clsinit -class OMFWifiInterface(Resource): +class OMFWifiInterface(ResourceManager): """ .. class:: Class Args : @@ -32,16 +32,17 @@ class OMFWifiInterface(Resource): @classmethod def _register_attributes(cls): """Register the attributes of an OMF interface + """ - alias = Attribute("alias","Alias of the interface", default_value = "w0") + alias = Attribute("alias","Alias of the interface", default_value = "w0") mode = Attribute("mode","Mode of the interface") type = Attribute("type","Type of the interface") essid = Attribute("essid","Essid of the interface") ip = Attribute("ip","IP of the interface") - xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") - xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") - xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") - xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential) + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential) + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential) + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential) cls._register_attribute(alias) cls._register_attribute(xmppSlice) cls._register_attribute(xmppHost) @@ -52,7 +53,7 @@ class OMFWifiInterface(Resource): cls._register_attribute(essid) cls._register_attribute(ip) - def __init__(self, ec, guid, creds): + def __init__(self, ec, guid): """ :param ec: The Experiment controller :type ec: ExperimentController @@ -63,52 +64,60 @@ class OMFWifiInterface(Resource): """ super(OMFWifiInterface, self).__init__(ec, guid) - self.set('xmppSlice', creds['xmppSlice']) - self.set('xmppHost', creds['xmppHost']) - self.set('xmppPort', creds['xmppPort']) - self.set('xmppPassword', creds['xmppPassword']) - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + self._omf_api = None self._alias = self.get('alias') self._logger = logging.getLogger("neco.omf.omfIface ") self._logger.setLevel(neco.LOGLEVEL) def _validate_connection(self, guid): - """Check if the connection is available. + """ Check if the connection is available. :param guid: Guid of the current RM :type guid: int :rtype: Boolean """ - rm = self.ec.resource(guid) + rm = self.ec.get_resource(guid) if rm.rtype() in self._authorized_connections: - self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)) + self._logger.debug("Connection between %s %s and %s %s accepted" % + (self.rtype(), self._guid, rm.rtype(), guid)) return True - self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)) + self._logger.debug("Connection between %s %s and %s %s refused" % + (self.rtype(), self._guid, rm.rtype(), guid)) return False def _get_nodes(self, conn_set): - """ - Get the RM of the node to which the application is connected + """ Get the RM of the node to which the application is connected :param conn_set: Connections of the current Guid :type conn_set: set :rtype: ResourceManager + """ for elt in conn_set: - rm = self.ec.resource(elt) + rm = self.ec.get_resource(elt) if rm.rtype() == "OMFNode": return rm return None + def deploy(self): + """Deploy the RM + + """ + super(OMFWifiInterface, self).deploy() + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + def start(self): """Send Xmpp Messages Using OMF protocol to configure Interface """ - self._logger.debug(self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip')) + self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + + self.get('mode') + " : " + self.get('type') + " : " + + self.get('essid') + " : " + self.get('ip')) #try: if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'): rm_node = self._get_nodes(self._connections) @@ -117,12 +126,19 @@ class OMFWifiInterface(Resource): attrname = "net/%s/%s" % (self._alias, attrname) #print "Send the configure message" self._omf_api.configure(rm_node.get('hostname'), attrname, attrval) + super(OMFWifiInterface, self).start() def stop(self): """Send Xmpp Message Using OMF protocol to put down the interface """ - self._omf_api.disconnect() + super(OMFWifiInterface, self).stop() + def release(self): + """Clean the RM at the end of the experiment + + """ + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) diff --git a/src/neco/resources/omf/omf_messages_5_4.py b/src/neco/resources/omf/omf_messages_5_4.py index 4b072651..c5857c2c 100644 --- a/src/neco/resources/omf/omf_messages_5_4.py +++ b/src/neco/resources/omf/omf_messages_5_4.py @@ -1,26 +1,5 @@ from xml.etree import cElementTree as ET -EXECUTE = "EXECUTE" -KILL = "KILL" -STDIN = "STDIN" -NOOP = "NOOP" -PM_INSTALL = "PM_INSTALL" -APT_INSTALL = "APT_INSTALL" -RPM_INSTALL = "RPM_INSTALL" -RESET = "RESET" -REBOOT = "REBOOT" -MODPROBE = "MODPROBE" -CONFIGURE = "CONFIGURE" -LOAD_IMAGE = "LOAD_IMAGE" -SAVE_IMAGE = "SAVE_IMAGE" -LOAD_DATA = "LOAD_DATA" -SET_LINK = "SET_LINK" -ALIAS = "ALIAS" -SET_DISCONNECTION = "SET_DISCONNECTION" -RESTART = "RESTART" -ENROLL = "ENROLL" -EXIT = "EXIT" - class MessageHandler(): """ .. class:: Class Args : @@ -36,102 +15,201 @@ class MessageHandler(): """ - def __init__(self, sliceid, expid ): + """ + + :param sliceid: Slice Name (= Xmpp Slice) + :type expid: Str + :param expid: Experiment ID (= Xmpp User) + :type expid: Str + + """ self._slice_id = sliceid self._exp_id = expid - print "init" + self._exp_id +" "+ self._slice_id - pass - - def Mid(self, parent, keyword): - mid = ET.SubElement(parent, keyword) - mid.set("id", "\'omf-payload\'") - return mid - def Mtext(self, parent, keyword, text): - mtext = ET.SubElement(parent, keyword) - mtext.text = text - return mtext - def executefunction(self, target, appid, cmdlineargs, path, env): + def _id_element(self, parent, markup): + """ Insert a markup element with an id + + :param parent: Parent element in an XML point of view + :type parent: ElementTree Element + :param markup: Name of the markup + :type markup: str + + """ + elt = ET.SubElement(parent, markup) + elt.set("id", "\'omf-payload\'") + return elt + + def _attr_element(self, parent, markup, text): + """ Insert a markup element with a text (value) + + :param parent: Parent element in an XML point of view + :type parent: ElementTree Element + :param markup: Name of the markup + :type markup: str + :param text: Value of the markup element + :type text: str + + """ + elt = ET.SubElement(parent, markup) + elt.text = text + return elt + + def execute_function(self, target, appid, cmdlineargs, path, env): + """ Build an Execute Message + + :param target: Hrn of the target node (ex : omf.plexus.wlab17) + :type target: str + :param appid: Application id + :type appid: str + :param cmdlineargs: Arguments of the application + :type cmdlineargs: str + :param path: Path of the application + :type path: str + :param env: Environment variables + :type env: str + + """ payload = ET.Element("omf-message") - execute = self.Mid(payload,"EXECUTE") - env = self.Mtext(execute, "ENV", env) - sliceid = self.Mtext(execute,"SLICEID",self._slice_id) - expid = self.Mtext(execute,"EXPID",self._exp_id) - target = self.Mtext(execute,"TARGET",target) - appid = self.Mtext(execute,"APPID",appid) - cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs) - path = self.Mtext(execute,"PATH",path) + execute = self._id_element(payload,"EXECUTE") + env = self._attr_element(execute, "ENV", env) + sliceid = self._attr_element(execute,"SLICEID",self._slice_id) + expid = self._attr_element(execute,"EXPID",self._exp_id) + target = self._attr_element(execute,"TARGET",target) + appid = self._attr_element(execute,"APPID",appid) + cmdlineargs = self._attr_element(execute,"CMDLINEARGS",cmdlineargs) + path = self._attr_element(execute,"PATH",path) return payload - def exitfunction(self, target, appid): + def exit_function(self, target, appid): + """ Build an Exit Message + + :param target: Hrn of the target node (ex : omf.plexus.wlab17) + :type target: str + :param appid: Application id (ex : vlc#1) + :type appid: str + + """ payload = ET.Element("omf-message") - execute = self.Mid(payload,"EXIT") - sliceid = self.Mtext(execute,"SLICEID",self._slice_id) - expid = self.Mtext(execute,"EXPID",self._exp_id) - target = self.Mtext(execute,"TARGET",target) - appid = self.Mtext(execute,"APPID",appid) + execute = self._id_element(payload,"EXIT") + sliceid = self._attr_element(execute,"SLICEID",self._slice_id) + expid = self._attr_element(execute,"EXPID",self._exp_id) + target = self._attr_element(execute,"TARGET",target) + appid = self._attr_element(execute,"APPID",appid) return payload - def configurefunction(self, target, value, path): + def configure_function(self, target, value, path): + """ Build a Configure Message + + :param target: Hrn of the target node (ex : omf.plexus.wlab17) + :type target: str + :param value: guid of the RM + :type value: int + :param path: Path of the element to configure (ex : net/w0/channel) + :type path: dict + + """ payload = ET.Element("omf-message") - config = self.Mid(payload, "CONFIGURE") - sliceid = self.Mtext(config,"SLICEID",self._slice_id) - expid = self.Mtext(config,"EXPID",self._exp_id) - target = self.Mtext(config,"TARGET",target) - value = self.Mtext(config,"VALUE",value) - path = self.Mtext(config,"PATH",path) + config = self._id_element(payload, "CONFIGURE") + sliceid = self._attr_element(config,"SLICEID",self._slice_id) + expid = self._attr_element(config,"EXPID",self._exp_id) + target = self._attr_element(config,"TARGET",target) + value = self._attr_element(config,"VALUE",value) + path = self._attr_element(config,"PATH",path) return payload - def logfunction(self,level, logger, level_name, data): + def log_function(self,level, logger, level_name, data): + """ Build a Log Message + + :param level: Level of logging + :type level: str + :param logger: Element publishing the log + :type logger: str + :param level_name: Name of the level (ex : INFO) + :type level_name: str + :param data: Content to publish + :type data: str + + """ payload = ET.Element("omf-message") - log = self.Mid(payload, "LOGGING") - level = self.Mtext(log,"LEVEL",level) - sliceid = self.Mtext(log,"SLICEID",self._slice_id) - logger = self.Mtext(log,"LOGGER",logger) - expid = self.Mtext(log,"EXPID",self._exp_id) - level_name = self.Mtext(log,"LEVEL_NAME",level_name) - data = self.Mtext(log,"DATA",data) + log = self._id_element(payload, "LOGGING") + level = self._attr_element(log,"LEVEL",level) + sliceid = self._attr_element(log,"SLICEID",self._slice_id) + logger = self._attr_element(log,"LOGGER",logger) + expid = self._attr_element(log,"EXPID",self._exp_id) + level_name = self._attr_element(log,"LEVEL_NAME",level_name) + data = self._attr_element(log,"DATA",data) return payload - def aliasfunction(self, name, target): + def alias_function(self, name, target): + """ Build a Alias Message + + :param name: Name of the new alias + :type name: str + :param target: Hrn of the target node (ex : omf.plexus.wlab17) + :type target: str + + """ payload = ET.Element("omf-message") - alias = self.Mid(payload,"ALIAS") - sliceid = self.Mtext(alias,"SLICEID",self._slice_id) - expid = self.Mtext(alias,"EXPID",self._exp_id) - name = self.Mtext(alias,"NAME",name) - target = self.Mtext(alias,"TARGET",target) + alias = self._id_element(payload,"ALIAS") + sliceid = self._attr_element(alias,"SLICEID",self._slice_id) + expid = self._attr_element(alias,"EXPID",self._exp_id) + name = self._attr_element(alias,"NAME",name) + target = self._attr_element(alias,"TARGET",target) return payload - def enrollfunction(self, enrollkey, image, index, target ): + def enroll_function(self, enrollkey, image, index, target ): + """ Build an Enroll Message + + :param enrollkey: Type of enrollment (= 1) + :type enrollkey: str + :param image: Image (= * when all the nodes are concerned) + :type image: str + :param index: Index (= 1 in general) + :type index: str + :param target: Hrn of the target node (ex : omf.plexus.wlab17) + :type target: str + + """ payload = ET.Element("omf-message") - enroll = self.Mid(payload,"ENROLL") - enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey) - sliceid = self.Mtext(enroll,"SLICEID",self._slice_id) - image = self.Mtext(enroll,"IMAGE",image) - expid = self.Mtext(enroll,"EXPID",self._exp_id) - index = self.Mtext(enroll,"INDEX",index) - target = self.Mtext(enroll,"TARGET",target) + enroll = self._id_element(payload,"ENROLL") + enrollkey = self._attr_element(enroll,"ENROLLKEY",enrollkey) + sliceid = self._attr_element(enroll,"SLICEID",self._slice_id) + image = self._attr_element(enroll,"IMAGE",image) + expid = self._attr_element(enroll,"EXPID",self._exp_id) + index = self._attr_element(enroll,"INDEX",index) + target = self._attr_element(enroll,"TARGET",target) return payload - def noopfunction(self,target): + def noop_function(self,target): + """ Build a Noop Message + + :param target: Hrn of the target node (ex : omf.plexus.wlab17) + :type target: str + + """ payload = ET.Element("omf-message") - noop = self.Mid(payload,"NOOP") - sliceid = self.Mtext(noop,"SLICEID",self._slice_id) - expid = self.Mtext(noop,"EXPID",self._exp_id) - target = self.Mtext(noop,"TARGET",target) + noop = self._id_element(payload,"NOOP") + sliceid = self._attr_element(noop,"SLICEID",self._slice_id) + expid = self._attr_element(noop,"EXPID",self._exp_id) + target = self._attr_element(noop,"TARGET",target) return payload - def newexpfunction(self, experimentid, address): + def newexp_function(self, experimentid, address): + """ Build a NewExp Message + + :param experimentid: Id of the new experiment + :type experimentid: str + :param address: Adress of the destination set of nodes + :type address: str + + """ payload = ET.Element("omf-message") - newexp = self.Mid(payload,"EXPERIMENT_NEW") - experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid) - sliceid = self.Mtext(newexp,"SLICEID",self._slice_id) - expid = self.Mtext(newexp,"EXPID",self._exp_id) - address = self.Mtext(newexp,"ADDRESS",address) + newexp = self._id_element(payload,"EXPERIMENT_NEW") + experimentid = self._attr_element(newexp,"EXPERIMENT_ID",experimentid) + sliceid = self._attr_element(newexp,"SLICEID",self._slice_id) + expid = self._attr_element(newexp,"EXPID",self._exp_id) + address = self._attr_element(newexp,"ADDRESS",address) return payload - def handle_message(self, msg): - # Do something!!! - return msg diff --git a/src/neco/resources/omf/omf_node.py b/src/neco/resources/omf/omf_node.py index becc4006..5bcdcdf1 100644 --- a/src/neco/resources/omf/omf_node.py +++ b/src/neco/resources/omf/omf_node.py @@ -1,6 +1,6 @@ #!/usr/bin/env python from neco.execution.resource import ResourceManager, clsinit -from neco.execution.attribute import Attribute +from neco.execution.attribute import Attribute, Flags from neco.resources.omf.omf_api import OMFAPIFactory @@ -35,21 +35,17 @@ class OMFNode(ResourceManager): hostname = Attribute("hostname", "Hostname of the machine") cpu = Attribute("cpu", "CPU of the node") ram = Attribute("ram", "RAM of the node") - # XXX: flags = "0x02" is not human readable. - # instead: - # from neco.execution.attribute import Attribute, Flags - # xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential) - xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") - xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") - xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") - xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = Flags.Credential) + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = Flags.Credential) + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = Flags.Credential) + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = Flags.Credential) cls._register_attribute(hostname) cls._register_attribute(ram) cls._register_attribute(cpu) cls._register_attribute(xmppSlice) cls._register_attribute(xmppHost) cls._register_attribute(xmppPort) - ls._register_attribute(xmppPassword) + cls._register_attribute(xmppPassword) @classmethod def _register_filters(cls): @@ -67,9 +63,7 @@ class OMFNode(ResourceManager): # XXX: We don't necessary need to have the credentials at the # moment we create the RM - # THE OMF API SHOULD BE CREATED ON THE DEPLOY METHOD, NOT NOW - # THIS FORCES MORE CONSTRAINES ON THE WAY WE WILL AUTHOMATE DEPLOYMENT! - def __init__(self, ec, guid, creds): + def __init__(self, ec, guid): """ :param ec: The Experiment controller :type ec: ExperimentController @@ -80,13 +74,8 @@ class OMFNode(ResourceManager): """ super(OMFNode, self).__init__(ec, guid) - self.set('xmppSlice', creds['xmppSlice']) - self.set('xmppHost', creds['xmppHost']) - self.set('xmppPort', creds['xmppPort']) - self.set('xmppPassword', creds['xmppPassword']) - # XXX: Lines should not be more than 80 characters! - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + self._omf_api = None self._logger = logging.getLogger("neco.omf.omfNode ") @@ -101,20 +90,30 @@ class OMFNode(ResourceManager): :rtype: Boolean """ - rm = self.ec.resource(guid) + rm = self.ec.get_resource(guid) if rm.rtype() in self._authorized_connections: - self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)) + self._logger.debug("Connection between %s %s and %s %s accepted" % + (self.rtype(), self._guid, rm.rtype(), guid)) return True - self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)) + self._logger.debug("Connection between %s %s and %s %s refused" % + (self.rtype(), self._guid, rm.rtype(), guid)) return False + def deploy(self): + """Deploy the RM + + """ + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + super(OMFNode, self).deploy() + def discover(self): """ Discover the availables nodes """ pass - def provision(self, credential): + def provision(self): """ Provision some availables nodes """ @@ -124,13 +123,23 @@ class OMFNode(ResourceManager): """Send Xmpp Message Using OMF protocol to enroll the node into the experiment """ + super(OMFNode, self).start() self._omf_api.enroll_host(self.get('hostname')) def stop(self): """Send Xmpp Message Using OMF protocol to disconnect the node """ - self._omf_api.disconnect() + super(OMFNode, self).stop() + + def release(self): + """Clean the RM at the end of the experiment + + """ + self._omf_api.release(self.get('hostname')) + OMFAPIFactory.release_api(self.get('xmppSlice'), + self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + def configure(self): #routes = self.tc._add_route.get(self.guid, []) diff --git a/test/resources/omf/omf_vlc_exp.py b/test/resources/omf/omf_vlc_exp.py index 316e584f..9c990c4c 100755 --- a/test/resources/omf/omf_vlc_exp.py +++ b/test/resources/omf/omf_vlc_exp.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceFactory +from neco.execution.resource import ResourceFactory, ResourceManager, ResourceAction, ResourceState from neco.execution.ec import ExperimentController from neco.resources.omf.omf_node import OMFNode @@ -20,18 +20,13 @@ logging.basicConfig() class DummyEC(ExperimentController): pass -class OMFVLCTestCase(unittest.TestCase): +class DummyRM(ResourceManager): + pass - def setUp(self): - #self.guid_generator = guid.GuidGenerator() - self._creds = {'xmppSlice' : 'nepi' , 'xmppHost' : 'xmpp-plexus.onelab.eu' , 'xmppPort' : '5222', 'xmppPassword' : '1234' } - def tearDown(self): - pass +class OMFResourceFactoryTestCase(unittest.TestCase): def test_creation_phase(self): - ec = DummyEC() - ResourceFactory.register_type(OMFNode) ResourceFactory.register_type(OMFWifiInterface) ResourceFactory.register_type(OMFChannel) @@ -51,104 +46,149 @@ class OMFVLCTestCase(unittest.TestCase): self.assertEquals(len(ResourceFactory.resource_types()), 4) - #def xtest_creation_and_configuration_node(self): - guid = ec.register_resource("OMFNode", creds = self._creds) - node1 = ec._resources[guid] - node1.set('hostname', 'omf.plexus.wlab17') - - guid = ec.register_resource("OMFNode", creds = self._creds) - node2 = ec._resources[guid] - node2.set('hostname', "omf.plexus.wlab37") - - #def xtest_creation_and_configuration_interface(self): - guid = ec.register_resource("OMFWifiInterface", creds = self._creds) - iface1 = ec._resources[guid] - iface1.set('alias', "w0") - iface1.set('mode', "adhoc") - iface1.set('type', "g") - iface1.set('essid', "helloworld") - iface1.set('ip', "10.0.0.17") - - guid = ec.register_resource("OMFWifiInterface", creds = self._creds) - iface2 = ec._resources[guid] - iface2.set('alias', "w0") - iface2.set('mode', "adhoc") - iface2.set('type', 'g') - iface2.set('essid', "helloworld") - iface2.set('ip', "10.0.0.37") - - #def xtest_creation_and_configuration_channel(self): - guid = ec.register_resource("OMFChannel", creds = self._creds) - channel = ec._resources[guid] - channel.set('channel', "6") - - #def xtest_creation_and_configuration_application(self): - guid = ec.register_resource("OMFApplication", creds = self._creds) - app1 = ec._resources[guid] - app1.set('appid', 'Vlc#1') - app1.set('path', "/opt/vlc-1.1.13/cvlc") - app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'") - app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") - - guid = ec.register_resource("OMFApplication", creds = self._creds) - app2 = ec._resources[guid] - app2.set('appid', 'Vlc#2') - app2.set('path', "/opt/vlc-1.1.13/cvlc") - app2.set('args', "rtp://10.0.0.37:1234") - app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") - self.assertEquals(len(OMFAPIFactory._Api), 1) - - #def test_connection(self): - app1.connect(node1._guid) - node1.connect(app1._guid) - - node1.connect(iface1._guid) - iface1.connect(node1._guid) - - iface1.connect(channel._guid) - channel.connect(iface1._guid) - - channel.connect(iface2._guid) - iface2.connect(channel._guid) - - iface2.connect(node2._guid) - node2.connect(iface2._guid) - - node2.connect(app2._guid) - app2.connect(node2._guid) - - #def test_start_node(self): - node1.start() - node2.start() - time.sleep(1) - #pass - - #def test_start_interface(self): - iface1.start() - iface2.start() - - #def test_start_channel(self): - channel.start() - time.sleep(1) - - #def test_start_application(self): - app1.start() - time.sleep(2) - app2.start() - - time.sleep(10) - - #def test_stop_application(self): - app1.stop() - app2.stop() - time.sleep(2) - - - #def test_stop_nodes(self): - node1.stop() - #node2.stop() + +class OMFVLCTestCase(unittest.TestCase): + + def setUp(self): + self.ec = DummyEC() + ResourceFactory.register_type(OMFNode) + ResourceFactory.register_type(OMFWifiInterface) + ResourceFactory.register_type(OMFChannel) + ResourceFactory.register_type(OMFApplication) + + def tearDown(self): + self.ec.shutdown() + + def test_creation_and_configuration_node(self): + + node1 = self.ec.register_resource("OMFNode") + self.ec.set(node1, 'hostname', 'omf.plexus.wlab17') + self.ec.set(node1, 'xmppSlice', "nepi") + self.ec.set(node1, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(node1, 'xmppPort', "5222") + self.ec.set(node1, 'xmppPassword', "1234") + + self.assertEquals(self.ec.get(node1, 'hostname'), 'omf.plexus.wlab17') + self.assertEquals(self.ec.get(node1, 'xmppSlice'), 'nepi') + self.assertEquals(self.ec.get(node1, 'xmppHost'), 'xmpp-plexus.onelab.eu') + self.assertEquals(self.ec.get(node1, 'xmppPort'), '5222') + self.assertEquals(self.ec.get(node1, 'xmppPassword'), '1234') + + def test_creation_and_configuration_interface(self): + + iface1 = self.ec.register_resource("OMFWifiInterface") + self.ec.set(iface1, 'alias', "w0") + self.ec.set(iface1, 'mode', "adhoc") + self.ec.set(iface1, 'type', "g") + self.ec.set(iface1, 'essid', "vlcexp") + self.ec.set(iface1, 'ip', "10.0.0.17") + self.ec.set(iface1, 'xmppSlice', "nepi") + self.ec.set(iface1, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(iface1, 'xmppPort', "5222") + self.ec.set(iface1, 'xmppPassword', "1234") + + self.assertEquals(self.ec.get(iface1, 'alias'), 'w0') + self.assertEquals(self.ec.get(iface1, 'mode'), 'adhoc') + self.assertEquals(self.ec.get(iface1, 'type'), 'g') + self.assertEquals(self.ec.get(iface1, 'essid'), 'vlcexp') + self.assertEquals(self.ec.get(iface1, 'ip'), '10.0.0.17') + self.assertEquals(self.ec.get(iface1, 'xmppSlice'), 'nepi') + self.assertEquals(self.ec.get(iface1, 'xmppHost'), 'xmpp-plexus.onelab.eu') + self.assertEquals(self.ec.get(iface1, 'xmppPort'), '5222') + self.assertEquals(self.ec.get(iface1, 'xmppPassword'), '1234') + + def test_creation_and_configuration_channel(self): + + channel = self.ec.register_resource("OMFChannel") + self.ec.set(channel, 'channel', "6") + self.ec.set(channel, 'xmppSlice', "nepi") + self.ec.set(channel, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(channel, 'xmppPort', "5222") + self.ec.set(channel, 'xmppPassword', "1234") + + self.assertEquals(self.ec.get(channel, 'channel'), '6') + self.assertEquals(self.ec.get(channel, 'xmppSlice'), 'nepi') + self.assertEquals(self.ec.get(channel, 'xmppHost'), 'xmpp-plexus.onelab.eu') + self.assertEquals(self.ec.get(channel, 'xmppPort'), '5222') + self.assertEquals(self.ec.get(channel, 'xmppPassword'), '1234') + + def test_creation_and_configuration_application(self): + + app1 = self.ec.register_resource("OMFApplication") + self.ec.set(app1, 'appid', 'Vlc#1') + self.ec.set(app1, 'path', "/opt/vlc-1.1.13/cvlc") + self.ec.set(app1, 'args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'") + self.ec.set(app1, 'env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") + self.ec.set(app1, 'xmppSlice', "nepi") + self.ec.set(app1, 'xmppHost', "xmpp-plexus.onelab.eu") + self.ec.set(app1, 'xmppPort', "5222") + self.ec.set(app1, 'xmppPassword', "1234") + + self.assertEquals(self.ec.get(app1, 'appid'), 'Vlc#1') + self.assertEquals(self.ec.get(app1, 'path'), '/opt/vlc-1.1.13/cvlc') + self.assertEquals(self.ec.get(app1, 'args'), "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'") + self.assertEquals(self.ec.get(app1, 'env'), 'DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority') + self.assertEquals(self.ec.get(app1, 'xmppSlice'), 'nepi') + self.assertEquals(self.ec.get(app1, 'xmppHost'), 'xmpp-plexus.onelab.eu') + self.assertEquals(self.ec.get(app1, 'xmppPort'), '5222') + self.assertEquals(self.ec.get(app1, 'xmppPassword'), '1234') + + def test_connection(self): + + node1 = self.ec.register_resource("OMFNode") + iface1 = self.ec.register_resource("OMFWifiInterface") + channel = self.ec.register_resource("OMFChannel") + app1 = self.ec.register_resource("OMFApplication") + app2 = self.ec.register_resource("OMFApplication") + + self.ec.register_connection(app1, node1) + self.ec.register_connection(app2, node1) + self.ec.register_connection(node1, iface1) + self.ec.register_connection(iface1, channel) + + self.assertEquals(len(self.ec.get_resource(node1).connections), 3) + self.assertEquals(len(self.ec.get_resource(iface1).connections), 2) + self.assertEquals(len(self.ec.get_resource(channel).connections), 1) + self.assertEquals(len(self.ec.get_resource(app1).connections), 1) + self.assertEquals(len(self.ec.get_resource(app2).connections), 1) + + def test_condition(self): + + node1 = self.ec.register_resource("OMFNode") + iface1 = self.ec.register_resource("OMFWifiInterface") + channel = self.ec.register_resource("OMFChannel") + app1 = self.ec.register_resource("OMFApplication") + app2 = self.ec.register_resource("OMFApplication") + + self.ec.register_connection(app1, node1) + self.ec.register_connection(app2, node1) + self.ec.register_connection(node1, iface1) + self.ec.register_connection(iface1, channel) + + # For the moment + self.ec.register_condition([iface1, channel], ResourceAction.START, node1, ResourceState.STARTED , 2) + self.ec.register_condition(channel, ResourceAction.START, iface1, ResourceState.STARTED , 1) + self.ec.register_condition(app1, ResourceAction.START, channel, ResourceState.STARTED , 1) + + # Real test + self.ec.register_condition(app2, ResourceAction.START, app1, ResourceState.STARTED , 4) + + self.assertEquals(len(self.ec.get_resource(node1).conditions), 0) + self.assertEquals(len(self.ec.get_resource(iface1).conditions), 1) + self.assertEquals(len(self.ec.get_resource(channel).conditions), 1) + self.assertEquals(len(self.ec.get_resource(app1).conditions), 1) + + + def xtest_deploy(self): + ec.deploy() + + #In order to release everythings + time.sleep(45) + ec.shutdown() if __name__ == '__main__': unittest.main() + + -- 2.47.0