From: Julien Tribino Date: Fri, 7 Mar 2014 09:22:14 +0000 (+0100) Subject: update OMF 6 and test it. It works once but not twice X-Git-Tag: nepi-3.1.0~30^2~3 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=c05f5499a6212c1ad45199bbf1f2044aa2404022;p=nepi.git update OMF 6 and test it. It works once but not twice --- diff --git a/src/nepi/resources/omf/application6.py b/src/nepi/resources/omf/application6.py new file mode 100644 index 00000000..549c030f --- /dev/null +++ b/src/nepi/resources/omf/application6.py @@ -0,0 +1,222 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Julien Tribino + +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay +from nepi.execution.attribute import Attribute, Flags +from nepi.resources.omf.omf6_resource import OMF6Resource +from nepi.resources.omf.node6 import OMF6Node +from nepi.resources.omf.omf6_api import OMF6APIFactory + +import os, time +from nepi.util import sshfuncs + +@clsinit_copy +class OMF6Application(OMF6Resource): + """ + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + + """ + _rtype = "OMF6Application" + _authorized_connections = ["OMF6Node"] + + @classmethod + def _register_attributes(cls): + """ Register the attributes of an OMF application + + """ + command = Attribute("command", "Command to execute") + env = Attribute("env", "Environnement variable of the application") +# sources = Attribute("sources", "Sources of the application", +# flags = Flags.ExecReadOnly) +# sshuser = Attribute("sshUser", "user to connect with ssh", +# flags = Flags.ExecReadOnly) +# sshkey = Attribute("sshKey", "key to use for ssh", +# flags = Flags.ExecReadOnly) + cls._register_attribute(command) + cls._register_attribute(env) +# cls._register_attribute(sources) +# cls._register_attribute(sshuser) +# cls._register_attribute(sshkey) + + def __init__(self, ec, guid): + """ + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + :param creds: Credentials to communicate with the rm (XmppClient for OMF) + :type creds: dict + + """ + super(OMF6Application, self).__init__(ec, guid) + + self.set('command', "") + self.set('env', "") + + self._node = None + self._topic_app = None + + self._omf_api = None + + @property + def exp_id(self): + return self.ec.exp_id + + @property + def node(self): + rm_list = self.get_connected(OMF6Node.get_rtype()) + if rm_list: return rm_list[0] + return None + + def valid_connection(self, guid): + """ Check if the connection with the guid in parameter is possible. + Only meaningful connections are allowed. + + :param guid: Guid of RM it will be connected + :type guid: int + :rtype: Boolean + + """ + rm = self.ec.get_resource(guid) + if rm.get_rtype() not in self._authorized_connections: + msg = ("Connection between %s %s and %s %s refused: " + "An Application can be connected only to a Node" ) % \ + (self.get_rtype(), self._guid, rm.get_rtype(), guid) + self.debug(msg) + + return False + + elif len(self.connections) != 0 : + msg = ("Connection between %s %s and %s %s refused: " + "This Application is already connected" ) % \ + (self.get_rtype(), self._guid, rm.get_rtype(), guid) + self.debug(msg) + + return False + + else : + msg = "Connection between %s %s and %s %s accepted" % ( + self.get_rtype(), self._guid, rm.get_rtype(), guid) + self.debug(msg) + + return True + + def do_deploy(self): + """ Deploy the RM. It means nothing special for an application + for now (later it will be upload sources, ...) + It becomes DEPLOYED after getting the xmpp client. + + """ + + self.set('xmppUser',self.node.get('xmppUser')) + self.set('xmppHost',self.node.get('xmppHost')) + self.set('xmppPort',self.node.get('xmppPort')) + self.set('xmppPassword',self.node.get('xmppPassword')) + + if not (self.get('xmppUser') and self.get('xmppHost') + and self.get('xmppPort') and self.get('xmppPassword')): + msg = "Credentials are not initialzed. XMPP Connections impossible" + self.error(msg) + raise RuntimeError, msg + + if not self._omf_api : + self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'), + self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + +# if self.get('sources'): +# gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')] +# user = self.get('sshUser') or self.get('xmppSlice') +# dst = user + "@"+ gateway + ":" +# (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst) + + self._topic_app = self.node.get('hostname') +'_'+ str(self.guid) +'_app' + + self._omf_api.enroll_topic(self._topic_app) + + props = {} + if self.get('command'): + props['application:binary_path'] = self.get('command') + props['application:hrn'] = self.get('command') + props['application:membership'] = self._topic_app + props['application:type'] = "application" + self._omf_api.frcp_create( self.node.get('hostname'), "application", props = props) + + + + super(OMF6Application, self).do_deploy() + + def do_start(self): + """ Start the RM. It means : Send Xmpp Message Using OMF protocol + to execute the application. + It becomes STARTED before the messages are sent (for coordination) + + """ + if not self.get('command') : + msg = "Application's Command is not initialized" + self.error(msg) + raise RuntimeError, msg + + if not self.get('env'): + self.set('env', " ") + + props = {} + props['state'] = "running" + + guards = {} + guards['type'] = "application" + guards['name'] = self.get('command') + time.sleep(2) + self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards ) + + + super(OMF6Application, self).do_start() + + def do_stop(self): + """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to + kill the application. + State is set to STOPPED after the message is sent. + + """ + + super(OMF6Application, self).do_stop() + + def do_release(self): + """ Clean the RM at the end of the experiment and release the API. + + """ + props = {} + props['frcp:type'] = "application" + + self._omf_api.frcp_release(self.node.get('hostname'),self._topic_app, props = props ) + + if self._omf_api: + OMF6APIFactory.release_api(self.get('xmppHost'), + self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + super(OMF6Application, self).do_release() + diff --git a/src/nepi/resources/omf/messages_6.py b/src/nepi/resources/omf/messages_6.py new file mode 100644 index 00000000..3a024b45 --- /dev/null +++ b/src/nepi/resources/omf/messages_6.py @@ -0,0 +1,184 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Julien Tribino + +from xml.etree import cElementTree as ET + +class MessageHandler(): + """ + .. class:: Class Args : + + :param sliceid: Slice Name (= Xmpp Slice) + :type expid: str + :param expid: Experiment ID (= Xmpp User) + :type expid: str + + .. note:: + + This class is used only for OMF 5.4 Protocol and is going to become unused + + """ + + def __init__(self): + """ + + """ + pass + + def _type_element(self, type_elt, xmlns, msg_id): + """ Insert a markup element with an id + + """ + elt = ET.Element(type_elt) + elt.set("xmlns", xmlns) + elt.set("mid", msg_id) + return elt + + + + def _attr_element(self, parent, markup, text, type_key=None, type_value = None): + """ Insert a markup element with a text (value) + + :param parent: Parent element in an XML point of view + :type parent: ElementTree Element + :param markup: Name of the markup + :type markup: str + :param text: Value of the markup element + :type text: str + + """ + elt = ET.SubElement(parent, markup) + if type_key and type_value: + elt.set(type_key, type_value) + elt.text = text + return elt + + def _id_element(self, parent, markup, key, value): + """ Insert a markup element with a text (value) + + :param parent: Parent element in an XML point of view + :type parent: ElementTree Element + :param markup: Name of the markup + :type markup: str + :param text: Value of the markup element + :type text: str + + """ + elt = ET.SubElement(parent, markup) + elt.set(key, value) + return elt + + def create_function(self, msg_id, src, rtype, timestamp, props = None, guards = None): + """ Build a create message + """ + payload = self._type_element("create", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id ) + self._attr_element(payload,"src",src) + self._attr_element(payload,"ts",timestamp) + self._attr_element(payload,"rtype",rtype) + + if props : + if rtype == "application" : + properties = self._id_element(payload,"props","xmlns:application", + "http://schema.mytestbed.net/omf/6.0/protocol/application") + else: + properties = self._attr_element(payload,"props","") + + for prop in props.keys(): + self._attr_element(properties,prop,props[prop],type_key="type", type_value = "string") + + if guards : + guardians = self._attr_element(payload,"guard","") + for guard in guards.keys(): + self._attr_element(guardians,guard,guards[guard],type_key="type", type_value = "string") + + return payload + + def configure_function(self, msg_id, src, timestamp, props = None, guards = None): + """ Build a configure message + """ + payload = self._type_element("configure", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id ) + self._attr_element(payload,"src",src) + self._attr_element(payload,"ts",timestamp) + + if props : + properties = self._attr_element(payload,"props","") + for prop in props.keys(): + self._attr_element(properties,prop,props[prop],type_key="type", type_value = "symbol") + + if guards : + guardians = self._attr_element(payload,"guard","") + for guard in guards.keys(): + self._attr_element(guardians,guard,guards[guard],type_key="type", type_value = "string") + + return payload + + def request_function(self, msg_id, src, timestamp, props = None, guards = None): + """ Build a request message + + """ + payload = self._type_element("request", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id ) + self._attr_element(payload,"src",src) + self._attr_element(payload,"ts",timestamp) + + if props : + properties = self._attr_element(payload,"props","") + for prop in props.keys(): + self._attr_element(properties,prop,props[prop]) + + if guards : + guardians = self._attr_element(payload,"guard","") + for guard in guards.keys(): + self._attr_element(guardians,guard,guards[guard]) + return payload + + def inform_function(self, msg_id, src, timestamp, cid, itype): + """ Build an inform message + + """ + payload = self._type_element("inform", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id ) + sliceid = self._attr_element(payload,"src",src) + expid = self._attr_element(config,"ts",timestamp) + target = self._attr_element(config,"cid",cid) + value = self._attr_element(config,"itype",value) + path = self._attr_element(config,"properties",path) + return payload + + def release_function(self, msg_id, src, timestamp, res_id = None, props = None, guards = None): + """ Build a release message + + """ + payload = self._type_element("release", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id ) + self._attr_element(payload,"src",src) + self._attr_element(payload,"ts",timestamp) + if res_id : + self._attr_element(payload,"res_id",timestamp) + + if props : + properties = self._id_element(payload,"props","xmlns:frcp", + "http://schema.mytestbed.net/omf/6.0/protocol") + for prop in props.keys(): + self._attr_element(properties,prop,props[prop]) + + if guards : + guardians = self._attr_element(payload,"guard","") + for guard in guards.keys(): + self._attr_element(guardians,guard,guards[guard]) + + return payload + diff --git a/src/nepi/resources/omf/node6.py b/src/nepi/resources/omf/node6.py new file mode 100644 index 00000000..6f8a1441 --- /dev/null +++ b/src/nepi/resources/omf/node6.py @@ -0,0 +1,134 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Julien Tribino + +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay +from nepi.execution.attribute import Attribute, Flags +from nepi.resources.omf.omf6_resource import OMF6Resource +from nepi.resources.omf.omf6_api import OMF6APIFactory + +import time + +@clsinit_copy +class OMF6Node(OMF6Resource): + """ + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + :param creds: Credentials to communicate with the rm (XmppClient for OMF) + :type creds: dict + + """ + _rtype = "OMF6Node" + _authorized_connections = ["OMF6Application" , "OMFWifiInterface"] + + @classmethod + def _register_attributes(cls): + """Register the attributes of an OMF Node + + """ + hostname = Attribute("hostname", "Hostname of the machine") + + cls._register_attribute(hostname) + + # XXX: We don't necessary need to have the credentials at the + # moment we create the RM + def __init__(self, ec, guid): + """ + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + + """ + super(OMF6Node, self).__init__(ec, guid) + + self._omf_api = None + + @property + def exp_id(self): + return self.ec.exp_id + + def valid_connection(self, guid): + """ Check if the connection with the guid in parameter is possible. + Only meaningful connections are allowed. + + :param guid: Guid of the current RM + :type guid: int + :rtype: Boolean + + """ + rm = self.ec.get_resource(guid) + if rm.get_rtype() in self._authorized_connections: + msg = "Connection between %s %s and %s %s accepted" % ( + self.get_rtype(), self._guid, rm.get_rtype(), guid) + self.debug(msg) + + return True + + msg = "Connection between %s %s and %s %s refused" % ( + self.get_rtype(), self._guid, rm.get_rtype(), guid) + self.debug(msg) + + return False + + def do_deploy(self): + """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol + to enroll the node into the experiment. + It becomes DEPLOYED after sending messages to enroll the node + + """ + if not (self.get('xmppUser') and self.get('xmppHost') + and self.get('xmppPort') and self.get('xmppPassword')): + msg = "Credentials are not initialzed. XMPP Connections impossible" + self.error(msg) + raise RuntimeError, msg + + if not self._omf_api : + self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'), + self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + if not self.get('hostname') : + msg = "Hostname's value is not initialized" + self.error(msg) + raise RuntimeError, msg + + self._omf_api.enroll_topic(self.get('hostname')) + + super(OMF6Node, self).do_deploy() + + def do_release(self): + """ Clean the RM at the end of the experiment + + """ + if self._omf_api: + # Should be deleted from the RC + #self._omf_api.frcp_release(self.get('hostname')) + + OMF6APIFactory.release_api(self.get('xmppHost'), + self.get('xmppUser'), self.get('xmppPort'), + self.get('xmppPassword'), exp_id = self.exp_id) + + super(OMF6Node, self).do_release() + diff --git a/src/nepi/resources/omf/omf6_api.py b/src/nepi/resources/omf/omf6_api.py new file mode 100644 index 00000000..5ec13af0 --- /dev/null +++ b/src/nepi/resources/omf/omf6_api.py @@ -0,0 +1,333 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Julien Tribino + +import ssl +import sys +import time +import hashlib +import threading + +from nepi.util.timefuncs import tsformat +import os + +from nepi.util.logger import Logger + +from nepi.resources.omf.omf_client import OMFClient +from nepi.resources.omf.messages_6 import MessageHandler + +class OMF6API(Logger): + """ + .. class:: Class Args : + + :param slice: Xmpp Slice + :type slice: str + :param host: Xmpp Server + :type host: str + :param port: Xmpp Port + :type port: str + :param password: Xmpp password + :type password: str + :param xmpp_root: Root of the Xmpp Topic Architecture + :type xmpp_root: str + + .. note:: + + This class is the implementation of an OMF 5.4 API. + Since the version 5.4.1, the Topic Architecture start with OMF_5.4 + instead of OMF used for OMF5.3 + + """ + def __init__(self, host, user = "nepi", port="5222", password="1234", + exp_id = None): + """ + + :param slice: Xmpp Slice + :type slice: str + :param host: Xmpp Server + :type host: str + :param port: Xmpp Port + :type port: str + :param password: Xmpp password + :type password: str + :param xmpp_root: Root of the Xmpp Topic Architecture + :type xmpp_root: str + + """ + super(OMF6API, self).__init__("OMF6API") + self._exp_id = exp_id + self._user = user # name of the machine that run Nepi + self._host = host # name of the xmpp server + self._port = port # port of the xmpp server + self._password = password # password to connect to xmpp + self._jid = "%s-%s@%s" % (self._user, self._exp_id, self._host) + self._src = "xmpp://" + self._jid + + self._topics = [] + + # OMF xmpp client + self._client = None + + # message handler + self._message = None + + if sys.version_info < (3, 0): + reload(sys) + sys.setdefaultencoding('utf8') + + # instantiate the xmpp client + self._init_client() + + # register nepi topic + self._enroll_nepi() + + + def _init_client(self): + """ Initialize XMPP Client + + """ + xmpp = OMFClient(self._jid, self._password) + # PROTOCOL_SSLv3 required for compatibility with OpenFire + xmpp.ssl_version = ssl.PROTOCOL_SSLv3 + + if xmpp.connect((self._host, self._port)): + xmpp.process(block=False) + self.check_ready(xmpp) + self._client = xmpp + self._message = MessageHandler() + else: + msg = "Unable to connect to the XMPP server." + self.error(msg) + raise RuntimeError(msg) + + def check_ready(self, xmpp): + delay = 1.0 + for i in xrange(4): + if xmpp.ready: + break + else: + time.sleep(delay) + delay = delay * 1.5 + else: + msg = "XMPP Client is not ready after long time" + self.error(msg, out, err) + raise RuntimeError, msg + + @property + def _nepi_topic(self): + msg = "nepi-" + self._exp_id + self.debug(msg) + return msg + + def _enroll_nepi(self): + """ Create and Subscribe to the Session Topic + + """ + nepi_topic = self._nepi_topic + self._client.create(nepi_topic) + self._client.subscribe(nepi_topic) + + + def enroll_topic(self, topic): + """ Create and Subscribe to the session topic and the resources + corresponding to the hostname + + :param hostname: Full hrn of the node + :type hostname: str + + """ + if topic in self._topics: + return + + self._topics.append(topic) + +# try : + self._client.create(topic) +# except: +# msg = "Topic already existing" +# self.info(msg) + self._client.subscribe(topic) + + def frcp_inform(self, topic, cid, itype): + """ Configure attribute on the node + + """ + msg_id = os.urandom(16).encode('hex') + timestamp = tsformat() + payload = self._message.inform_function(msg_id, self._src, timestamp, props = props ,guards = guards) + + self._client.publish(payload, xmpp_node) + + def frcp_configure(self, topic, props = None, guards = None ): + """ Configure attribute on the node + + """ + msg_id = os.urandom(16).encode('hex') + timestamp = tsformat() + payload = self._message.configure_function(msg_id, self._src, timestamp ,props = props ,guards = guards) + self._client.publish(payload, topic) + + + def frcp_create(self, topic, rtype, props = None, guards = None ): + """ Send to the stdin of the application the value + + """ + msg_id = os.urandom(16).encode('hex') + timestamp = tsformat() + payload = self._message.create_function(msg_id, self._src, rtype, timestamp , props = props ,guards = guards) + self._client.publish(payload, topic) + + + def frcp_request(self, topic, props = None, guards = None ): + """ Execute command on the node + + """ + msg_id = os.urandom(16).encode('hex') + timestamp = tsformat() + payload = self._message.request_function(msg_id, self._src, timestamp, props = props ,guards = guards) + self._client.publish(payload, xmpp_node) + + def frcp_release(self, parent, child, res_id = None, props = None, guards = None ): + """ Delete the session and logger topics. Then disconnect + + """ + msg_id = os.urandom(16).encode('hex') + timestamp = tsformat() + payload = self._message.release_function(msg_id, self._src, timestamp, res_id = res_id, props = props ,guards = guards) + self._client.publish(payload, parent) + + if child in self._topics: + self._topics.remove(child) + + self._client.delete(child) + + def disconnect(self) : + """ Delete the session and logger topics. Then disconnect + + """ + self._client.delete(self._nepi_topic) + + #XXX Why there is a sleep there ? + time.sleep(1) + + # Wait the send queue to be empty before disconnect + self._client.disconnect(wait=True) + msg = " Disconnected from XMPP Server" + self.debug(msg) + + +class OMF6APIFactory(object): + """ + .. note:: + + It allows the different RM to use the same xmpp client if they use + the same credentials. For the moment, it is focused on XMPP. + + """ + # use lock to avoid concurrent access to the Api list at the same times by 2 + # different threads + lock = threading.Lock() + _apis = dict() + + @classmethod + def get_api(cls, host, user, port, password, exp_id = None): + """ Get an OMF Api + + :param slice: Xmpp Slice Name + :type slice: str + :param host: Xmpp Server Adress + :type host: str + :param port: Xmpp Port (Default : 5222) + :type port: str + :param password: Xmpp Password + :type password: str + + """ + if host and user and port and password: + key = cls._make_key(host, user, port, password, exp_id) + cls.lock.acquire() + if key in cls._apis: + #print "Api Counter : " + str(cls._apis[key]['cnt']) + cls._apis[key]['cnt'] += 1 + cls.lock.release() + return cls._apis[key]['api'] + else : + omf_api = cls.create_api(host, user, port, password, exp_id) + cls.lock.release() + return omf_api + return None + + @classmethod + def create_api(cls, host, user, port, password, exp_id): + """ Create an OMF API if this one doesn't exist yet with this credentials + + :param slice: Xmpp Slice Name + :type slice: str + :param host: Xmpp Server Adress + :type host: str + :param port: Xmpp Port (Default : 5222) + :type port: str + :param password: Xmpp Password + :type password: str + + """ + omf_api = OMF6API(host, user = user, port = port, password = password, exp_id = exp_id) + key = cls._make_key(host, user, port, password, exp_id) + cls._apis[key] = {} + cls._apis[key]['api'] = omf_api + cls._apis[key]['cnt'] = 1 + return omf_api + + @classmethod + def release_api(cls, host, user, port, password, exp_id = None): + """ Release an OMF API with this credentials + + :param slice: Xmpp Slice Name + :type slice: str + :param host: Xmpp Server Adress + :type host: str + :param port: Xmpp Port (Default : 5222) + :type port: str + :param password: Xmpp Password + :type password: str + + """ + if host and user and port and password: + key = cls._make_key(host, user, port, password, exp_id) + if key in cls._apis: + cls._apis[key]['cnt'] -= 1 + #print "Api Counter : " + str(cls._apis[key]['cnt']) + if cls._apis[key]['cnt'] == 0: + omf_api = cls._apis[key]['api'] + omf_api.disconnect() + + + @classmethod + def _make_key(cls, *args): + """ Hash the credentials in order to create a key + + :param args: list of arguments used to create the hash (user, host, port, ...) + :type args: list of args + + """ + skey = "".join(map(str, args)) + return hashlib.md5(skey).hexdigest() + + + diff --git a/src/nepi/resources/omf/omf6_parser.py b/src/nepi/resources/omf/omf6_parser.py new file mode 100644 index 00000000..9029ce26 --- /dev/null +++ b/src/nepi/resources/omf/omf6_parser.py @@ -0,0 +1,201 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Julien Tribino + +from nepi.util.logger import Logger + +import traceback +import xml.etree.ElementTree as ET + +# inherit from BaseXmpp and XMLstream classes +class OMF6Parser(Logger): + """ + .. class:: Class Args : + + :param jid: Jabber Id (= Xmpp Slice + Date) + :type jid: str + :param password: Jabber Password (= Xmpp Password) + :type password: str + + .. note:: + + This class is an XMPP Client with customized method + + """ + + def __init__(self): + """ + + :param jid: Jabber Id (= Xmpp Slice + Date) + :type jid: str + :param password: Jabber Password (= Xmpp Password) + :type password: str + + + """ + super(OMF6Parser, self).__init__("OMF6API") + + + + def _check_for_tag(self, root, namespaces, tag): + """ Check if an element markup is in the ElementTree + + :param root: Root of the tree + :type root: ElementTree Element + :param namespaces: Namespaces of the element + :type namespaces: str + :param tag: Tag that will search in the tree + :type tag: str + + """ + for element in root.iter(namespaces+tag): + if element.text: + return element.text + else : + return None + + def _check_for_props(self, root, namespaces): + """ Check if an element markup is in the ElementTree + + :param root: Root of the tree + :type root: ElementTree Element + :param namespaces: Namespaces of the element + :type namespaces: str + + """ + props = {} + for properties in root.iter(namespaces+'props'): + for element in properties.iter(): + if element.tag and element.text: + props[element.tag] = element.text + return props + + def _check_for_membership(self, root, namespaces): + """ Check if an element markup is in the ElementTree + + :param root: Root of the tree + :type root: ElementTree Element + :param namespaces: Namespaces of the element + :type namespaces: str + + """ + for element in root.iter(namespaces+'membership'): + for elt in element.iter(namespaces+'it'): + ##XXX : change + return elt.text + + + def _check_output(self, root, namespaces): + """ Check the significative element in the answer and display it + + :param root: Root of the tree + :type root: ElementTree Element + :param namespaces: Namespaces of the tree + :type namespaces: str + + """ + fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"] + response = "" + for elt in fields: + msg = self._check_for_tag(root, namespaces, elt) + if msg is not None: + response = response + " " + msg.text + " :" + deb = self._check_for_tag(root, namespaces, "MESSAGE") + if deb is not None: + msg = response + " " + deb.text + self.debug(msg) + else : + self.info(response) + + + def _inform_creation_ok(self, root, namespaces): + uid = self._check_for_tag(root, namespaces, "uid") + member = self._check_for_membership(root, namespaces) + binary_path = self._check_for_tag(root, namespaces, "binary_path") + msg = "CREATION OK -- " + if binary_path : + msg = msg + "The resource : '"+binary_path + if uid : + msg = msg + "' is listening to the topics : '"+ uid + if member : + msg = msg + "' and '"+ member +"'" + self.info(msg) + + def _inform_creation_failed(self, root, namespaces): + reason = self._check_for_tag(root, namespaces, "reason") + msg = "CREATION FAILED - The reason : "+reason + self.error(msg) + + def _inform_status(self, root, namespaces): + props = self._check_for_props(root, namespaces) + msg = "STATUS -- " + for elt in props.keys(): + ns, tag = elt.split('}') + if tag == "it": + msg = msg + "membership : " + props[elt]+" -- " + else: + msg = msg + tag +" : " + props[elt]+" -- " + msg = msg + " STATUS " + self.info(msg) + + def _inform_released(self, root, namespaces): + parent_id = self._check_for_tag(root, namespaces, "src") + child_id = self._check_for_tag(root, namespaces, "res_id") + msg = "RELEASED - The resource : '"+res_id+ \ + "' has been released by : '"+ src + self.info(msg) + + def _inform_error(self, root, namespaces): + reason = self._check_for_tag(root, namespaces, "reason") + msg = "The reason : "+reason + self.error(msg) + + def _inform_warn(self, root, namespaces): + reason = self._check_for_tag(root, namespaces, "reason") + msg = "The reason : "+reason + self.warn(msg) + + def _parse_inform(self, root, namespaces): + """ Check the significative element in the answer and display it + + :param root: Root of the tree + :type root: ElementTree Element + :param namespaces: Namespaces of the tree + :type namespaces: str + + """ + itype = self._check_for_tag(root, namespaces, "itype") + if itype : + method_name = '_inform_'+ itype.replace('.', '_').lower() + method = getattr(self, method_name) + if method : + method(root, namespaces) + else : + msg = "There is no method to parse the response of the type " + itype + self.info(msg) + return + + + def handle(self, iq): + namespaces = "{http://schema.mytestbed.net/omf/6.0/protocol}" + for i in iq['pubsub_event']['items']: + root = ET.fromstring(str(i)) + #ET.dump(root) + self._parse_inform(root, namespaces) + diff --git a/src/nepi/resources/omf/omf6_resource.py b/src/nepi/resources/omf/omf6_resource.py new file mode 100644 index 00000000..c92f86d4 --- /dev/null +++ b/src/nepi/resources/omf/omf6_resource.py @@ -0,0 +1,53 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Julien Tribino +# Lucia Guevgeozian + +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay + + +@clsinit_copy +class OMF6Resource(ResourceManager): + """ + Generic resource gathering XMPP credential information and common methods + for OMF nodes, channels, applications, etc. + """ + _rtype = "OMFResource" + + @classmethod + def _register_attributes(cls): + + xmppHost = Attribute("xmppHost", "Xmpp Server", + flags = Flags.Credential) + xmppUser = Attribute("xmppUser", "Xmpp User") + xmppPort = Attribute("xmppPort", "Xmpp Port", + flags = Flags.Credential) + xmppPassword = Attribute("xmppPassword", "Xmpp Password", + flags = Flags.Credential) + + cls._register_attribute(xmppHost) + cls._register_attribute(xmppUser) + cls._register_attribute(xmppPort) + cls._register_attribute(xmppPassword) + + def __init__(self, ec, guid): + super(OMF6Resource, self).__init__(ec, guid) + pass + diff --git a/src/nepi/resources/omf/omf_client.py b/src/nepi/resources/omf/omf_client.py index 396e2d7b..ea53f60d 100644 --- a/src/nepi/resources/omf/omf_client.py +++ b/src/nepi/resources/omf/omf_client.py @@ -19,7 +19,7 @@ # Julien Tribino from nepi.util.logger import Logger - +from nepi.resources.omf.omf6_parser import OMF6Parser try: import sleekxmpp @@ -71,6 +71,9 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): self._ready = False self._registered = False self._server = None + self._parser = None + + self.register_plugin('xep_0077') # In-band registration self.register_plugin('xep_0030') @@ -80,7 +83,16 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): self.add_event_handler("session_start", self.start) self.add_event_handler("register", self.register) self.add_event_handler("pubsub_publish", self.handle_omf_message) + + #Init the parser + self._init_parser() + def _init_parser(self): + """ Init the parser depending on the OMF Version + + """ + self._parser = OMF6Parser() + @property def ready(self): """ Check if the client is ready @@ -190,8 +202,9 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): try: self['xep_0060'].create_node(self._server, node, config = config) except: - error = traceback.format_exc() - msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error) + #error = traceback.format_exc() + #msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error) + msg = 'Could not create the topic : '+node+' . Maybe the topic already exists' self.error(msg) def delete(self, node): @@ -321,44 +334,6 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): % (self.boundjid.bare, node, error) self.error(msg) - def _check_for_tag(self, root, namespaces, tag): - """ Check if an element markup is in the ElementTree - - :param root: Root of the tree - :type root: ElementTree Element - :param namespaces: Namespaces of the element - :type namespaces: str - :param tag: Tag that will search in the tree - :type tag: str - - """ - for element in root.iter(namespaces+tag): - if element.text: - return element - else : - return None - - def _check_output(self, root, namespaces): - """ Check the significative element in the answer and display it - - :param root: Root of the tree - :type root: ElementTree Element - :param namespaces: Namespaces of the tree - :type namespaces: str - - """ - fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"] - response = "" - for elt in fields: - msg = self._check_for_tag(root, namespaces, elt) - if msg is not None: - response = response + " " + msg.text + " :" - deb = self._check_for_tag(root, namespaces, "MESSAGE") - if deb is not None: - msg = response + " " + deb.text - self.debug(msg) - else : - self.info(response) def handle_omf_message(self, iq): """ Handle published/received message @@ -367,9 +342,5 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): :type iq: Iq Stanza """ - namespaces = "{http://jabber.org/protocol/pubsub}" - for i in iq['pubsub_event']['items']: - root = ET.fromstring(str(i)) - self._check_output(root, namespaces) - + self._parser.handle(iq)