update OMF 6 and test it. It works once but not twice
authorJulien Tribino <julien.tribino@inria.fr>
Fri, 7 Mar 2014 09:22:14 +0000 (10:22 +0100)
committerJulien Tribino <julien.tribino@inria.fr>
Fri, 7 Mar 2014 09:22:14 +0000 (10:22 +0100)
src/nepi/resources/omf/application6.py [new file with mode: 0644]
src/nepi/resources/omf/messages_6.py [new file with mode: 0644]
src/nepi/resources/omf/node6.py [new file with mode: 0644]
src/nepi/resources/omf/omf6_api.py [new file with mode: 0644]
src/nepi/resources/omf/omf6_parser.py [new file with mode: 0644]
src/nepi/resources/omf/omf6_resource.py [new file with mode: 0644]
src/nepi/resources/omf/omf_client.py

diff --git a/src/nepi/resources/omf/application6.py b/src/nepi/resources/omf/application6.py
new file mode 100644 (file)
index 0000000..549c030
--- /dev/null
@@ -0,0 +1,222 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Julien Tribino <julien.tribino@inria.fr>
+
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
+from nepi.execution.attribute import Attribute, Flags 
+from nepi.resources.omf.omf6_resource import OMF6Resource
+from nepi.resources.omf.node6 import OMF6Node
+from nepi.resources.omf.omf6_api import OMF6APIFactory
+
+import os, time
+from nepi.util import sshfuncs
+
+@clsinit_copy
+class OMF6Application(OMF6Resource):
+    """
+    .. class:: Class Args :
+      
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+
+    """
+    _rtype = "OMF6Application"
+    _authorized_connections = ["OMF6Node"]
+
+    @classmethod
+    def _register_attributes(cls):
+        """ Register the attributes of an OMF application
+
+        """
+        command = Attribute("command", "Command to execute")
+        env = Attribute("env", "Environnement variable of the application")
+#        sources = Attribute("sources", "Sources of the application", 
+#                     flags = Flags.ExecReadOnly)
+#        sshuser = Attribute("sshUser", "user to connect with ssh", 
+#                     flags = Flags.ExecReadOnly)
+#        sshkey = Attribute("sshKey", "key to use for ssh", 
+#                     flags = Flags.ExecReadOnly)
+        cls._register_attribute(command)
+        cls._register_attribute(env)
+#        cls._register_attribute(sources)
+#        cls._register_attribute(sshuser)
+#        cls._register_attribute(sshkey)
+
+    def __init__(self, ec, guid):
+        """
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+        :param creds: Credentials to communicate with the rm (XmppClient for OMF)
+        :type creds: dict
+
+        """
+        super(OMF6Application, self).__init__(ec, guid)
+
+        self.set('command', "")
+        self.set('env', "")
+
+        self._node = None
+        self._topic_app = None
+
+        self._omf_api = None
+
+    @property
+    def exp_id(self):
+        return self.ec.exp_id
+
+    @property
+    def node(self):
+        rm_list = self.get_connected(OMF6Node.get_rtype())
+        if rm_list: return rm_list[0]
+        return None
+
+    def valid_connection(self, guid):
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
+
+        :param guid: Guid of RM it will be connected
+        :type guid: int
+        :rtype:  Boolean
+
+        """
+        rm = self.ec.get_resource(guid)
+        if rm.get_rtype() not in self._authorized_connections:
+            msg = ("Connection between %s %s and %s %s refused: "
+                    "An Application can be connected only to a Node" ) % \
+                (self.get_rtype(), self._guid, rm.get_rtype(), guid)
+            self.debug(msg)
+
+            return False
+
+        elif len(self.connections) != 0 :
+            msg = ("Connection between %s %s and %s %s refused: "
+                    "This Application is already connected" ) % \
+                (self.get_rtype(), self._guid, rm.get_rtype(), guid)
+            self.debug(msg)
+
+            return False
+
+        else :
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.get_rtype(), self._guid, rm.get_rtype(), guid)
+            self.debug(msg)
+
+            return True
+
+    def do_deploy(self):
+        """ Deploy the RM. It means nothing special for an application 
+        for now (later it will be upload sources, ...)
+        It becomes DEPLOYED after getting the xmpp client.
+
+        """
+
+        self.set('xmppUser',self.node.get('xmppUser'))
+        self.set('xmppHost',self.node.get('xmppHost'))
+        self.set('xmppPort',self.node.get('xmppPort'))
+        self.set('xmppPassword',self.node.get('xmppPassword'))
+
+        if not (self.get('xmppUser') and self.get('xmppHost')
+              and self.get('xmppPort') and self.get('xmppPassword')):
+            msg = "Credentials are not initialzed. XMPP Connections impossible"
+            self.error(msg)
+            raise RuntimeError, msg
+
+        if not self._omf_api :
+            self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'), 
+                self.get('xmppUser'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
+
+#        if self.get('sources'):
+#            gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
+#            user = self.get('sshUser') or self.get('xmppSlice')
+#            dst = user + "@"+ gateway + ":"
+#            (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
+         
+        self._topic_app = self.node.get('hostname') +'_'+ str(self.guid) +'_app'
+
+        self._omf_api.enroll_topic(self._topic_app)
+
+        props = {}
+        if self.get('command'):
+            props['application:binary_path'] = self.get('command')
+            props['application:hrn'] = self.get('command')
+            props['application:membership'] = self._topic_app
+        props['application:type'] = "application"
+        self._omf_api.frcp_create( self.node.get('hostname'), "application", props = props)
+
+
+
+        super(OMF6Application, self).do_deploy()
+
+    def do_start(self):
+        """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
+         to execute the application. 
+         It becomes STARTED before the messages are sent (for coordination)
+
+        """
+        if not self.get('command') :
+            msg = "Application's Command is not initialized"
+            self.error(msg)
+            raise RuntimeError, msg
+
+        if not self.get('env'):
+            self.set('env', " ")
+
+        props = {}
+        props['state'] = "running"
+
+        guards = {}
+        guards['type'] = "application"
+        guards['name'] = self.get('command')
+        time.sleep(2)
+        self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
+
+
+        super(OMF6Application, self).do_start()
+
+    def do_stop(self):
+        """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
+        kill the application. 
+        State is set to STOPPED after the message is sent.
+
+        """
+
+        super(OMF6Application, self).do_stop()
+
+    def do_release(self):
+        """ Clean the RM at the end of the experiment and release the API.
+
+        """
+        props = {}
+        props['frcp:type'] = "application"
+
+        self._omf_api.frcp_release(self.node.get('hostname'),self._topic_app, props = props )
+
+        if self._omf_api:
+            OMF6APIFactory.release_api(self.get('xmppHost'), 
+                self.get('xmppUser'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
+
+        super(OMF6Application, self).do_release()
+
diff --git a/src/nepi/resources/omf/messages_6.py b/src/nepi/resources/omf/messages_6.py
new file mode 100644 (file)
index 0000000..3a024b4
--- /dev/null
@@ -0,0 +1,184 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Julien Tribino <julien.tribino@inria.fr>
+
+from xml.etree import cElementTree as ET
+
+class MessageHandler():
+    """
+    .. class:: Class Args :
+      
+        :param sliceid: Slice Name (= Xmpp Slice)
+        :type expid: str
+        :param expid: Experiment ID (= Xmpp User)
+        :type expid: str
+
+    .. note::
+
+       This class is used only for OMF 5.4 Protocol and is going to become unused
+
+    """
+
+    def __init__(self):
+        """
+        
+        """
+        pass
+
+    def _type_element(self, type_elt, xmlns, msg_id):
+        """ Insert a markup element with an id
+
+        """
+        elt = ET.Element(type_elt)
+        elt.set("xmlns", xmlns)
+        elt.set("mid", msg_id)
+        return elt
+
+    
+
+    def _attr_element(self, parent, markup, text, type_key=None, type_value = None):
+        """ Insert a markup element with a text (value)
+
+        :param parent: Parent element in an XML point of view
+        :type parent: ElementTree Element
+        :param markup: Name of the markup
+        :type markup: str
+        :param text: Value of the markup element
+        :type text: str
+
+        """
+        elt = ET.SubElement(parent, markup)
+        if type_key and type_value:
+            elt.set(type_key, type_value)
+        elt.text = text
+        return elt
+
+    def _id_element(self, parent, markup, key, value):
+        """ Insert a markup element with a text (value)
+
+        :param parent: Parent element in an XML point of view
+        :type parent: ElementTree Element
+        :param markup: Name of the markup
+        :type markup: str
+        :param text: Value of the markup element
+        :type text: str
+
+        """
+        elt = ET.SubElement(parent, markup)
+        elt.set(key, value)
+        return elt
+
+    def create_function(self, msg_id, src, rtype, timestamp, props = None, guards = None):
+        """ Build a create message
+        """
+        payload = self._type_element("create", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+        self._attr_element(payload,"src",src)
+        self._attr_element(payload,"ts",timestamp)
+        self._attr_element(payload,"rtype",rtype)
+
+        if props :
+            if rtype == "application" :
+                properties = self._id_element(payload,"props","xmlns:application",
+                      "http://schema.mytestbed.net/omf/6.0/protocol/application")
+            else:
+                properties = self._attr_element(payload,"props","")
+
+            for prop in props.keys():
+                self._attr_element(properties,prop,props[prop],type_key="type", type_value = "string")
+
+        if guards :
+            guardians = self._attr_element(payload,"guard","")
+            for guard in guards.keys():
+                self._attr_element(guardians,guard,guards[guard],type_key="type", type_value = "string")
+
+        return payload
+
+    def configure_function(self, msg_id, src, timestamp, props = None, guards = None):
+        """ Build a configure message
+        """
+        payload = self._type_element("configure", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+        self._attr_element(payload,"src",src)
+        self._attr_element(payload,"ts",timestamp)
+
+        if props :
+            properties = self._attr_element(payload,"props","")
+            for prop in props.keys():
+                self._attr_element(properties,prop,props[prop],type_key="type", type_value = "symbol")
+
+        if guards :
+            guardians = self._attr_element(payload,"guard","")
+            for guard in guards.keys():
+                self._attr_element(guardians,guard,guards[guard],type_key="type", type_value = "string")
+
+        return payload
+
+    def request_function(self, msg_id, src, timestamp,  props = None, guards = None):
+        """ Build a request message
+
+        """
+        payload = self._type_element("request", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+        self._attr_element(payload,"src",src)
+        self._attr_element(payload,"ts",timestamp)
+
+        if props :
+            properties = self._attr_element(payload,"props","")
+            for prop in props.keys():
+                self._attr_element(properties,prop,props[prop])
+
+        if guards :
+            guardians = self._attr_element(payload,"guard","")
+            for guard in guards.keys():
+                self._attr_element(guardians,guard,guards[guard])
+        return payload
+
+    def inform_function(self, msg_id, src, timestamp, cid, itype):
+        """ Build an inform message
+
+        """
+        payload = self._type_element("inform", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+        sliceid = self._attr_element(payload,"src",src)
+        expid = self._attr_element(config,"ts",timestamp)
+        target = self._attr_element(config,"cid",cid)
+        value = self._attr_element(config,"itype",value)
+        path = self._attr_element(config,"properties",path)
+        return payload
+
+    def release_function(self, msg_id, src, timestamp, res_id = None, props = None, guards = None):
+        """ Build a release message
+
+        """
+        payload = self._type_element("release", "http://schema.mytestbed.net/omf/6.0/protocol", msg_id )
+        self._attr_element(payload,"src",src)
+        self._attr_element(payload,"ts",timestamp)
+        if res_id :
+            self._attr_element(payload,"res_id",timestamp)
+        if props :
+            properties = self._id_element(payload,"props","xmlns:frcp",
+                      "http://schema.mytestbed.net/omf/6.0/protocol")
+            for prop in props.keys():
+                self._attr_element(properties,prop,props[prop])
+
+        if guards :
+            guardians = self._attr_element(payload,"guard","")
+            for guard in guards.keys():
+                self._attr_element(guardians,guard,guards[guard])
+
+        return payload
+
diff --git a/src/nepi/resources/omf/node6.py b/src/nepi/resources/omf/node6.py
new file mode 100644 (file)
index 0000000..6f8a144
--- /dev/null
@@ -0,0 +1,134 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Julien Tribino <julien.tribino@inria.fr>
+
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
+from nepi.execution.attribute import Attribute, Flags 
+from nepi.resources.omf.omf6_resource import OMF6Resource
+from nepi.resources.omf.omf6_api import OMF6APIFactory
+
+import time
+
+@clsinit_copy
+class OMF6Node(OMF6Resource):
+    """
+    .. class:: Class Args :
+      
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+        :param creds: Credentials to communicate with the rm (XmppClient for OMF)
+        :type creds: dict
+
+    """
+    _rtype = "OMF6Node"
+    _authorized_connections = ["OMF6Application" , "OMFWifiInterface"]
+
+    @classmethod
+    def _register_attributes(cls):
+        """Register the attributes of an OMF Node
+
+        """
+        hostname = Attribute("hostname", "Hostname of the machine")
+
+        cls._register_attribute(hostname)
+
+    # XXX: We don't necessary need to have the credentials at the 
+    # moment we create the RM
+    def __init__(self, ec, guid):
+        """
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+
+        """
+        super(OMF6Node, self).__init__(ec, guid)
+
+        self._omf_api = None 
+
+    @property
+    def exp_id(self):
+        return self.ec.exp_id
+
+    def valid_connection(self, guid):
+        """ Check if the connection with the guid in parameter is possible. 
+        Only meaningful connections are allowed.
+
+        :param guid: Guid of the current RM
+        :type guid: int
+        :rtype:  Boolean
+
+        """
+        rm = self.ec.get_resource(guid)
+        if rm.get_rtype() in self._authorized_connections:
+            msg = "Connection between %s %s and %s %s accepted" % (
+                    self.get_rtype(), self._guid, rm.get_rtype(), guid)
+            self.debug(msg)
+
+            return True
+
+        msg = "Connection between %s %s and %s %s refused" % (
+                self.get_rtype(), self._guid, rm.get_rtype(), guid)
+        self.debug(msg)
+
+        return False
+
+    def do_deploy(self):
+        """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol 
+            to enroll the node into the experiment.
+            It becomes DEPLOYED after sending messages to enroll the node
+
+        """ 
+        if not (self.get('xmppUser') and self.get('xmppHost')
+              and self.get('xmppPort') and self.get('xmppPassword')):
+            msg = "Credentials are not initialzed. XMPP Connections impossible"
+            self.error(msg)
+            raise RuntimeError, msg
+
+        if not self._omf_api :
+            self._omf_api = OMF6APIFactory.get_api(self.get('xmppHost'), 
+                self.get('xmppUser'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
+
+        if not self.get('hostname') :
+            msg = "Hostname's value is not initialized"
+            self.error(msg)
+            raise RuntimeError, msg
+
+        self._omf_api.enroll_topic(self.get('hostname'))
+
+        super(OMF6Node, self).do_deploy()
+
+    def do_release(self):
+        """ Clean the RM at the end of the experiment
+
+        """
+        if self._omf_api:
+            # Should be deleted from the RC
+            #self._omf_api.frcp_release(self.get('hostname'))
+
+            OMF6APIFactory.release_api(self.get('xmppHost'), 
+                self.get('xmppUser'), self.get('xmppPort'), 
+                self.get('xmppPassword'), exp_id = self.exp_id)
+
+        super(OMF6Node, self).do_release()
+
diff --git a/src/nepi/resources/omf/omf6_api.py b/src/nepi/resources/omf/omf6_api.py
new file mode 100644 (file)
index 0000000..5ec13af
--- /dev/null
@@ -0,0 +1,333 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Julien Tribino <julien.tribino@inria.fr>
+
+import ssl
+import sys
+import time
+import hashlib
+import threading
+
+from nepi.util.timefuncs import tsformat 
+import os
+
+from nepi.util.logger import Logger
+
+from nepi.resources.omf.omf_client import OMFClient
+from nepi.resources.omf.messages_6 import MessageHandler
+
+class OMF6API(Logger):
+    """
+    .. class:: Class Args :
+      
+        :param slice: Xmpp Slice
+        :type slice: str
+        :param host: Xmpp Server
+        :type host: str
+        :param port: Xmpp Port
+        :type port: str
+        :param password: Xmpp password
+        :type password: str
+        :param xmpp_root: Root of the Xmpp Topic Architecture
+        :type xmpp_root: str
+
+    .. note::
+
+       This class is the implementation of an OMF 5.4 API. 
+       Since the version 5.4.1, the Topic Architecture start with OMF_5.4 
+       instead of OMF used for OMF5.3
+
+    """
+    def __init__(self, host, user = "nepi", port="5222", password="1234",
+            exp_id = None):
+        """
+    
+        :param slice: Xmpp Slice
+        :type slice: str
+        :param host: Xmpp Server
+        :type host: str
+        :param port: Xmpp Port
+        :type port: str
+        :param password: Xmpp password
+        :type password: str
+        :param xmpp_root: Root of the Xmpp Topic Architecture
+        :type xmpp_root: str
+
+        """
+        super(OMF6API, self).__init__("OMF6API")
+        self._exp_id = exp_id
+        self._user = user # name of the machine that run Nepi
+        self._host = host # name of the xmpp server
+        self._port = port # port of the xmpp server
+        self._password = password # password to connect to xmpp
+        self._jid = "%s-%s@%s" % (self._user, self._exp_id, self._host)
+        self._src = "xmpp://" + self._jid
+        
+        self._topics = []
+
+        # OMF xmpp client
+        self._client = None
+
+        # message handler
+        self._message = None
+
+        if sys.version_info < (3, 0):
+            reload(sys)
+            sys.setdefaultencoding('utf8')
+
+        # instantiate the xmpp client
+        self._init_client()
+
+        # register nepi topic
+        self._enroll_nepi()
+
+
+    def _init_client(self):
+        """ Initialize XMPP Client
+
+        """
+        xmpp = OMFClient(self._jid, self._password)
+        # PROTOCOL_SSLv3 required for compatibility with OpenFire
+        xmpp.ssl_version = ssl.PROTOCOL_SSLv3
+
+        if xmpp.connect((self._host, self._port)):
+            xmpp.process(block=False)
+            self.check_ready(xmpp)
+            self._client = xmpp
+            self._message = MessageHandler()
+        else:
+            msg = "Unable to connect to the XMPP server."
+            self.error(msg)
+            raise RuntimeError(msg)
+
+    def check_ready(self, xmpp):
+        delay = 1.0
+        for i in xrange(4):
+            if xmpp.ready:
+                break
+            else:
+                time.sleep(delay)
+                delay = delay * 1.5
+        else:
+            msg = "XMPP Client is not ready after long time"
+            self.error(msg, out, err)
+            raise RuntimeError, msg
+
+    @property
+    def _nepi_topic(self):
+        msg = "nepi-" + self._exp_id
+        self.debug(msg)
+        return msg
+
+    def _enroll_nepi(self):
+        """ Create and Subscribe to the Session Topic
+
+        """
+        nepi_topic = self._nepi_topic
+        self._client.create(nepi_topic)
+        self._client.subscribe(nepi_topic)
+
+
+    def enroll_topic(self, topic):
+        """ Create and Subscribe to the session topic and the resources
+            corresponding to the hostname
+
+        :param hostname: Full hrn of the node
+        :type hostname: str
+
+        """
+        if topic in self._topics:
+            return 
+
+        self._topics.append(topic)
+
+#        try :
+        self._client.create(topic)
+#        except:
+#            msg = "Topic already existing"
+#            self.info(msg)
+        self._client.subscribe(topic)
+
+    def frcp_inform(self, topic, cid, itype):
+        """ Configure attribute on the node
+
+        """
+        msg_id = os.urandom(16).encode('hex')
+        timestamp = tsformat()
+        payload = self._message.inform_function(msg_id, self._src, timestamp, props = props ,guards = guards) 
+        
+        self._client.publish(payload, xmpp_node)
+
+    def frcp_configure(self, topic, props = None, guards = None ):
+        """ Configure attribute on the node
+
+        """
+        msg_id = os.urandom(16).encode('hex')
+        timestamp = tsformat()
+        payload = self._message.configure_function(msg_id, self._src, timestamp ,props = props ,guards = guards) 
+        self._client.publish(payload, topic)
+
+    
+    def frcp_create(self, topic, rtype, props = None, guards = None ):
+        """ Send to the stdin of the application the value
+
+        """
+        msg_id = os.urandom(16).encode('hex')
+        timestamp = tsformat()
+        payload = self._message.create_function(msg_id, self._src, rtype, timestamp , props = props ,guards = guards) 
+        self._client.publish(payload, topic)
+
+
+    def frcp_request(self, topic, props = None, guards = None ):
+        """ Execute command on the node
+
+        """
+        msg_id = os.urandom(16).encode('hex')
+        timestamp = tsformat()
+        payload = self._message.request_function(msg_id, self._src, timestamp, props = props ,guards = guards) 
+        self._client.publish(payload, xmpp_node)
+
+    def frcp_release(self, parent, child, res_id = None, props = None, guards = None ):
+        """ Delete the session and logger topics. Then disconnect 
+
+        """
+        msg_id = os.urandom(16).encode('hex')
+        timestamp = tsformat()
+        payload = self._message.release_function(msg_id, self._src, timestamp, res_id = res_id, props = props ,guards = guards) 
+        self._client.publish(payload, parent)
+
+        if child in self._topics:
+            self._topics.remove(child)
+
+        self._client.delete(child)
+
+    def disconnect(self) :
+        """ Delete the session and logger topics. Then disconnect 
+
+        """
+        self._client.delete(self._nepi_topic)
+
+        #XXX Why there is a sleep there ?
+        time.sleep(1)
+        
+        # Wait the send queue to be empty before disconnect
+        self._client.disconnect(wait=True)
+        msg = " Disconnected from XMPP Server"
+        self.debug(msg)
+
+
+class OMF6APIFactory(object):
+    """ 
+    .. note::
+
+        It allows the different RM to use the same xmpp client if they use 
+        the same credentials.  For the moment, it is focused on XMPP.
+
+    """
+    # use lock to avoid concurrent access to the Api list at the same times by 2 
+    # different threads
+    lock = threading.Lock()
+    _apis = dict()
+
+    @classmethod 
+    def get_api(cls, host, user, port, password, exp_id = None):
+        """ Get an OMF Api
+
+        :param slice: Xmpp Slice Name
+        :type slice: str
+        :param host: Xmpp Server Adress
+        :type host: str
+        :param port: Xmpp Port (Default : 5222)
+        :type port: str
+        :param password: Xmpp Password
+        :type password: str
+
+        """
+        if host and user and port and password:
+            key = cls._make_key(host, user, port, password, exp_id)
+            cls.lock.acquire()
+            if key in cls._apis:
+                #print "Api Counter : " + str(cls._apis[key]['cnt'])
+                cls._apis[key]['cnt'] += 1
+                cls.lock.release()
+                return cls._apis[key]['api']
+            else :
+                omf_api = cls.create_api(host, user, port, password, exp_id)
+                cls.lock.release()
+                return omf_api
+        return None
+
+    @classmethod 
+    def create_api(cls, host, user, port, password, exp_id):
+        """ Create an OMF API if this one doesn't exist yet with this credentials
+
+        :param slice: Xmpp Slice Name
+        :type slice: str
+        :param host: Xmpp Server Adress
+        :type host: str
+        :param port: Xmpp Port (Default : 5222)
+        :type port: str
+        :param password: Xmpp Password
+        :type password: str
+
+        """
+        omf_api = OMF6API(host, user = user, port = port, password = password, exp_id = exp_id)
+        key = cls._make_key(host, user, port, password, exp_id)
+        cls._apis[key] = {}
+        cls._apis[key]['api'] = omf_api
+        cls._apis[key]['cnt'] = 1
+        return omf_api
+
+    @classmethod 
+    def release_api(cls, host, user, port, password, exp_id = None):
+        """ Release an OMF API with this credentials
+
+        :param slice: Xmpp Slice Name
+        :type slice: str
+        :param host: Xmpp Server Adress
+        :type host: str
+        :param port: Xmpp Port (Default : 5222)
+        :type port: str
+        :param password: Xmpp Password
+        :type password: str
+
+        """
+        if host and user and port and password:
+            key = cls._make_key(host, user, port, password, exp_id)
+            if key in cls._apis:
+                cls._apis[key]['cnt'] -= 1
+                #print "Api Counter : " + str(cls._apis[key]['cnt'])
+                if cls._apis[key]['cnt'] == 0:
+                    omf_api = cls._apis[key]['api']
+                    omf_api.disconnect()
+
+
+    @classmethod 
+    def _make_key(cls, *args):
+        """ Hash the credentials in order to create a key
+
+        :param args: list of arguments used to create the hash (user, host, port, ...)
+        :type args: list of args
+
+        """
+        skey = "".join(map(str, args))
+        return hashlib.md5(skey).hexdigest()
+
+
+
diff --git a/src/nepi/resources/omf/omf6_parser.py b/src/nepi/resources/omf/omf6_parser.py
new file mode 100644 (file)
index 0000000..9029ce2
--- /dev/null
@@ -0,0 +1,201 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Julien Tribino <julien.tribino@inria.fr>
+
+from nepi.util.logger import Logger
+
+import traceback
+import xml.etree.ElementTree as ET
+
+# inherit from BaseXmpp and XMLstream classes
+class OMF6Parser(Logger): 
+    """
+    .. class:: Class Args :
+      
+        :param jid: Jabber Id (= Xmpp Slice + Date)
+        :type jid: str
+        :param password: Jabber Password (= Xmpp Password)
+        :type password: str
+
+    .. note::
+
+       This class is an XMPP Client with customized method
+
+    """
+
+    def __init__(self):
+        """
+
+        :param jid: Jabber Id (= Xmpp Slice + Date)
+        :type jid: str
+        :param password: Jabber Password (= Xmpp Password)
+        :type password: str
+
+
+        """
+        super(OMF6Parser, self).__init__("OMF6API")
+
+        
+  
+    def _check_for_tag(self, root, namespaces, tag):
+        """  Check if an element markup is in the ElementTree
+
+        :param root: Root of the tree
+        :type root: ElementTree Element
+        :param namespaces: Namespaces of the element
+        :type namespaces: str
+        :param tag: Tag that will search in the tree
+        :type tag: str
+
+        """
+        for element in root.iter(namespaces+tag):
+            if element.text:
+                return element.text
+            else : 
+                return None
+
+    def _check_for_props(self, root, namespaces):
+        """  Check if an element markup is in the ElementTree
+
+        :param root: Root of the tree
+        :type root: ElementTree Element
+        :param namespaces: Namespaces of the element
+        :type namespaces: str
+
+        """
+        props = {}
+        for properties in root.iter(namespaces+'props'):
+            for element in properties.iter():
+                if element.tag and element.text:
+                    props[element.tag] = element.text
+        return props
+
+    def _check_for_membership(self, root, namespaces):
+        """  Check if an element markup is in the ElementTree
+
+        :param root: Root of the tree
+        :type root: ElementTree Element
+        :param namespaces: Namespaces of the element
+        :type namespaces: str
+
+        """
+        for element in root.iter(namespaces+'membership'):
+            for elt in element.iter(namespaces+'it'):
+                ##XXX : change
+                return elt.text
+
+
+    def _check_output(self, root, namespaces):
+        """ Check the significative element in the answer and display it
+
+        :param root: Root of the tree
+        :type root: ElementTree Element
+        :param namespaces: Namespaces of the tree
+        :type namespaces: str
+
+        """
+        fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
+        response = ""
+        for elt in fields:
+            msg = self._check_for_tag(root, namespaces, elt)
+            if msg is not None:
+                response = response + " " + msg.text + " :"
+        deb = self._check_for_tag(root, namespaces, "MESSAGE")
+        if deb is not None:
+            msg = response + " " + deb.text
+            self.debug(msg)
+        else :
+            self.info(response)
+
+
+    def _inform_creation_ok(self, root, namespaces):
+        uid = self._check_for_tag(root, namespaces, "uid")
+        member = self._check_for_membership(root, namespaces)
+        binary_path = self._check_for_tag(root, namespaces, "binary_path")
+        msg = "CREATION OK -- "
+        if binary_path :
+            msg = msg + "The resource : '"+binary_path
+        if uid :
+            msg = msg + "' is listening to the topics : '"+ uid
+        if member :
+            msg = msg + "' and '"+ member +"'"
+        self.info(msg)
+
+    def _inform_creation_failed(self, root, namespaces):
+        reason = self._check_for_tag(root, namespaces, "reason")
+        msg = "CREATION FAILED - The reason : "+reason
+        self.error(msg)
+
+    def _inform_status(self, root, namespaces):
+        props = self._check_for_props(root, namespaces)
+        msg = "STATUS -- "
+        for elt in props.keys():
+            ns, tag = elt.split('}')
+            if tag == "it":
+                msg = msg + "membership : " + props[elt]+" -- "
+            else:
+                msg = msg + tag +" : " + props[elt]+" -- "
+        msg = msg + " STATUS "
+        self.info(msg)
+
+    def _inform_released(self, root, namespaces):
+        parent_id = self._check_for_tag(root, namespaces, "src")
+        child_id = self._check_for_tag(root, namespaces, "res_id")
+        msg = "RELEASED - The resource : '"+res_id+ \
+              "' has been released by : '"+ src
+        self.info(msg)
+
+    def _inform_error(self, root, namespaces):
+        reason = self._check_for_tag(root, namespaces, "reason")
+        msg = "The reason : "+reason
+        self.error(msg)
+
+    def _inform_warn(self, root, namespaces):
+        reason = self._check_for_tag(root, namespaces, "reason")
+        msg = "The reason : "+reason
+        self.warn(msg)
+
+    def _parse_inform(self, root, namespaces):
+        """ Check the significative element in the answer and display it
+
+        :param root: Root of the tree
+        :type root: ElementTree Element
+        :param namespaces: Namespaces of the tree
+        :type namespaces: str
+
+        """
+        itype = self._check_for_tag(root, namespaces, "itype")
+        if itype :
+            method_name = '_inform_'+ itype.replace('.', '_').lower()
+            method = getattr(self, method_name)
+            if method :
+                method(root, namespaces)
+            else :
+                msg = "There is no method to parse the response of the type " + itype
+                self.info(msg)
+                return
+        
+
+    def handle(self, iq):
+        namespaces = "{http://schema.mytestbed.net/omf/6.0/protocol}"
+        for i in iq['pubsub_event']['items']:
+            root = ET.fromstring(str(i))
+            #ET.dump(root)
+            self._parse_inform(root, namespaces)
+
diff --git a/src/nepi/resources/omf/omf6_resource.py b/src/nepi/resources/omf/omf6_resource.py
new file mode 100644 (file)
index 0000000..c92f86d
--- /dev/null
@@ -0,0 +1,53 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Julien Tribino <julien.tribino@inria.fr>
+#         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
+
+
+@clsinit_copy
+class OMF6Resource(ResourceManager):
+    """
+    Generic resource gathering XMPP credential information and common methods
+    for OMF nodes, channels, applications, etc.
+    """
+    _rtype = "OMFResource"
+
+    @classmethod
+    def _register_attributes(cls):
+
+        xmppHost = Attribute("xmppHost", "Xmpp Server",
+            flags = Flags.Credential)
+        xmppUser = Attribute("xmppUser", "Xmpp User")
+        xmppPort = Attribute("xmppPort", "Xmpp Port",
+            flags = Flags.Credential)
+        xmppPassword = Attribute("xmppPassword", "Xmpp Password",
+                flags = Flags.Credential)
+
+        cls._register_attribute(xmppHost)
+        cls._register_attribute(xmppUser)
+        cls._register_attribute(xmppPort)
+        cls._register_attribute(xmppPassword)
+
+    def __init__(self, ec, guid):
+        super(OMF6Resource, self).__init__(ec, guid)
+        pass
+
index 396e2d7..ea53f60 100644 (file)
@@ -19,7 +19,7 @@
 #         Julien Tribino <julien.tribino@inria.fr>
 
 from nepi.util.logger import Logger
-
+from nepi.resources.omf.omf6_parser import OMF6Parser
 
 try:
     import sleekxmpp
@@ -71,6 +71,9 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         self._ready = False
         self._registered = False
         self._server = None
+        self._parser = None
+
+
 
         self.register_plugin('xep_0077') # In-band registration
         self.register_plugin('xep_0030')
@@ -80,7 +83,16 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         self.add_event_handler("session_start", self.start)
         self.add_event_handler("register", self.register)
         self.add_event_handler("pubsub_publish", self.handle_omf_message)
+
+        #Init the parser
+        self._init_parser()
         
+    def _init_parser(self):
+        """ Init the parser depending on the OMF Version
+
+        """
+        self._parser = OMF6Parser()
+
     @property
     def ready(self):
         """ Check if the client is ready
@@ -190,8 +202,9 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         try:
             self['xep_0060'].create_node(self._server, node, config = config)
         except:
-            error = traceback.format_exc()
-            msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error)
+            #error = traceback.format_exc()
+            #msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error)
+            msg = 'Could not create the topic : '+node+' . Maybe the topic already exists'
             self.error(msg)
 
     def delete(self, node):
@@ -321,44 +334,6 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
                     % (self.boundjid.bare, node, error)
             self.error(msg)
 
-    def _check_for_tag(self, root, namespaces, tag):
-        """  Check if an element markup is in the ElementTree
-
-        :param root: Root of the tree
-        :type root: ElementTree Element
-        :param namespaces: Namespaces of the element
-        :type namespaces: str
-        :param tag: Tag that will search in the tree
-        :type tag: str
-
-        """
-        for element in root.iter(namespaces+tag):
-            if element.text:
-                return element
-            else : 
-                return None    
-
-    def _check_output(self, root, namespaces):
-        """ Check the significative element in the answer and display it
-
-        :param root: Root of the tree
-        :type root: ElementTree Element
-        :param namespaces: Namespaces of the tree
-        :type namespaces: str
-
-        """
-        fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
-        response = ""
-        for elt in fields:
-            msg = self._check_for_tag(root, namespaces, elt)
-            if msg is not None:
-                response = response + " " + msg.text + " :"
-        deb = self._check_for_tag(root, namespaces, "MESSAGE")
-        if deb is not None:
-            msg = response + " " + deb.text
-            self.debug(msg)
-        else :
-            self.info(response)
 
     def handle_omf_message(self, iq):
         """ Handle published/received message 
@@ -367,9 +342,5 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger):
         :type iq: Iq Stanza
 
         """
-        namespaces = "{http://jabber.org/protocol/pubsub}"
-        for i in iq['pubsub_event']['items']:
-            root = ET.fromstring(str(i))
-            self._check_output(root, namespaces)
-
+        self._parser.handle(iq)