--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Julien Tribino <julien.tribino@inria.fr>
+
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
+from nepi.execution.attribute import Attribute, Flags
+from nepi.resources.omf.omf6_resource import OMF6Resource
+from nepi.resources.omf.node6 import OMF6Node
+from nepi.resources.omf.omf6_api import OMF6APIFactory
+
+import os, time
+from nepi.util import sshfuncs
+
+@clsinit_copy
+class OMF6Application(OMF6Resource):
+ """
+ .. class:: Class Args :
+
+ :param ec: The Experiment controller
+ :type ec: ExperimentController
+ :param guid: guid of the RM
+ :type guid: int
+
+ """
+ _rtype = "OMF6Application"
+ _authorized_connections = ["OMF6Node"]
+
+ @classmethod
+ def _register_attributes(cls):
+ """ Register the attributes of an OMF application
+
+ """
+ command = Attribute("command", "Command to execute")
+ env = Attribute("env", "Environnement variable of the application")
+# sources = Attribute("sources", "Sources of the application",
+# flags = Flags.ExecReadOnly)
+# sshuser = Attribute("sshUser", "user to connect with ssh",
+# flags = Flags.ExecReadOnly)
+# sshkey = Attribute("sshKey", "key to use for ssh",
+# flags = Flags.ExecReadOnly)
+ cls._register_attribute(command)
+ cls._register_attribute(env)
+# cls._register_attribute(sources)
+# cls._register_attribute(sshuser)
+# cls._register_attribute(sshkey)
+
+ def __init__(self, ec, guid):
+ """
+ :param ec: The Experiment controller
+ :type ec: ExperimentController
+ :param guid: guid of the RM
+ :type guid: int
+ :param creds: Credentials to communicate with the rm (XmppClient for OMF)
+ :type creds: dict
+
+ """
+ super(OMF6Application, self).__init__(ec, guid)
+
+ self.set('command', "")
+ self.set('env', "")
+
+ self._node = None
+ self._topic_app = None
+
+ self._omf_api = None
+
+ @property
+ def exp_id(self):
+ return self.ec.exp_id
+
+ @property
+ def node(self):
+ rm_list = self.get_connected(OMF6Node.get_rtype())
+ if rm_list: return rm_list[0]
+ return None
+
+ def valid_connection(self, guid):
+ """ Check if the connection with the guid in parameter is possible.
+ Only meaningful connections are allowed.
+
+ :param guid: Guid of RM it will be connected
+ :type guid: int
+ :rtype: Boolean
+
+ """
+ rm = self.ec.get_resource(guid)
+ if rm.get_rtype() not in self._authorized_connections:
+ msg = ("Connection between %s %s and %s %s refused: "
+ "An Application can be connected only to a Node" ) % \
+ (self.get_rtype(), self._guid, rm.get_rtype(), guid)
+ self.debug(msg)
+
+ return False
+
+ elif len(self.connections) != 0 :
+ msg = ("Connection between %s %s and %s %s refused: "
+ "This Application is already connected" ) % \
+ (self.get_rtype(), self._guid, rm.get_rtype(), guid)
+ self.debug(msg)
+
+ return False
+
+ else :
+ msg = "Connection between %s %s and %s %s accepted" % (
+ self.get_rtype(), self._guid, rm.get_rtype(), guid)
+ self.debug(msg)
+
+ return True
+
+ def do_deploy(self):
+ """ Deploy the RM. It means nothing special for an application
+ for now (later it will be upload sources, ...)
+ It becomes DEPLOYED after getting the xmpp client.
+
+ """
+
+ self.set('xmppUser',self.node.get('xmppUser'))
+ self.set('xmppHost',self.node.get('xmppHost'))
+ self.set('xmppPort',self.node.get('xmppPort'))
+ self.set('xmppPassword',self.node.get('xmppPassword'))
+
+ if not (self.get('xmppUser') and self.get('xmppHost')
+ and self.get('xmppPort') and self.get('xmppPassword')):
+ msg = "Credentials are not initialzed. XMPP Connections impossible"
+ self.error(msg)
+ raise RuntimeError, msg
+
+ if not self._omf_api :
+ self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'),
+ self.get('xmppUser'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+# if self.get('sources'):
+# gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
+# user = self.get('sshUser') or self.get('xmppSlice')
+# dst = user + "@"+ gateway + ":"
+# (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
+
+ self._topic_app = self.node.get('hostname') +'_'+ str(self.guid) +'_app'
+
+ self._omf_api.enroll_topic(self._topic_app)
+
+ props = {}
+ if self.get('command'):
+ props['application:binary_path'] = self.get('command')
+ props['application:hrn'] = self.get('command')
+ props['application:membership'] = self._topic_app
+ props['application:type'] = "application"
+ self._omf_api.frcp_create( self.node.get('hostname'), "application", props = props)
+
+
+
+ super(OMF6Application, self).do_deploy()
+
+ def do_start(self):
+ """ Start the RM. It means : Send Xmpp Message Using OMF protocol
+ to execute the application.
+ It becomes STARTED before the messages are sent (for coordination)
+
+ """
+ if not self.get('command') :
+ msg = "Application's Command is not initialized"
+ self.error(msg)
+ raise RuntimeError, msg
+
+ if not self.get('env'):
+ self.set('env', " ")
+
+ props = {}
+ props['state'] = "running"
+
+ guards = {}
+ guards['type'] = "application"
+ guards['name'] = self.get('command')
+ time.sleep(2)
+ self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
+
+
+ super(OMF6Application, self).do_start()
+
+ def do_stop(self):
+ """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to
+ kill the application.
+ State is set to STOPPED after the message is sent.
+
+ """
+
+ super(OMF6Application, self).do_stop()
+
+ def do_release(self):
+ """ Clean the RM at the end of the experiment and release the API.
+
+ """
+ props = {}
+ props['frcp:type'] = "application"
+
+ self._omf_api.frcp_release(self.node.get('hostname'),self._topic_app, props = props )
+
+ if self._omf_api:
+ OMF6APIFactory.release_api(self.get('xmppHost'),
+ self.get('xmppUser'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ super(OMF6Application, self).do_release()
+
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Julien Tribino <julien.tribino@inria.fr>
+
+from xml.etree import cElementTree as ET
+
+class MessageHandler():
+ """
+ .. class:: Class Args :
+
+ :param sliceid: Slice Name (= Xmpp Slice)
+ :type expid: str
+ :param expid: Experiment ID (= Xmpp User)
+ :type expid: str
+
+ .. note::
+
+ This class is used only for OMF 5.4 Protocol and is going to become unused
+
+ """
+
+ def __init__(self):
+ """
+
+ """
+ pass
+
+ def _type_element(self, type_elt, xmlns, msg_id):
+ """ Insert a markup element with an id
+
+ """
+ elt = ET.Element(type_elt)
+ elt.set("xmlns", xmlns)
+ elt.set("mid", msg_id)
+ return elt
+
+
+
+ def _attr_element(self, parent, markup, text, type_key=None, type_value = None):
+ """ Insert a markup element with a text (value)
+
+ :param parent: Parent element in an XML point of view
+ :type parent: ElementTree Element
+ :param markup: Name of the markup
+ :type markup: str
+ :param text: Value of the markup element
+ :type text: str
+
+ """
+ elt = ET.SubElement(parent, markup)
+ if type_key and type_value:
+ elt.set(type_key, type_value)
+ elt.text = text
+ return elt
+
+ def _id_element(self, parent, markup, key, value):
+ """ Insert a markup element with a text (value)
+
+ :param parent: Parent element in an XML point of view
+ :type parent: ElementTree Element
+ :param markup: Name of the markup
+ :type markup: str
+ :param text: Value of the markup element
+ :type text: str
+
+ """
+ elt = ET.SubElement(parent, markup)
+ elt.set(key, value)
+ return elt
+
+ def create_function(self, msg_id, src, rtype, timestamp, props = None, guards = None):
+ """ Build a create message
+ """
+ payload = self._type_element("create", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+ self._attr_element(payload,"src",src)
+ self._attr_element(payload,"ts",timestamp)
+ self._attr_element(payload,"rtype",rtype)
+
+ if props :
+ if rtype == "application" :
+ properties = self._id_element(payload,"props","xmlns:application",
+ "http://schema.mytestbed.net/omf/6.0/protocol/application")
+ else:
+ properties = self._attr_element(payload,"props","")
+
+ for prop in props.keys():
+ self._attr_element(properties,prop,props[prop],type_key="type", type_value = "string")
+
+ if guards :
+ guardians = self._attr_element(payload,"guard","")
+ for guard in guards.keys():
+ self._attr_element(guardians,guard,guards[guard],type_key="type", type_value = "string")
+
+ return payload
+
+ def configure_function(self, msg_id, src, timestamp, props = None, guards = None):
+ """ Build a configure message
+ """
+ payload = self._type_element("configure", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+ self._attr_element(payload,"src",src)
+ self._attr_element(payload,"ts",timestamp)
+
+ if props :
+ properties = self._attr_element(payload,"props","")
+ for prop in props.keys():
+ self._attr_element(properties,prop,props[prop],type_key="type", type_value = "symbol")
+
+ if guards :
+ guardians = self._attr_element(payload,"guard","")
+ for guard in guards.keys():
+ self._attr_element(guardians,guard,guards[guard],type_key="type", type_value = "string")
+
+ return payload
+
+ def request_function(self, msg_id, src, timestamp, props = None, guards = None):
+ """ Build a request message
+
+ """
+ payload = self._type_element("request", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+ self._attr_element(payload,"src",src)
+ self._attr_element(payload,"ts",timestamp)
+
+ if props :
+ properties = self._attr_element(payload,"props","")
+ for prop in props.keys():
+ self._attr_element(properties,prop,props[prop])
+
+ if guards :
+ guardians = self._attr_element(payload,"guard","")
+ for guard in guards.keys():
+ self._attr_element(guardians,guard,guards[guard])
+ return payload
+
+ def inform_function(self, msg_id, src, timestamp, cid, itype):
+ """ Build an inform message
+
+ """
+ payload = self._type_element("inform", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+ sliceid = self._attr_element(payload,"src",src)
+ expid = self._attr_element(config,"ts",timestamp)
+ target = self._attr_element(config,"cid",cid)
+ value = self._attr_element(config,"itype",value)
+ path = self._attr_element(config,"properties",path)
+ return payload
+
+ def release_function(self, msg_id, src, timestamp, res_id = None, props = None, guards = None):
+ """ Build a release message
+
+ """
+ payload = self._type_element("release", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+ self._attr_element(payload,"src",src)
+ self._attr_element(payload,"ts",timestamp)
+ if res_id :
+ self._attr_element(payload,"res_id",timestamp)
+
+ if props :
+ properties = self._id_element(payload,"props","xmlns:frcp",
+ "http://schema.mytestbed.net/omf/6.0/protocol")
+ for prop in props.keys():
+ self._attr_element(properties,prop,props[prop])
+
+ if guards :
+ guardians = self._attr_element(payload,"guard","")
+ for guard in guards.keys():
+ self._attr_element(guardians,guard,guards[guard])
+
+ return payload
+
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Julien Tribino <julien.tribino@inria.fr>
+
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
+from nepi.execution.attribute import Attribute, Flags
+from nepi.resources.omf.omf6_resource import OMF6Resource
+from nepi.resources.omf.omf6_api import OMF6APIFactory
+
+import time
+
+@clsinit_copy
+class OMF6Node(OMF6Resource):
+ """
+ .. class:: Class Args :
+
+ :param ec: The Experiment controller
+ :type ec: ExperimentController
+ :param guid: guid of the RM
+ :type guid: int
+ :param creds: Credentials to communicate with the rm (XmppClient for OMF)
+ :type creds: dict
+
+ """
+ _rtype = "OMF6Node"
+ _authorized_connections = ["OMF6Application" , "OMFWifiInterface"]
+
+ @classmethod
+ def _register_attributes(cls):
+ """Register the attributes of an OMF Node
+
+ """
+ hostname = Attribute("hostname", "Hostname of the machine")
+
+ cls._register_attribute(hostname)
+
+ # XXX: We don't necessary need to have the credentials at the
+ # moment we create the RM
+ def __init__(self, ec, guid):
+ """
+ :param ec: The Experiment controller
+ :type ec: ExperimentController
+ :param guid: guid of the RM
+ :type guid: int
+
+ """
+ super(OMF6Node, self).__init__(ec, guid)
+
+ self._omf_api = None
+
+ @property
+ def exp_id(self):
+ return self.ec.exp_id
+
+ def valid_connection(self, guid):
+ """ Check if the connection with the guid in parameter is possible.
+ Only meaningful connections are allowed.
+
+ :param guid: Guid of the current RM
+ :type guid: int
+ :rtype: Boolean
+
+ """
+ rm = self.ec.get_resource(guid)
+ if rm.get_rtype() in self._authorized_connections:
+ msg = "Connection between %s %s and %s %s accepted" % (
+ self.get_rtype(), self._guid, rm.get_rtype(), guid)
+ self.debug(msg)
+
+ return True
+
+ msg = "Connection between %s %s and %s %s refused" % (
+ self.get_rtype(), self._guid, rm.get_rtype(), guid)
+ self.debug(msg)
+
+ return False
+
+ def do_deploy(self):
+ """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol
+ to enroll the node into the experiment.
+ It becomes DEPLOYED after sending messages to enroll the node
+
+ """
+ if not (self.get('xmppUser') and self.get('xmppHost')
+ and self.get('xmppPort') and self.get('xmppPassword')):
+ msg = "Credentials are not initialzed. XMPP Connections impossible"
+ self.error(msg)
+ raise RuntimeError, msg
+
+ if not self._omf_api :
+ self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'),
+ self.get('xmppUser'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ if not self.get('hostname') :
+ msg = "Hostname's value is not initialized"
+ self.error(msg)
+ raise RuntimeError, msg
+
+ self._omf_api.enroll_topic(self.get('hostname'))
+
+ super(OMF6Node, self).do_deploy()
+
+ def do_release(self):
+ """ Clean the RM at the end of the experiment
+
+ """
+ if self._omf_api:
+ # Should be deleted from the RC
+ #self._omf_api.frcp_release(self.get('hostname'))
+
+ OMF6APIFactory.release_api(self.get('xmppHost'),
+ self.get('xmppUser'), self.get('xmppPort'),
+ self.get('xmppPassword'), exp_id = self.exp_id)
+
+ super(OMF6Node, self).do_release()
+
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Julien Tribino <julien.tribino@inria.fr>
+
+import ssl
+import sys
+import time
+import hashlib
+import threading
+
+from nepi.util.timefuncs import tsformat
+import os
+
+from nepi.util.logger import Logger
+
+from nepi.resources.omf.omf_client import OMFClient
+from nepi.resources.omf.messages_6 import MessageHandler
+
+class OMF6API(Logger):
+ """
+ .. class:: Class Args :
+
+ :param slice: Xmpp Slice
+ :type slice: str
+ :param host: Xmpp Server
+ :type host: str
+ :param port: Xmpp Port
+ :type port: str
+ :param password: Xmpp password
+ :type password: str
+ :param xmpp_root: Root of the Xmpp Topic Architecture
+ :type xmpp_root: str
+
+ .. note::
+
+ This class is the implementation of an OMF 5.4 API.
+ Since the version 5.4.1, the Topic Architecture start with OMF_5.4
+ instead of OMF used for OMF5.3
+
+ """
+ def __init__(self, host, user = "nepi", port="5222", password="1234",
+ exp_id = None):
+ """
+
+ :param slice: Xmpp Slice
+ :type slice: str
+ :param host: Xmpp Server
+ :type host: str
+ :param port: Xmpp Port
+ :type port: str
+ :param password: Xmpp password
+ :type password: str
+ :param xmpp_root: Root of the Xmpp Topic Architecture
+ :type xmpp_root: str
+
+ """
+ super(OMF6API, self).__init__("OMF6API")
+ self._exp_id = exp_id
+ self._user = user # name of the machine that run Nepi
+ self._host = host # name of the xmpp server
+ self._port = port # port of the xmpp server
+ self._password = password # password to connect to xmpp
+ self._jid = "%s-%s@%s" % (self._user, self._exp_id, self._host)
+ self._src = "xmpp://" + self._jid
+
+ self._topics = []
+
+ # OMF xmpp client
+ self._client = None
+
+ # message handler
+ self._message = None
+
+ if sys.version_info < (3, 0):
+ reload(sys)
+ sys.setdefaultencoding('utf8')
+
+ # instantiate the xmpp client
+ self._init_client()
+
+ # register nepi topic
+ self._enroll_nepi()
+
+
+ def _init_client(self):
+ """ Initialize XMPP Client
+
+ """
+ xmpp = OMFClient(self._jid, self._password)
+ # PROTOCOL_SSLv3 required for compatibility with OpenFire
+ xmpp.ssl_version = ssl.PROTOCOL_SSLv3
+
+ if xmpp.connect((self._host, self._port)):
+ xmpp.process(block=False)
+ self.check_ready(xmpp)
+ self._client = xmpp
+ self._message = MessageHandler()
+ else:
+ msg = "Unable to connect to the XMPP server."
+ self.error(msg)
+ raise RuntimeError(msg)
+
+ def check_ready(self, xmpp):
+ delay = 1.0
+ for i in xrange(4):
+ if xmpp.ready:
+ break
+ else:
+ time.sleep(delay)
+ delay = delay * 1.5
+ else:
+ msg = "XMPP Client is not ready after long time"
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ @property
+ def _nepi_topic(self):
+ msg = "nepi-" + self._exp_id
+ self.debug(msg)
+ return msg
+
+ def _enroll_nepi(self):
+ """ Create and Subscribe to the Session Topic
+
+ """
+ nepi_topic = self._nepi_topic
+ self._client.create(nepi_topic)
+ self._client.subscribe(nepi_topic)
+
+
+ def enroll_topic(self, topic):
+ """ Create and Subscribe to the session topic and the resources
+ corresponding to the hostname
+
+ :param hostname: Full hrn of the node
+ :type hostname: str
+
+ """
+ if topic in self._topics:
+ return
+
+ self._topics.append(topic)
+
+# try :
+ self._client.create(topic)
+# except:
+# msg = "Topic already existing"
+# self.info(msg)
+ self._client.subscribe(topic)
+
+ def frcp_inform(self, topic, cid, itype):
+ """ Configure attribute on the node
+
+ """
+ msg_id = os.urandom(16).encode('hex')
+ timestamp = tsformat()
+ payload = self._message.inform_function(msg_id, self._src, timestamp, props = props ,guards = guards)
+
+ self._client.publish(payload, xmpp_node)
+
+ def frcp_configure(self, topic, props = None, guards = None ):
+ """ Configure attribute on the node
+
+ """
+ msg_id = os.urandom(16).encode('hex')
+ timestamp = tsformat()
+ payload = self._message.configure_function(msg_id, self._src, timestamp ,props = props ,guards = guards)
+ self._client.publish(payload, topic)
+
+
+ def frcp_create(self, topic, rtype, props = None, guards = None ):
+ """ Send to the stdin of the application the value
+
+ """
+ msg_id = os.urandom(16).encode('hex')
+ timestamp = tsformat()
+ payload = self._message.create_function(msg_id, self._src, rtype, timestamp , props = props ,guards = guards)
+ self._client.publish(payload, topic)
+
+
+ def frcp_request(self, topic, props = None, guards = None ):
+ """ Execute command on the node
+
+ """
+ msg_id = os.urandom(16).encode('hex')
+ timestamp = tsformat()
+ payload = self._message.request_function(msg_id, self._src, timestamp, props = props ,guards = guards)
+ self._client.publish(payload, xmpp_node)
+
+ def frcp_release(self, parent, child, res_id = None, props = None, guards = None ):
+ """ Delete the session and logger topics. Then disconnect
+
+ """
+ msg_id = os.urandom(16).encode('hex')
+ timestamp = tsformat()
+ payload = self._message.release_function(msg_id, self._src, timestamp, res_id = res_id, props = props ,guards = guards)
+ self._client.publish(payload, parent)
+
+ if child in self._topics:
+ self._topics.remove(child)
+
+ self._client.delete(child)
+
+ def disconnect(self) :
+ """ Delete the session and logger topics. Then disconnect
+
+ """
+ self._client.delete(self._nepi_topic)
+
+ #XXX Why there is a sleep there ?
+ time.sleep(1)
+
+ # Wait the send queue to be empty before disconnect
+ self._client.disconnect(wait=True)
+ msg = " Disconnected from XMPP Server"
+ self.debug(msg)
+
+
+class OMF6APIFactory(object):
+ """
+ .. note::
+
+ It allows the different RM to use the same xmpp client if they use
+ the same credentials. For the moment, it is focused on XMPP.
+
+ """
+ # use lock to avoid concurrent access to the Api list at the same times by 2
+ # different threads
+ lock = threading.Lock()
+ _apis = dict()
+
+ @classmethod
+ def get_api(cls, host, user, port, password, exp_id = None):
+ """ Get an OMF Api
+
+ :param slice: Xmpp Slice Name
+ :type slice: str
+ :param host: Xmpp Server Adress
+ :type host: str
+ :param port: Xmpp Port (Default : 5222)
+ :type port: str
+ :param password: Xmpp Password
+ :type password: str
+
+ """
+ if host and user and port and password:
+ key = cls._make_key(host, user, port, password, exp_id)
+ cls.lock.acquire()
+ if key in cls._apis:
+ #print "Api Counter : " + str(cls._apis[key]['cnt'])
+ cls._apis[key]['cnt'] += 1
+ cls.lock.release()
+ return cls._apis[key]['api']
+ else :
+ omf_api = cls.create_api(host, user, port, password, exp_id)
+ cls.lock.release()
+ return omf_api
+ return None
+
+ @classmethod
+ def create_api(cls, host, user, port, password, exp_id):
+ """ Create an OMF API if this one doesn't exist yet with this credentials
+
+ :param slice: Xmpp Slice Name
+ :type slice: str
+ :param host: Xmpp Server Adress
+ :type host: str
+ :param port: Xmpp Port (Default : 5222)
+ :type port: str
+ :param password: Xmpp Password
+ :type password: str
+
+ """
+ omf_api = OMF6API(host, user = user, port = port, password = password, exp_id = exp_id)
+ key = cls._make_key(host, user, port, password, exp_id)
+ cls._apis[key] = {}
+ cls._apis[key]['api'] = omf_api
+ cls._apis[key]['cnt'] = 1
+ return omf_api
+
+ @classmethod
+ def release_api(cls, host, user, port, password, exp_id = None):
+ """ Release an OMF API with this credentials
+
+ :param slice: Xmpp Slice Name
+ :type slice: str
+ :param host: Xmpp Server Adress
+ :type host: str
+ :param port: Xmpp Port (Default : 5222)
+ :type port: str
+ :param password: Xmpp Password
+ :type password: str
+
+ """
+ if host and user and port and password:
+ key = cls._make_key(host, user, port, password, exp_id)
+ if key in cls._apis:
+ cls._apis[key]['cnt'] -= 1
+ #print "Api Counter : " + str(cls._apis[key]['cnt'])
+ if cls._apis[key]['cnt'] == 0:
+ omf_api = cls._apis[key]['api']
+ omf_api.disconnect()
+
+
+ @classmethod
+ def _make_key(cls, *args):
+ """ Hash the credentials in order to create a key
+
+ :param args: list of arguments used to create the hash (user, host, port, ...)
+ :type args: list of args
+
+ """
+ skey = "".join(map(str, args))
+ return hashlib.md5(skey).hexdigest()
+
+
+
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Julien Tribino <julien.tribino@inria.fr>
+
+from nepi.util.logger import Logger
+
+import traceback
+import xml.etree.ElementTree as ET
+
+# inherit from BaseXmpp and XMLstream classes
+class OMF6Parser(Logger):
+ """
+ .. class:: Class Args :
+
+ :param jid: Jabber Id (= Xmpp Slice + Date)
+ :type jid: str
+ :param password: Jabber Password (= Xmpp Password)
+ :type password: str
+
+ .. note::
+
+ This class is an XMPP Client with customized method
+
+ """
+
+ def __init__(self):
+ """
+
+ :param jid: Jabber Id (= Xmpp Slice + Date)
+ :type jid: str
+ :param password: Jabber Password (= Xmpp Password)
+ :type password: str
+
+
+ """
+ super(OMF6Parser, self).__init__("OMF6API")
+
+
+
+ def _check_for_tag(self, root, namespaces, tag):
+ """ Check if an element markup is in the ElementTree
+
+ :param root: Root of the tree
+ :type root: ElementTree Element
+ :param namespaces: Namespaces of the element
+ :type namespaces: str
+ :param tag: Tag that will search in the tree
+ :type tag: str
+
+ """
+ for element in root.iter(namespaces+tag):
+ if element.text:
+ return element.text
+ else :
+ return None
+
+ def _check_for_props(self, root, namespaces):
+ """ Check if an element markup is in the ElementTree
+
+ :param root: Root of the tree
+ :type root: ElementTree Element
+ :param namespaces: Namespaces of the element
+ :type namespaces: str
+
+ """
+ props = {}
+ for properties in root.iter(namespaces+'props'):
+ for element in properties.iter():
+ if element.tag and element.text:
+ props[element.tag] = element.text
+ return props
+
+ def _check_for_membership(self, root, namespaces):
+ """ Check if an element markup is in the ElementTree
+
+ :param root: Root of the tree
+ :type root: ElementTree Element
+ :param namespaces: Namespaces of the element
+ :type namespaces: str
+
+ """
+ for element in root.iter(namespaces+'membership'):
+ for elt in element.iter(namespaces+'it'):
+ ##XXX : change
+ return elt.text
+
+
+ def _check_output(self, root, namespaces):
+ """ Check the significative element in the answer and display it
+
+ :param root: Root of the tree
+ :type root: ElementTree Element
+ :param namespaces: Namespaces of the tree
+ :type namespaces: str
+
+ """
+ fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
+ response = ""
+ for elt in fields:
+ msg = self._check_for_tag(root, namespaces, elt)
+ if msg is not None:
+ response = response + " " + msg.text + " :"
+ deb = self._check_for_tag(root, namespaces, "MESSAGE")
+ if deb is not None:
+ msg = response + " " + deb.text
+ self.debug(msg)
+ else :
+ self.info(response)
+
+
+ def _inform_creation_ok(self, root, namespaces):
+ uid = self._check_for_tag(root, namespaces, "uid")
+ member = self._check_for_membership(root, namespaces)
+ binary_path = self._check_for_tag(root, namespaces, "binary_path")
+ msg = "CREATION OK -- "
+ if binary_path :
+ msg = msg + "The resource : '"+binary_path
+ if uid :
+ msg = msg + "' is listening to the topics : '"+ uid
+ if member :
+ msg = msg + "' and '"+ member +"'"
+ self.info(msg)
+
+ def _inform_creation_failed(self, root, namespaces):
+ reason = self._check_for_tag(root, namespaces, "reason")
+ msg = "CREATION FAILED - The reason : "+reason
+ self.error(msg)
+
+ def _inform_status(self, root, namespaces):
+ props = self._check_for_props(root, namespaces)
+ msg = "STATUS -- "
+ for elt in props.keys():
+ ns, tag = elt.split('}')
+ if tag == "it":
+ msg = msg + "membership : " + props[elt]+" -- "
+ else:
+ msg = msg + tag +" : " + props[elt]+" -- "
+ msg = msg + " STATUS "
+ self.info(msg)
+
+ def _inform_released(self, root, namespaces):
+ parent_id = self._check_for_tag(root, namespaces, "src")
+ child_id = self._check_for_tag(root, namespaces, "res_id")
+ msg = "RELEASED - The resource : '"+res_id+ \
+ "' has been released by : '"+ src
+ self.info(msg)
+
+ def _inform_error(self, root, namespaces):
+ reason = self._check_for_tag(root, namespaces, "reason")
+ msg = "The reason : "+reason
+ self.error(msg)
+
+ def _inform_warn(self, root, namespaces):
+ reason = self._check_for_tag(root, namespaces, "reason")
+ msg = "The reason : "+reason
+ self.warn(msg)
+
+ def _parse_inform(self, root, namespaces):
+ """ Check the significative element in the answer and display it
+
+ :param root: Root of the tree
+ :type root: ElementTree Element
+ :param namespaces: Namespaces of the tree
+ :type namespaces: str
+
+ """
+ itype = self._check_for_tag(root, namespaces, "itype")
+ if itype :
+ method_name = '_inform_'+ itype.replace('.', '_').lower()
+ method = getattr(self, method_name)
+ if method :
+ method(root, namespaces)
+ else :
+ msg = "There is no method to parse the response of the type " + itype
+ self.info(msg)
+ return
+
+
+ def handle(self, iq):
+ namespaces = "{http://schema.mytestbed.net/omf/6.0/protocol}"
+ for i in iq['pubsub_event']['items']:
+ root = ET.fromstring(str(i))
+ #ET.dump(root)
+ self._parse_inform(root, namespaces)
+
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Julien Tribino <julien.tribino@inria.fr>
+# Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
+
+
+@clsinit_copy
+class OMF6Resource(ResourceManager):
+ """
+ Generic resource gathering XMPP credential information and common methods
+ for OMF nodes, channels, applications, etc.
+ """
+ _rtype = "OMFResource"
+
+ @classmethod
+ def _register_attributes(cls):
+
+ xmppHost = Attribute("xmppHost", "Xmpp Server",
+ flags = Flags.Credential)
+ xmppUser = Attribute("xmppUser", "Xmpp User")
+ xmppPort = Attribute("xmppPort", "Xmpp Port",
+ flags = Flags.Credential)
+ xmppPassword = Attribute("xmppPassword", "Xmpp Password",
+ flags = Flags.Credential)
+
+ cls._register_attribute(xmppHost)
+ cls._register_attribute(xmppUser)
+ cls._register_attribute(xmppPort)
+ cls._register_attribute(xmppPassword)
+
+ def __init__(self, ec, guid):
+ super(OMF6Resource, self).__init__(ec, guid)
+ pass
+
# Julien Tribino <julien.tribino@inria.fr>
from nepi.util.logger import Logger
-
+from nepi.resources.omf.omf6_parser import OMF6Parser
try:
import sleekxmpp
self._ready = False
self._registered = False
self._server = None
+ self._parser = None
+
+
self.register_plugin('xep_0077') # In-band registration
self.register_plugin('xep_0030')
self.add_event_handler("session_start", self.start)
self.add_event_handler("register", self.register)
self.add_event_handler("pubsub_publish", self.handle_omf_message)
+
+ #Init the parser
+ self._init_parser()
+ def _init_parser(self):
+ """ Init the parser depending on the OMF Version
+
+ """
+ self._parser = OMF6Parser()
+
@property
def ready(self):
""" Check if the client is ready
try:
self['xep_0060'].create_node(self._server, node, config = config)
except:
- error = traceback.format_exc()
- msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error)
+ #error = traceback.format_exc()
+ #msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error)
+ msg = 'Could not create the topic : '+node+' . Maybe the topic already exists'
self.error(msg)
def delete(self, node):
% (self.boundjid.bare, node, error)
self.error(msg)
- def _check_for_tag(self, root, namespaces, tag):
- """ Check if an element markup is in the ElementTree
-
- :param root: Root of the tree
- :type root: ElementTree Element
- :param namespaces: Namespaces of the element
- :type namespaces: str
- :param tag: Tag that will search in the tree
- :type tag: str
-
- """
- for element in root.iter(namespaces+tag):
- if element.text:
- return element
- else :
- return None
-
- def _check_output(self, root, namespaces):
- """ Check the significative element in the answer and display it
-
- :param root: Root of the tree
- :type root: ElementTree Element
- :param namespaces: Namespaces of the tree
- :type namespaces: str
-
- """
- fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
- response = ""
- for elt in fields:
- msg = self._check_for_tag(root, namespaces, elt)
- if msg is not None:
- response = response + " " + msg.text + " :"
- deb = self._check_for_tag(root, namespaces, "MESSAGE")
- if deb is not None:
- msg = response + " " + deb.text
- self.debug(msg)
- else :
- self.info(response)
def handle_omf_message(self, iq):
""" Handle published/received message
:type iq: Iq Stanza
"""
- namespaces = "{http://jabber.org/protocol/pubsub}"
- for i in iq['pubsub_event']['items']:
- root = ET.fromstring(str(i))
- self._check_output(root, namespaces)
-
+ self._parser.handle(iq)