From dac2434143d573f76c14eac647645d478cfb4f80 Mon Sep 17 00:00:00 2001 From: Julien Tribino Date: Mon, 14 Apr 2014 11:18:24 +0200 Subject: [PATCH] First version of OMF6 working. Just problem of wifi driver are still there --- src/nepi/resources/omf/application.py | 154 +++++++++--- src/nepi/resources/omf/application6.py | 222 ------------------ src/nepi/resources/omf/channel.py | 53 ++++- src/nepi/resources/omf/interface.py | 152 +++++++++--- src/nepi/resources/omf/messages_6.py | 14 +- src/nepi/resources/omf/node.py | 52 ++-- src/nepi/resources/omf/node6.py | 134 ----------- .../resources/omf/{omf_api.py => omf5_api.py} | 104 +------- src/nepi/resources/omf/omf6_api.py | 163 +++---------- src/nepi/resources/omf/omf6_parser.py | 46 +++- src/nepi/resources/omf/omf6_resource.py | 5 +- src/nepi/resources/omf/omf_api_factory.py | 130 ++++++++++ src/nepi/resources/omf/omf_client.py | 10 +- src/nepi/resources/omf/omf_resource.py | 12 +- 14 files changed, 549 insertions(+), 702 deletions(-) delete mode 100644 src/nepi/resources/omf/application6.py delete mode 100644 src/nepi/resources/omf/node6.py rename src/nepi/resources/omf/{omf_api.py => omf5_api.py} (75%) create mode 100644 src/nepi/resources/omf/omf_api_factory.py diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py index bc4d0e77..1ec831fd 100644 --- a/src/nepi/resources/omf/application.py +++ b/src/nepi/resources/omf/application.py @@ -18,12 +18,14 @@ # Author: Alina Quereilhac # Julien Tribino +import os + from nepi.execution.resource import ResourceManager, clsinit_copy, \ ResourceState, reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource from nepi.resources.omf.node import OMFNode -from nepi.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api_factory import OMFAPIFactory from nepi.util import sshfuncs @@ -46,10 +48,11 @@ class OMFApplication(OMFResource): """ Register the attributes of an OMF application """ - appid = Attribute("appid", "Name of the application") - path = Attribute("path", "Path of the application") - args = Attribute("args", "Argument of the application") + command = Attribute("command", "Command to execute") env = Attribute("env", "Environnement variable of the application") + + # For OMF 5: + appid = Attribute("appid", "Name of the application") stdin = Attribute("stdin", "Input of the application", default = "") sources = Attribute("sources", "Sources of the application", flags = Flags.ExecReadOnly) @@ -57,9 +60,9 @@ class OMFApplication(OMFResource): flags = Flags.ExecReadOnly) sshkey = Attribute("sshKey", "key to use for ssh", flags = Flags.ExecReadOnly) + cls._register_attribute(appid) - cls._register_attribute(path) - cls._register_attribute(args) + cls._register_attribute(command) cls._register_attribute(env) cls._register_attribute(stdin) cls._register_attribute(sources) @@ -78,17 +81,27 @@ class OMFApplication(OMFResource): """ super(OMFApplication, self).__init__(ec, guid) + self.set('command', "") self.set('appid', "") - self.set('path', "") - self.set('args', "") + self.path= "" + self.args = "" self.set('env', "") self._node = None self._omf_api = None + self._topic_app = None + self.create_id = None + self.release_id = None self.add_set_hook() + def _init_command(self): + comm = self.get('command').split(' ') + self.path= comm[0] + if len(comm)>1: + self.args = ' '.join(comm[1:]) + @property def exp_id(self): return self.ec.exp_id @@ -153,55 +166,104 @@ class OMFApplication(OMFResource): It becomes DEPLOYED after getting the xmpp client. """ + if not self.node or self.node.state < ResourceState.READY: + self.debug("---- RESCHEDULING DEPLOY ---- node state %s " + % self.node.state ) + self.ec.schedule(reschedule_delay, self.deploy) + return + + self._init_command() - self.set('xmppSlice',self.node.get('xmppSlice')) - self.set('xmppHost',self.node.get('xmppHost')) + self.set('xmppUser',self.node.get('xmppUser')) + self.set('xmppServer',self.node.get('xmppServer')) self.set('xmppPort',self.node.get('xmppPort')) self.set('xmppPassword',self.node.get('xmppPassword')) + self.set('version',self.node.get('version')) - if not (self.get('xmppSlice') and self.get('xmppHost') - and self.get('xmppPort') and self.get('xmppPassword')): - msg = "Credentials are not initialzed. XMPP Connections impossible" + if not self.get('xmppServer'): + msg = "XmppServer is not initialzed. XMPP Connections impossible" self.error(msg) raise RuntimeError, msg - if not self._omf_api : - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + if not (self.get('xmppUser') or self.get('xmppPort') + or self.get('xmppPassword')): + msg = "Credentials are not all initialzed. Default values will be used" + self.warn(msg) - if self.get('sources'): - gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')] - user = self.get('sshUser') or self.get('xmppSlice') - dst = user + "@"+ gateway + ":" - (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst) + if not self._omf_api : + self._omf_api = OMFAPIFactory.get_api(self.get('version'), + self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + if self.get('version') == "5": + if self.get('sources'): + gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')] + user = self.get('sshUser') or self.get('xmppUser') + dst = user + "@"+ gateway + ":" + (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst) + else : + # For OMF 6 : + if not self.create_id: + props = {} + if self.get('command'): + props['application:binary_path'] = self.get('command') + props['application:hrn'] = self.get('command') + props['application:membership'] = self._topic_app + props['application:type'] = "application" + + self.create_id = os.urandom(16).encode('hex') + self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props) + + uid = self.check_deploy(self.create_id) + if not uid: + self.ec.schedule(reschedule_delay, self.deploy) + return + + self._topic_app = uid + self._omf_api.enroll_topic(self._topic_app) super(OMFApplication, self).do_deploy() + def check_deploy(self, cid): + uid = self._omf_api.check_mailbox("create", cid) + if uid : + return uid + return False + def do_start(self): """ Start the RM. It means : Send Xmpp Message Using OMF protocol to execute the application. It becomes STARTED before the messages are sent (for coordination) """ - if not (self.get('appid') and self.get('path')) : - msg = "Application's information are not initialized" + if not self.get('command') : + msg = "Application's Command is not initialized" self.error(msg) raise RuntimeError, msg - if not self.get('args'): - self.set('args', " ") if not self.get('env'): self.set('env', " ") - # Some information to check the information in parameter - msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \ - self.get('appid') + " : " + self.get('path') + " : " + \ - self.get('args') + " : " + self.get('env') - self.info(msg) + if self.get('version') == "5": + # Some information to check the command for OMF5 + msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \ + self.get('appid') + " : " + self.path + " : " + \ + self.args + " : " + self.get('env') + self.debug(msg) + + self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \ + self.get('args'), self.get('path'), self.get('env')) + else: + #For OMF 6 + props = {} + props['state'] = "running" + + guards = {} + guards['type'] = "application" + guards['name'] = self.get('command') + + self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards ) - self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \ - self.get('args'), self.get('path'), self.get('env')) super(OMFApplication, self).do_start() @@ -211,18 +273,34 @@ class OMFApplication(OMFResource): State is set to STOPPED after the message is sent. """ - - self._omf_api.exit(self.node.get('hostname'),self.get('appid')) + if self.get('version') == 5: + self._omf_api.exit(self.node.get('hostname'),self.get('appid')) super(OMFApplication, self).do_stop() + def check_release(self, cid): + res = self._omf_api.check_mailbox("release", cid) + if res : + return res + return False + def do_release(self): """ Clean the RM at the end of the experiment and release the API. """ + if self.get('version') == "6": + if not self.release_id: + self.release_id = os.urandom(16).encode('hex') + self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app) + + cid = self.check_release(self.release_id) + if not cid: + self.ec.schedule(reschedule_delay, self.release) + return + if self._omf_api: - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + OMFAPIFactory.release_api(self.get('version'), + self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) super(OMFApplication, self).do_release() diff --git a/src/nepi/resources/omf/application6.py b/src/nepi/resources/omf/application6.py deleted file mode 100644 index 549c030f..00000000 --- a/src/nepi/resources/omf/application6.py +++ /dev/null @@ -1,222 +0,0 @@ -# -# NEPI, a framework to manage network experiments -# Copyright (C) 2013 INRIA -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -# Author: Alina Quereilhac -# Julien Tribino - -from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay -from nepi.execution.attribute import Attribute, Flags -from nepi.resources.omf.omf6_resource import OMF6Resource -from nepi.resources.omf.node6 import OMF6Node -from nepi.resources.omf.omf6_api import OMF6APIFactory - -import os, time -from nepi.util import sshfuncs - -@clsinit_copy -class OMF6Application(OMF6Resource): - """ - .. class:: Class Args : - - :param ec: The Experiment controller - :type ec: ExperimentController - :param guid: guid of the RM - :type guid: int - - """ - _rtype = "OMF6Application" - _authorized_connections = ["OMF6Node"] - - @classmethod - def _register_attributes(cls): - """ Register the attributes of an OMF application - - """ - command = Attribute("command", "Command to execute") - env = Attribute("env", "Environnement variable of the application") -# sources = Attribute("sources", "Sources of the application", -# flags = Flags.ExecReadOnly) -# sshuser = Attribute("sshUser", "user to connect with ssh", -# flags = Flags.ExecReadOnly) -# sshkey = Attribute("sshKey", "key to use for ssh", -# flags = Flags.ExecReadOnly) - cls._register_attribute(command) - cls._register_attribute(env) -# cls._register_attribute(sources) -# cls._register_attribute(sshuser) -# cls._register_attribute(sshkey) - - def __init__(self, ec, guid): - """ - :param ec: The Experiment controller - :type ec: ExperimentController - :param guid: guid of the RM - :type guid: int - :param creds: Credentials to communicate with the rm (XmppClient for OMF) - :type creds: dict - - """ - super(OMF6Application, self).__init__(ec, guid) - - self.set('command', "") - self.set('env', "") - - self._node = None - self._topic_app = None - - self._omf_api = None - - @property - def exp_id(self): - return self.ec.exp_id - - @property - def node(self): - rm_list = self.get_connected(OMF6Node.get_rtype()) - if rm_list: return rm_list[0] - return None - - def valid_connection(self, guid): - """ Check if the connection with the guid in parameter is possible. - Only meaningful connections are allowed. - - :param guid: Guid of RM it will be connected - :type guid: int - :rtype: Boolean - - """ - rm = self.ec.get_resource(guid) - if rm.get_rtype() not in self._authorized_connections: - msg = ("Connection between %s %s and %s %s refused: " - "An Application can be connected only to a Node" ) % \ - (self.get_rtype(), self._guid, rm.get_rtype(), guid) - self.debug(msg) - - return False - - elif len(self.connections) != 0 : - msg = ("Connection between %s %s and %s %s refused: " - "This Application is already connected" ) % \ - (self.get_rtype(), self._guid, rm.get_rtype(), guid) - self.debug(msg) - - return False - - else : - msg = "Connection between %s %s and %s %s accepted" % ( - self.get_rtype(), self._guid, rm.get_rtype(), guid) - self.debug(msg) - - return True - - def do_deploy(self): - """ Deploy the RM. It means nothing special for an application - for now (later it will be upload sources, ...) - It becomes DEPLOYED after getting the xmpp client. - - """ - - self.set('xmppUser',self.node.get('xmppUser')) - self.set('xmppHost',self.node.get('xmppHost')) - self.set('xmppPort',self.node.get('xmppPort')) - self.set('xmppPassword',self.node.get('xmppPassword')) - - if not (self.get('xmppUser') and self.get('xmppHost') - and self.get('xmppPort') and self.get('xmppPassword')): - msg = "Credentials are not initialzed. XMPP Connections impossible" - self.error(msg) - raise RuntimeError, msg - - if not self._omf_api : - self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'), - self.get('xmppUser'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - -# if self.get('sources'): -# gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')] -# user = self.get('sshUser') or self.get('xmppSlice') -# dst = user + "@"+ gateway + ":" -# (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst) - - self._topic_app = self.node.get('hostname') +'_'+ str(self.guid) +'_app' - - self._omf_api.enroll_topic(self._topic_app) - - props = {} - if self.get('command'): - props['application:binary_path'] = self.get('command') - props['application:hrn'] = self.get('command') - props['application:membership'] = self._topic_app - props['application:type'] = "application" - self._omf_api.frcp_create( self.node.get('hostname'), "application", props = props) - - - - super(OMF6Application, self).do_deploy() - - def do_start(self): - """ Start the RM. It means : Send Xmpp Message Using OMF protocol - to execute the application. - It becomes STARTED before the messages are sent (for coordination) - - """ - if not self.get('command') : - msg = "Application's Command is not initialized" - self.error(msg) - raise RuntimeError, msg - - if not self.get('env'): - self.set('env', " ") - - props = {} - props['state'] = "running" - - guards = {} - guards['type'] = "application" - guards['name'] = self.get('command') - time.sleep(2) - self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards ) - - - super(OMF6Application, self).do_start() - - def do_stop(self): - """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to - kill the application. - State is set to STOPPED after the message is sent. - - """ - - super(OMF6Application, self).do_stop() - - def do_release(self): - """ Clean the RM at the end of the experiment and release the API. - - """ - props = {} - props['frcp:type'] = "application" - - self._omf_api.frcp_release(self.node.get('hostname'),self._topic_app, props = props ) - - if self._omf_api: - OMF6APIFactory.release_api(self.get('xmppHost'), - self.get('xmppUser'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - - super(OMF6Application, self).do_release() - diff --git a/src/nepi/resources/omf/channel.py b/src/nepi/resources/omf/channel.py index 6a0b02b2..fd083a70 100644 --- a/src/nepi/resources/omf/channel.py +++ b/src/nepi/resources/omf/channel.py @@ -23,7 +23,7 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, \ from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource -from nepi.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api_factory import OMFAPIFactory @clsinit_copy @@ -42,6 +42,22 @@ class OMFChannel(OMFResource): _rtype = "OMFChannel" _authorized_connections = ["OMFWifiInterface", "OMFNode"] + ChannelToFreq = dict({ + "1" : "2412", + "2" : "2417", + "3" : "2422", + "4" : "2427", + "5" : "2432", + "6" : "2437", + "7" : "2442", + "8" : "2447", + "9" : "2452", + "10" : "2457", + "11" : "2462", + "12" : "2467", + "13" : "2472", + }) + @classmethod def _register_attributes(cls): """Register the attributes of an OMF channel @@ -61,6 +77,7 @@ class OMFChannel(OMFResource): super(OMFChannel, self).__init__(ec, guid) self._nodes_guid = list() + self.frequency = None self._omf_api = None @@ -78,17 +95,14 @@ class OMFChannel(OMFResource): """ rm = self.ec.get_resource(guid) - if rm.get_rtype() in self._authorized_connections: msg = "Connection between %s %s and %s %s accepted" % ( self.get_rtype(), self._guid, rm.get_rtype(), guid) self.debug(msg) return True - msg = "Connection between %s %s and %s %s refused" % ( self.get_rtype(), self._guid, rm.get_rtype(), guid) self.debug(msg) - return False def _get_target(self, conn_set): @@ -115,18 +129,31 @@ class OMFChannel(OMFResource): res.append(couple) return res + def get_frequency(self, channel): + return OMFChannel.ChannelToFreq[channel] + def do_deploy(self): """ Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the channel. It becomes DEPLOYED after sending messages to configure the channel - """ - if not (self.get('xmppSlice') and self.get('xmppHost') - and self.get('xmppPort') and self.get('xmppPassword')): - msg = "Credentials are not initialzed. XMPP Connections impossible" + """ + if self.get('version') == "6": + self.frequency = self.get_frequency(self.get('channel')) + super(OMFChannel, self).do_deploy() + return + + + if not self.get('xmppServer'): + msg = "XmppServer is not initialzed. XMPP Connections impossible" self.error(msg) raise RuntimeError, msg + if not (self.get('xmppUser') or self.get('xmppPort') + or self.get('xmppPassword')): + msg = "Credentials are not all initialzed. Default values will be used" + self.warn(msg) + if not self._omf_api : self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), @@ -139,6 +166,8 @@ class OMFChannel(OMFResource): self._nodes_guid = self._get_target(self._connections) + + if self._nodes_guid == "reschedule" : self.ec.schedule("2s", self.deploy) else: @@ -147,16 +176,16 @@ class OMFChannel(OMFResource): attrname = "net/%s/%s" % (couple[1], 'channel') self._omf_api.configure(couple[0], attrname, attrval) - super(OMFChannel, self).do_deploy() + super(OMFChannel, self).do_deploy() def do_release(self): """ Clean the RM at the end of the experiment and release the API """ if self._omf_api : - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + OMFAPIFactory.release_api(self.get('version'), + self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) super(OMFChannel, self).do_release() diff --git a/src/nepi/resources/omf/interface.py b/src/nepi/resources/omf/interface.py index 1fcd59b9..7926fb60 100644 --- a/src/nepi/resources/omf/interface.py +++ b/src/nepi/resources/omf/interface.py @@ -18,6 +18,7 @@ # Author: Alina Quereilhac # Julien Tribino +import os, time from nepi.execution.resource import ResourceManager, clsinit_copy, \ ResourceState, reschedule_delay from nepi.execution.attribute import Attribute, Flags @@ -25,7 +26,7 @@ from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.node import OMFNode from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource from nepi.resources.omf.channel import OMFChannel -from nepi.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api_factory import OMFAPIFactory @clsinit_copy class OMFWifiInterface(OMFResource): @@ -49,13 +50,17 @@ class OMFWifiInterface(OMFResource): """ alias = Attribute("alias","Alias of the interface", default = "w0") + type = Attribute("type","Choose between : a, b, g, n") + name = Attribute("name","Alias of the interface", default = "wlan0") mode = Attribute("mode","Mode of the interface") - type = Attribute("type","Type of the interface") + hw_mode = Attribute("hw_mode","Choose between : a, b, g, n") essid = Attribute("essid","Essid of the interface") ip = Attribute("ip","IP of the interface") cls._register_attribute(alias) - cls._register_attribute(mode) cls._register_attribute(type) + cls._register_attribute(name) + cls._register_attribute(mode) + cls._register_attribute(hw_mode) cls._register_attribute(essid) cls._register_attribute(ip) @@ -73,9 +78,15 @@ class OMFWifiInterface(OMFResource): self._conf = False - self._omf_api = None self._alias = self.get('alias') + self.create_id = None + self.release_id = None + self._topic_iface = None + self._omf_api = None + self._type = "" + + def valid_connection(self, guid): """ Check if the connection with the guid in parameter is possible. Only meaningful connections are allowed. @@ -90,13 +101,11 @@ class OMFWifiInterface(OMFResource): msg = "Connection between %s %s and %s %s accepted" % \ (self.get_rtype(), self._guid, rm.get_rtype(), guid) self.debug(msg) - return True msg = "Connection between %s %s and %s %s refused" % \ (self.get_rtype(), self._guid, rm.get_rtype(), guid) self.debug(msg) - return False @property @@ -144,63 +153,136 @@ class OMFWifiInterface(OMFResource): attrname = "net/%s/%s" % (self._alias, "ip") self._omf_api.configure(self.node.get('hostname'), attrname, attrval) - return True + + def configure_on_omf5(self): + # Just for information +# self.debug(" " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \ +# self.get('mode') + " : " + self.get('type') + " : " + \ +# self.get('essid') + " : " + self.get('ip')) + if self.state < ResourceState.PROVISIONED: + if self._conf == False: + self._conf = self.configure_iface() + if self._conf == True: + self.configure_ip() + + + def do_deploy(self): """ Deploy the RM. It means : Get the xmpp client and send messages using OMF 5.4 protocol to configure the interface. It becomes DEPLOYED after sending messages to configure the interface """ - self.set('xmppSlice',self.node.get('xmppSlice')) - self.set('xmppHost',self.node.get('xmppHost')) + if not self.node or self.node.state < ResourceState.READY: + self.debug("---- RESCHEDULING DEPLOY ---- node state %s " + % self.node.state ) + self.ec.schedule(reschedule_delay, self.deploy) + return + + if not self.channel or self.channel.state < ResourceState.READY: + self.debug("---- RESCHEDULING DEPLOY ---- channel state %s " + % self.channel.state ) + self.ec.schedule(reschedule_delay, self.deploy) + return + + self.set('xmppUser',self.node.get('xmppUser')) + self.set('xmppServer',self.node.get('xmppServer')) self.set('xmppPort',self.node.get('xmppPort')) self.set('xmppPassword',self.node.get('xmppPassword')) + self.set('version',self.node.get('version')) - if not (self.get('xmppSlice') and self.get('xmppHost') - and self.get('xmppPort') and self.get('xmppPassword')): - msg = "Credentials are not initialzed. XMPP Connections impossible" + if not self.get('xmppServer'): + msg = "XmppServer is not initialzed. XMPP Connections impossible" self.error(msg) raise RuntimeError, msg + if not (self.get('xmppUser') or self.get('xmppPort') + or self.get('xmppPassword')): + msg = "Credentials are not all initialzed. Default values will be used" + self.warn(msg) + if not self._omf_api : - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + self._omf_api = OMFAPIFactory.get_api(self.get('version'), + self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) - if not (self.get('mode') and self.get('type') and self.get('essid') \ - and self.get('ip')): + if not (self.get('name') and self.get('mode') and self.get('essid') \ + and self.get('hw_mode') and self.get('ip')): msg = "Interface's variable are not initialized" self.error(msg) raise RuntimeError, msg - if not self.node.get('hostname') : - msg = "The channel is connected with an undefined node" - self.error(msg) - raise RuntimeError, msg + self.set('type',self.get('hw_mode')) - # Just for information - self.debug(" " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \ - self.get('mode') + " : " + self.get('type') + " : " + \ - self.get('essid') + " : " + self.get('ip')) - - # Check if the node is already deployed - if self.state < ResourceState.PROVISIONED: - if self._conf == False: - self._conf = self.configure_iface() - if self._conf == True: - self.configure_ip() + if self.get('version') == "5": + self.configure_on_omf5() + else : + self.configure_on_omf6() super(OMFWifiInterface, self).do_deploy() + def configure_on_omf6(self): + if not self.create_id : + props = {} + props['wlan:if_name'] = self.get('name') + props['wlan:mode'] = { + "mode": self.get('mode'), + "hw_mode" : self.get('hw_mode'), + "channel" : self.channel.get('channel'), + "essid" : self.get('essid'), + "ip_addr" : self.get('ip'), + "frequency" : self.channel.frequency, + "phy" : "%0%" + } + props['wlan:hrn'] = self.get('name') + props['wlan:type'] = "wlan" + + self.create_id = os.urandom(16).encode('hex') + self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "wlan", props = props) + + self.check_deploy(self.create_id) + self._omf_api.enroll_topic(self._topic_iface) + + def check_deploy(self, cid): + delay = 1.0 + for i in xrange(10): + uid = self._omf_api.check_mailbox("create", cid) + if uid: + self._topic_iface = uid + break + else: + time.sleep(delay) + delay = delay * 1.5 + else: + msg = "Couldn't retrieve the confirmation of the creation" + self.error(msg) + raise RuntimeError, msg + + def check_release(self, cid): + res = self._omf_api.check_mailbox("release", cid) + if res : + return res + return False + def do_release(self): """ Clean the RM at the end of the experiment and release the API """ + if self.get('version') == "6": + if not self.release_id: + self.release_id = os.urandom(16).encode('hex') + self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_iface, res_id=self._topic_iface) + + cid = self.check_release(self.release_id) + if not cid: + self.ec.schedule(reschedule_delay, self.release) + return + if self._omf_api: - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + OMFAPIFactory.release_api(self.get('version'), + self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) super(OMFWifiInterface, self).do_release() diff --git a/src/nepi/resources/omf/messages_6.py b/src/nepi/resources/omf/messages_6.py index 3a024b45..282e03e8 100644 --- a/src/nepi/resources/omf/messages_6.py +++ b/src/nepi/resources/omf/messages_6.py @@ -96,11 +96,19 @@ class MessageHandler(): if rtype == "application" : properties = self._id_element(payload,"props","xmlns:application", "http://schema.mytestbed.net/omf/6.0/protocol/application") + elif rtype == "wlan" : + properties = self._id_element(payload,"props","xmlns:wlan", + "http://schema.mytestbed.net/omf/6.0/protocol/wlan") else: properties = self._attr_element(payload,"props","") for prop in props.keys(): - self._attr_element(properties,prop,props[prop],type_key="type", type_value = "string") + if isinstance(props[prop],str): + self._attr_element(properties,prop,props[prop],type_key="type", type_value = "string") + elif isinstance(props[prop],dict): + key = self._attr_element(properties,prop,"",type_key="type", type_value = "hash") + for comp in props[prop].keys(): + self._attr_element(key,comp,props[prop][comp],type_key="type", type_value = "string") if guards : guardians = self._attr_element(payload,"guard","") @@ -120,7 +128,7 @@ class MessageHandler(): properties = self._attr_element(payload,"props","") for prop in props.keys(): self._attr_element(properties,prop,props[prop],type_key="type", type_value = "symbol") - + if guards : guardians = self._attr_element(payload,"guard","") for guard in guards.keys(): @@ -167,7 +175,7 @@ class MessageHandler(): self._attr_element(payload,"src",src) self._attr_element(payload,"ts",timestamp) if res_id : - self._attr_element(payload,"res_id",timestamp) + self._attr_element(payload,"res_id",res_id) if props : properties = self._id_element(payload,"props","xmlns:frcp", diff --git a/src/nepi/resources/omf/node.py b/src/nepi/resources/omf/node.py index de00a07e..6658f625 100644 --- a/src/nepi/resources/omf/node.py +++ b/src/nepi/resources/omf/node.py @@ -22,7 +22,7 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, \ ResourceState, reschedule_delay from nepi.execution.attribute import Attribute, Flags from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource -from nepi.resources.omf.omf_api import OMFAPIFactory +from nepi.resources.omf.omf_api_factory import OMFAPIFactory import time @@ -83,12 +83,11 @@ class OMFNode(OMFResource): msg = "Connection between %s %s and %s %s accepted" % ( self.get_rtype(), self._guid, rm.get_rtype(), guid) self.debug(msg) - return True msg = "Connection between %s %s and %s %s refused" % ( self.get_rtype(), self._guid, rm.get_rtype(), guid) - self.debug(msg) + self.error(msg) return False @@ -98,23 +97,35 @@ class OMFNode(OMFResource): It becomes DEPLOYED after sending messages to enroll the node """ - if not (self.get('xmppSlice') and self.get('xmppHost') - and self.get('xmppPort') and self.get('xmppPassword')): - msg = "Credentials are not initialzed. XMPP Connections impossible" + if not self.get('xmppServer'): + msg = "XmppServer is not initialzed. XMPP Connections impossible" + self.error(msg) + raise RuntimeError, msg + + if not self.get('version'): + msg = "Version of OMF is not indicated" self.error(msg) raise RuntimeError, msg + if not (self.get('xmppUser') or self.get('xmppPort') + or self.get('xmppPassword')): + msg = "Credentials are not all initialzed. Default values will be used" + self.warn(msg) + if not self._omf_api : - self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + self._omf_api = OMFAPIFactory.get_api(self.get('version'), + self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) if not self.get('hostname') : msg = "Hostname's value is not initialized" self.error(msg) raise RuntimeError, msg - self._omf_api.enroll_host(self.get('hostname')) + if self.get('version') == "5": + self._omf_api.enroll_host(self.get('hostname')) + else: + self._omf_api.enroll_topic(self.get('hostname')) super(OMFNode, self).do_deploy() @@ -122,12 +133,23 @@ class OMFNode(OMFResource): """ Clean the RM at the end of the experiment """ - if self._omf_api: - self._omf_api.release(self.get('hostname')) + from nepi.resources.omf.application import OMFApplication + rm_list = self.get_connected(OMFApplication.get_rtype()) + if rm_list: + for rm in rm_list: + if rm.state < ResourceState.RELEASED: + self.ec.schedule(reschedule_delay, self.release) + return - OMFAPIFactory.release_api(self.get('xmppSlice'), - self.get('xmppHost'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) + if self._omf_api: + if self.get('version') == "5": + self._omf_api.release(self.get('hostname')) + else: + self._omf_api.unenroll_topic(self.get('hostname')) + + OMFAPIFactory.release_api(self.get('version'), + self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) super(OMFNode, self).do_release() diff --git a/src/nepi/resources/omf/node6.py b/src/nepi/resources/omf/node6.py deleted file mode 100644 index 6f8a1441..00000000 --- a/src/nepi/resources/omf/node6.py +++ /dev/null @@ -1,134 +0,0 @@ -# -# NEPI, a framework to manage network experiments -# Copyright (C) 2013 INRIA -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -# Author: Alina Quereilhac -# Julien Tribino - -from nepi.execution.resource import ResourceManager, clsinit_copy, \ - ResourceState, reschedule_delay -from nepi.execution.attribute import Attribute, Flags -from nepi.resources.omf.omf6_resource import OMF6Resource -from nepi.resources.omf.omf6_api import OMF6APIFactory - -import time - -@clsinit_copy -class OMF6Node(OMF6Resource): - """ - .. class:: Class Args : - - :param ec: The Experiment controller - :type ec: ExperimentController - :param guid: guid of the RM - :type guid: int - :param creds: Credentials to communicate with the rm (XmppClient for OMF) - :type creds: dict - - """ - _rtype = "OMF6Node" - _authorized_connections = ["OMF6Application" , "OMFWifiInterface"] - - @classmethod - def _register_attributes(cls): - """Register the attributes of an OMF Node - - """ - hostname = Attribute("hostname", "Hostname of the machine") - - cls._register_attribute(hostname) - - # XXX: We don't necessary need to have the credentials at the - # moment we create the RM - def __init__(self, ec, guid): - """ - :param ec: The Experiment controller - :type ec: ExperimentController - :param guid: guid of the RM - :type guid: int - - """ - super(OMF6Node, self).__init__(ec, guid) - - self._omf_api = None - - @property - def exp_id(self): - return self.ec.exp_id - - def valid_connection(self, guid): - """ Check if the connection with the guid in parameter is possible. - Only meaningful connections are allowed. - - :param guid: Guid of the current RM - :type guid: int - :rtype: Boolean - - """ - rm = self.ec.get_resource(guid) - if rm.get_rtype() in self._authorized_connections: - msg = "Connection between %s %s and %s %s accepted" % ( - self.get_rtype(), self._guid, rm.get_rtype(), guid) - self.debug(msg) - - return True - - msg = "Connection between %s %s and %s %s refused" % ( - self.get_rtype(), self._guid, rm.get_rtype(), guid) - self.debug(msg) - - return False - - def do_deploy(self): - """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol - to enroll the node into the experiment. - It becomes DEPLOYED after sending messages to enroll the node - - """ - if not (self.get('xmppUser') and self.get('xmppHost') - and self.get('xmppPort') and self.get('xmppPassword')): - msg = "Credentials are not initialzed. XMPP Connections impossible" - self.error(msg) - raise RuntimeError, msg - - if not self._omf_api : - self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'), - self.get('xmppUser'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - - if not self.get('hostname') : - msg = "Hostname's value is not initialized" - self.error(msg) - raise RuntimeError, msg - - self._omf_api.enroll_topic(self.get('hostname')) - - super(OMF6Node, self).do_deploy() - - def do_release(self): - """ Clean the RM at the end of the experiment - - """ - if self._omf_api: - # Should be deleted from the RC - #self._omf_api.frcp_release(self.get('hostname')) - - OMF6APIFactory.release_api(self.get('xmppHost'), - self.get('xmppUser'), self.get('xmppPort'), - self.get('xmppPassword'), exp_id = self.exp_id) - - super(OMF6Node, self).do_release() - diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf5_api.py similarity index 75% rename from src/nepi/resources/omf/omf_api.py rename to src/nepi/resources/omf/omf5_api.py index 5def17b9..e01a7546 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf5_api.py @@ -21,15 +21,13 @@ import ssl import sys import time -import hashlib -import threading from nepi.util.logger import Logger from nepi.resources.omf.omf_client import OMFClient from nepi.resources.omf.messages_5_4 import MessageHandler -class OMFAPI(Logger): +class OMF5API(Logger): """ .. class:: Class Args : @@ -313,103 +311,3 @@ class OMFAPI(Logger): msg = " Disconnected from XMPP Server" self.debug(msg) - -class OMFAPIFactory(object): - """ - .. note:: - - It allows the different RM to use the same xmpp client if they use - the same credentials. For the moment, it is focused on XMPP. - - """ - # use lock to avoid concurrent access to the Api list at the same times by 2 - # different threads - lock = threading.Lock() - _apis = dict() - - @classmethod - def get_api(cls, slice, host, port, password, exp_id = None): - """ Get an OMF Api - - :param slice: Xmpp Slice Name - :type slice: str - :param host: Xmpp Server Adress - :type host: str - :param port: Xmpp Port (Default : 5222) - :type port: str - :param password: Xmpp Password - :type password: str - - """ - if slice and host and port and password: - key = cls._make_key(slice, host, port, password, exp_id) - cls.lock.acquire() - if key in cls._apis: - #print "Api Counter : " + str(cls._apis[key]['cnt']) - cls._apis[key]['cnt'] += 1 - cls.lock.release() - return cls._apis[key]['api'] - else : - omf_api = cls.create_api(slice, host, port, password, exp_id) - cls.lock.release() - return omf_api - return None - - @classmethod - def create_api(cls, slice, host, port, password, exp_id): - """ Create an OMF API if this one doesn't exist yet with this credentials - - :param slice: Xmpp Slice Name - :type slice: str - :param host: Xmpp Server Adress - :type host: str - :param port: Xmpp Port (Default : 5222) - :type port: str - :param password: Xmpp Password - :type password: str - - """ - omf_api = OMFAPI(slice, host, port, password, exp_id = exp_id) - key = cls._make_key(slice, host, port, password, exp_id) - cls._apis[key] = {} - cls._apis[key]['api'] = omf_api - cls._apis[key]['cnt'] = 1 - return omf_api - - @classmethod - def release_api(cls, slice, host, port, password, exp_id = None): - """ Release an OMF API with this credentials - - :param slice: Xmpp Slice Name - :type slice: str - :param host: Xmpp Server Adress - :type host: str - :param port: Xmpp Port (Default : 5222) - :type port: str - :param password: Xmpp Password - :type password: str - - """ - if slice and host and port and password: - key = cls._make_key(slice, host, port, password, exp_id) - if key in cls._apis: - cls._apis[key]['cnt'] -= 1 - #print "Api Counter : " + str(cls._apis[key]['cnt']) - if cls._apis[key]['cnt'] == 0: - omf_api = cls._apis[key]['api'] - omf_api.disconnect() - - - @classmethod - def _make_key(cls, *args): - """ Hash the credentials in order to create a key - - :param args: list of arguments used to create the hash (user, host, port, ...) - :type args: list of args - - """ - skey = "".join(map(str, args)) - return hashlib.md5(skey).hexdigest() - - - diff --git a/src/nepi/resources/omf/omf6_api.py b/src/nepi/resources/omf/omf6_api.py index 5ec13af0..c2ecb498 100644 --- a/src/nepi/resources/omf/omf6_api.py +++ b/src/nepi/resources/omf/omf6_api.py @@ -21,8 +21,6 @@ import ssl import sys import time -import hashlib -import threading from nepi.util.timefuncs import tsformat import os @@ -38,8 +36,8 @@ class OMF6API(Logger): :param slice: Xmpp Slice :type slice: str - :param host: Xmpp Server - :type host: str + :param server: Xmpp Server + :type server: str :param port: Xmpp Port :type port: str :param password: Xmpp password @@ -54,14 +52,14 @@ class OMF6API(Logger): instead of OMF used for OMF5.3 """ - def __init__(self, host, user = "nepi", port="5222", password="1234", + def __init__(self, server, user = "nepi", port="5222", password="1234", exp_id = None): """ :param slice: Xmpp Slice :type slice: str - :param host: Xmpp Server - :type host: str + :param server: Xmpp Server + :type server: str :param port: Xmpp Port :type port: str :param password: Xmpp password @@ -73,10 +71,10 @@ class OMF6API(Logger): super(OMF6API, self).__init__("OMF6API") self._exp_id = exp_id self._user = user # name of the machine that run Nepi - self._host = host # name of the xmpp server + self._server = server # name of the xmpp server self._port = port # port of the xmpp server self._password = password # password to connect to xmpp - self._jid = "%s-%s@%s" % (self._user, self._exp_id, self._host) + self._jid = "%s-%s@%s" % (self._user, self._exp_id, self._server) self._src = "xmpp://" + self._jid self._topics = [] @@ -106,7 +104,7 @@ class OMF6API(Logger): # PROTOCOL_SSLv3 required for compatibility with OpenFire xmpp.ssl_version = ssl.PROTOCOL_SSLv3 - if xmpp.connect((self._host, self._port)): + if xmpp.connect((self._server, self._port)): xmpp.process(block=False) self.check_ready(xmpp) self._client = xmpp @@ -144,7 +142,7 @@ class OMF6API(Logger): self._client.subscribe(nepi_topic) - def enroll_topic(self, topic): + def create_and_enroll_topic(self, topic): """ Create and Subscribe to the session topic and the resources corresponding to the hostname @@ -157,13 +155,22 @@ class OMF6API(Logger): self._topics.append(topic) -# try : self._client.create(topic) -# except: -# msg = "Topic already existing" -# self.info(msg) self._client.subscribe(topic) + + def enroll_topic(self, topic): + """ Create and Subscribe to the session topic and the resources + corresponding to the hostname + + """ + if topic in self._topics: + return + + self._topics.append(topic) + self._client.subscribe(topic) + + def frcp_inform(self, topic, cid, itype): """ Configure attribute on the node @@ -184,11 +191,10 @@ class OMF6API(Logger): self._client.publish(payload, topic) - def frcp_create(self, topic, rtype, props = None, guards = None ): + def frcp_create(self, msg_id, topic, rtype, props = None, guards = None ): """ Send to the stdin of the application the value """ - msg_id = os.urandom(16).encode('hex') timestamp = tsformat() payload = self._message.create_function(msg_id, self._src, rtype, timestamp , props = props ,guards = guards) self._client.publish(payload, topic) @@ -203,11 +209,10 @@ class OMF6API(Logger): payload = self._message.request_function(msg_id, self._src, timestamp, props = props ,guards = guards) self._client.publish(payload, xmpp_node) - def frcp_release(self, parent, child, res_id = None, props = None, guards = None ): + def frcp_release(self, msg_id, parent, child, res_id = None, props = None, guards = None ): """ Delete the session and logger topics. Then disconnect """ - msg_id = os.urandom(16).encode('hex') timestamp = tsformat() payload = self._message.release_function(msg_id, self._src, timestamp, res_id = res_id, props = props ,guards = guards) self._client.publish(payload, parent) @@ -215,119 +220,29 @@ class OMF6API(Logger): if child in self._topics: self._topics.remove(child) - self._client.delete(child) + self._client.unsubscribe(child) + #self._client.delete(child) + + def check_mailbox(self, itype, attr): + return self._client.check_mailbox(itype, attr) + + def unenroll_topic(self, topic): + """ Create and Subscribe to the session topic and the resources + corresponding to the hostname + + """ + if topic in self._topics: + self._topics.remove(topic) + self._client.unsubscribe(topic) def disconnect(self) : """ Delete the session and logger topics. Then disconnect """ self._client.delete(self._nepi_topic) - - #XXX Why there is a sleep there ? - time.sleep(1) - + # Wait the send queue to be empty before disconnect self._client.disconnect(wait=True) msg = " Disconnected from XMPP Server" self.debug(msg) - -class OMF6APIFactory(object): - """ - .. note:: - - It allows the different RM to use the same xmpp client if they use - the same credentials. For the moment, it is focused on XMPP. - - """ - # use lock to avoid concurrent access to the Api list at the same times by 2 - # different threads - lock = threading.Lock() - _apis = dict() - - @classmethod - def get_api(cls, host, user, port, password, exp_id = None): - """ Get an OMF Api - - :param slice: Xmpp Slice Name - :type slice: str - :param host: Xmpp Server Adress - :type host: str - :param port: Xmpp Port (Default : 5222) - :type port: str - :param password: Xmpp Password - :type password: str - - """ - if host and user and port and password: - key = cls._make_key(host, user, port, password, exp_id) - cls.lock.acquire() - if key in cls._apis: - #print "Api Counter : " + str(cls._apis[key]['cnt']) - cls._apis[key]['cnt'] += 1 - cls.lock.release() - return cls._apis[key]['api'] - else : - omf_api = cls.create_api(host, user, port, password, exp_id) - cls.lock.release() - return omf_api - return None - - @classmethod - def create_api(cls, host, user, port, password, exp_id): - """ Create an OMF API if this one doesn't exist yet with this credentials - - :param slice: Xmpp Slice Name - :type slice: str - :param host: Xmpp Server Adress - :type host: str - :param port: Xmpp Port (Default : 5222) - :type port: str - :param password: Xmpp Password - :type password: str - - """ - omf_api = OMF6API(host, user = user, port = port, password = password, exp_id = exp_id) - key = cls._make_key(host, user, port, password, exp_id) - cls._apis[key] = {} - cls._apis[key]['api'] = omf_api - cls._apis[key]['cnt'] = 1 - return omf_api - - @classmethod - def release_api(cls, host, user, port, password, exp_id = None): - """ Release an OMF API with this credentials - - :param slice: Xmpp Slice Name - :type slice: str - :param host: Xmpp Server Adress - :type host: str - :param port: Xmpp Port (Default : 5222) - :type port: str - :param password: Xmpp Password - :type password: str - - """ - if host and user and port and password: - key = cls._make_key(host, user, port, password, exp_id) - if key in cls._apis: - cls._apis[key]['cnt'] -= 1 - #print "Api Counter : " + str(cls._apis[key]['cnt']) - if cls._apis[key]['cnt'] == 0: - omf_api = cls._apis[key]['api'] - omf_api.disconnect() - - - @classmethod - def _make_key(cls, *args): - """ Hash the credentials in order to create a key - - :param args: list of arguments used to create the hash (user, host, port, ...) - :type args: list of args - - """ - skey = "".join(map(str, args)) - return hashlib.md5(skey).hexdigest() - - - diff --git a/src/nepi/resources/omf/omf6_parser.py b/src/nepi/resources/omf/omf6_parser.py index 9029ce26..41420292 100644 --- a/src/nepi/resources/omf/omf6_parser.py +++ b/src/nepi/resources/omf/omf6_parser.py @@ -50,8 +50,16 @@ class OMF6Parser(Logger): """ super(OMF6Parser, self).__init__("OMF6API") + self.mailbox={} - + self.init_mailbox() + + def init_mailbox(self): + self.mailbox['create'] = [] + self.mailbox['configure'] = [] + self.mailbox['request'] = [] + self.mailbox['release'] = [] + self.mailbox['inform'] = [] def _check_for_tag(self, root, namespaces, tag): """ Check if an element markup is in the ElementTree @@ -125,7 +133,9 @@ class OMF6Parser(Logger): def _inform_creation_ok(self, root, namespaces): + #ET.dump(root) uid = self._check_for_tag(root, namespaces, "uid") + cid = self._check_for_tag(root, namespaces, "cid") member = self._check_for_membership(root, namespaces) binary_path = self._check_for_tag(root, namespaces, "binary_path") msg = "CREATION OK -- " @@ -135,12 +145,17 @@ class OMF6Parser(Logger): msg = msg + "' is listening to the topics : '"+ uid if member : msg = msg + "' and '"+ member +"'" - self.info(msg) + if cid: + self.info(msg) + self.mailbox['create'].append([cid, uid ]) def _inform_creation_failed(self, root, namespaces): reason = self._check_for_tag(root, namespaces, "reason") + cid = self._check_for_tag(root, namespaces, "cid") msg = "CREATION FAILED - The reason : "+reason - self.error(msg) + if cid: + self.error(msg) + self.mailbox['create'].append([cid, uid ]) def _inform_status(self, root, namespaces): props = self._check_for_props(root, namespaces) @@ -155,11 +170,15 @@ class OMF6Parser(Logger): self.info(msg) def _inform_released(self, root, namespaces): + #ET.dump(root) parent_id = self._check_for_tag(root, namespaces, "src") child_id = self._check_for_tag(root, namespaces, "res_id") - msg = "RELEASED - The resource : '"+res_id+ \ - "' has been released by : '"+ src - self.info(msg) + cid = self._check_for_tag(root, namespaces, "cid") + if cid : + msg = "RELEASED - The resource : '"+child_id+ \ + "' has been released by : '"+ parent_id + self.info(msg) + self.mailbox['release'].append(cid) def _inform_error(self, root, namespaces): reason = self._check_for_tag(root, namespaces, "reason") @@ -192,6 +211,21 @@ class OMF6Parser(Logger): return + def check_mailbox(self, itype, attr): + if itype == "create": + for res in self.mailbox[itype]: + binary, uid = res + if binary == attr: + self.mailbox[itype].remove(res) + return uid + elif itype == "release": + for res in self.mailbox[itype]: + if attr == res: + self.mailbox[itype].remove(res) + return res + + + def handle(self, iq): namespaces = "{http://schema.mytestbed.net/omf/6.0/protocol}" for i in iq['pubsub_event']['items']: diff --git a/src/nepi/resources/omf/omf6_resource.py b/src/nepi/resources/omf/omf6_resource.py index c92f86d4..c88d049c 100644 --- a/src/nepi/resources/omf/omf6_resource.py +++ b/src/nepi/resources/omf/omf6_resource.py @@ -34,18 +34,21 @@ class OMF6Resource(ResourceManager): @classmethod def _register_attributes(cls): - xmppHost = Attribute("xmppHost", "Xmpp Server", + xmppHost = Attribute("xmppServer", "Xmpp Server", flags = Flags.Credential) xmppUser = Attribute("xmppUser", "Xmpp User") xmppPort = Attribute("xmppPort", "Xmpp Port", flags = Flags.Credential) xmppPassword = Attribute("xmppPassword", "Xmpp Password", flags = Flags.Credential) + version = Attribute("xmppPassword", "Xmpp Password", + flags = Flags.Credential) cls._register_attribute(xmppHost) cls._register_attribute(xmppUser) cls._register_attribute(xmppPort) cls._register_attribute(xmppPassword) + cls._register_attribute(version) def __init__(self, ec, guid): super(OMF6Resource, self).__init__(ec, guid) diff --git a/src/nepi/resources/omf/omf_api_factory.py b/src/nepi/resources/omf/omf_api_factory.py new file mode 100644 index 00000000..00a20f9e --- /dev/null +++ b/src/nepi/resources/omf/omf_api_factory.py @@ -0,0 +1,130 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Julien Tribino + + +import time +import hashlib +import threading + +from nepi.resources.omf.omf5_api import OMF5API +from nepi.resources.omf.omf6_api import OMF6API + +class OMFAPIFactory(object): + """ + .. note:: + + It allows the different RM to use the same xmpp client if they use + the same credentials. For the moment, it is focused on XMPP. + + """ + # use lock to avoid concurrent access to the Api list at the same times by 2 + # different threads + lock = threading.Lock() + _apis = dict() + + @classmethod + def get_api(cls, version, server, user, port, password, exp_id = None): + """ Get an OMF Api + + :param slice: Xmpp Slice Name + :type slice: str + :param server: Xmpp Server Adress + :type server: str + :param port: Xmpp Port (Default : 5222) + :type port: str + :param password: Xmpp Password + :type password: str + + """ + if version and user and server and port and password: + key = cls._make_key(version, server, user, port, password, exp_id) + cls.lock.acquire() + if key in cls._apis: + #print "Api Counter : " + str(cls._apis[key]['cnt']) + cls._apis[key]['cnt'] += 1 + cls.lock.release() + return cls._apis[key]['api'] + else : + omf_api = cls.create_api(version, server, user, port, password, exp_id) + cls.lock.release() + return omf_api + return None + + @classmethod + def create_api(cls, version, server, user, port, password, exp_id): + """ Create an OMF API if this one doesn't exist yet with this credentials + + :param slice: Xmpp Slice Name + :type slice: str + :param server: Xmpp Server Adress + :type server: str + :param port: Xmpp Port (Default : 5222) + :type port: str + :param password: Xmpp Password + :type password: str + + """ + key = cls._make_key(version, server, user, port, password, exp_id) + if version == "5": + omf_api = OMF5API(server, user, port, password, exp_id = exp_id) + else : + omf_api = OMF6API(server, user = user, port = port, password = password, exp_id = exp_id) + cls._apis[key] = {} + cls._apis[key]['api'] = omf_api + cls._apis[key]['cnt'] = 1 + return omf_api + + @classmethod + def release_api(cls, version, server, user, port, password, exp_id = None): + """ Release an OMF API with this credentials + + :param slice: Xmpp Slice Name + :type slice: str + :param server: Xmpp Server Adress + :type server: str + :param port: Xmpp Port (Default : 5222) + :type port: str + :param password: Xmpp Password + :type password: str + + """ + if version and user and server and port and password: + key = cls._make_key(version, server, user, port, password, exp_id) + if key in cls._apis: + cls._apis[key]['cnt'] -= 1 + #print "Api Counter : " + str(cls._apis[key]['cnt']) + if cls._apis[key]['cnt'] == 0: + omf_api = cls._apis[key]['api'] + omf_api.disconnect() + + + @classmethod + def _make_key(cls, *args): + """ Hash the credentials in order to create a key + + :param args: list of arguments used to create the hash (server, user, port, ...) + :type args: list of args + + """ + skey = "".join(map(str, args)) + return hashlib.md5(skey).hexdigest() + + + diff --git a/src/nepi/resources/omf/omf_client.py b/src/nepi/resources/omf/omf_client.py index ea53f60d..87635e5e 100644 --- a/src/nepi/resources/omf/omf_client.py +++ b/src/nepi/resources/omf/omf_client.py @@ -73,8 +73,6 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): self._server = None self._parser = None - - self.register_plugin('xep_0077') # In-band registration self.register_plugin('xep_0030') self.register_plugin('xep_0059') @@ -222,8 +220,9 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): msg = ' Deleted node: %s' % node self.info(msg) except: - error = traceback.format_exc() - msg = ' Could not delete topic: %s\ntraceback:\n%s' % (node, error) + #error = traceback.format_exc() + #msg = ' Could not delete topic: %s\ntraceback:\n%s' % (node, error) + msg = 'Could not delete the topic : '+node+' . Maybe It is not the owner of the topic' self.error(msg) def publish(self, data, node): @@ -334,6 +333,9 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): % (self.boundjid.bare, node, error) self.error(msg) + def check_mailbox(self, itype, attr): + return self._parser.check_mailbox(itype, attr) + def handle_omf_message(self, iq): """ Handle published/received message diff --git a/src/nepi/resources/omf/omf_resource.py b/src/nepi/resources/omf/omf_resource.py index f1bb40cc..faec4656 100644 --- a/src/nepi/resources/omf/omf_resource.py +++ b/src/nepi/resources/omf/omf_resource.py @@ -34,7 +34,6 @@ class ResourceGateway: "wilabt" : "ops.wilab2.ilabt.iminds.be", "nitos" : "nitlab.inf.uth.gr", "nicta" : "??.??.??", - }) AMtoGateway = dict({ @@ -54,19 +53,22 @@ class OMFResource(ResourceManager): @classmethod def _register_attributes(cls): - xmppSlice = Attribute("xmppSlice","Name of the slice", + xmppServer = Attribute("xmppServer", "Xmpp Server", flags = Flags.Credential) - xmppHost = Attribute("xmppHost", "Xmpp Server", + xmppUser = Attribute("xmppUser","Name of the Xmpp User/Slice", flags = Flags.Credential) xmppPort = Attribute("xmppPort", "Xmpp Port", flags = Flags.Credential) xmppPassword = Attribute("xmppPassword", "Xmpp Password", flags = Flags.Credential) + version = Attribute("version", "Version of OMF : Either 5 or 6", + default = "6", ) - cls._register_attribute(xmppSlice) - cls._register_attribute(xmppHost) + cls._register_attribute(xmppUser) + cls._register_attribute(xmppServer) cls._register_attribute(xmppPort) cls._register_attribute(xmppPassword) + cls._register_attribute(version) def __init__(self, ec, guid): super(OMFResource, self).__init__(ec, guid) -- 2.43.0