Add OMF Classes using XMPP and Protocol 5.4
authorJulien Tribino <julien.tribino@inria.fr>
Tue, 26 Mar 2013 09:16:32 +0000 (10:16 +0100)
committerJulien Tribino <julien.tribino@inria.fr>
Tue, 26 Mar 2013 09:16:32 +0000 (10:16 +0100)
13 files changed:
src/neco/__init__.py
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/resources/omf/omf_api.py [new file with mode: 0644]
src/neco/resources/omf/omf_application.py [new file with mode: 0644]
src/neco/resources/omf/omf_channel.py [new file with mode: 0644]
src/neco/resources/omf/omf_client.py [new file with mode: 0644]
src/neco/resources/omf/omf_interface.py [new file with mode: 0644]
src/neco/resources/omf/omf_messages_5_4.py [new file with mode: 0644]
src/neco/resources/omf/omf_node.py [new file with mode: 0644]
src/neco/resources/omf/xx_omf_resource.py [new file with mode: 0644]
test/execution/resource.py [changed mode: 0644->0755]
test/resources/omf/omf_vlc_exp.py [new file with mode: 0755]

index df19bd5..f926e6d 100644 (file)
@@ -1,2 +1,5 @@
 import logging
 logging.basicConfig()
+
+LOGLEVEL = logging.DEBUG
+
index 0ac35e7..cb1df2b 100644 (file)
@@ -39,12 +39,12 @@ class ExperimentController(object):
         # TODO
         pass
 
-    def register_resource(self, rtype, guid = None):
+    def register_resource(self, rtype, guid = None, creds = None):
         # Get next available guid
         guid = self._guid_generator.next(guid)
         
         # Instantiate RM
-        rm = ResourceFactory.create(rtype, self, guid)
+        rm = ResourceFactory.create(rtype, self, guid,creds)
 
         # Store RM
         self._resources[guid] = rm
index 39f96c6..d3a1dc4 100644 (file)
@@ -68,10 +68,8 @@ class Resource(object):
         self._attrs = copy.deepcopy(self._attributes)
 
         # Logging
-        loglevel = "debug"
         self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % 
             self.guid)
-        self._logger.setLevel(getattr(logging, loglevel.upper()))
 
     @property
     def guid(self):
@@ -85,6 +83,10 @@ class Resource(object):
         if (self._validate_connection(guid)):
             self._connections.add(guid)
 
+    @property
+    def connections(self):
+        return self._connections
+
     def discover(self, filters):
         pass
 
@@ -130,7 +132,7 @@ class ResourceFactory(object):
         cls._resource_types[rclass.rtype()] = rclass
 
     @classmethod
-    def create(cls, rtype, ec, guid):
-        rclass = cls._resource[rtype]
-        return rclass(ec, guid)
+    def create(cls, rtype, ec, guid, creds):
+        rclass = cls._resource_types[rtype]
+        return rclass(ec, guid, creds)
 
diff --git a/src/neco/resources/omf/omf_api.py b/src/neco/resources/omf/omf_api.py
new file mode 100644 (file)
index 0000000..aca53e5
--- /dev/null
@@ -0,0 +1,180 @@
+import datetime
+import logging
+import ssl
+import sys
+import time
+
+from neco.resources.omf.omf_client import OMFClient
+from neco.resources.omf.omf_messages_5_4 import MessageHandler
+
+class OMFAPI(object):
+    def __init__(self, slice, host, port, password, xmpp_root = None):
+        date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
+        tz = -time.altzone if time.daylight != 0 else -time.timezone
+        date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
+        self._user = "%s-%s" % (slice, date)
+        self._slice = slice
+        self._host = host
+        self._port = port
+        self._password = password
+        self._hostnames = []
+        self._xmpp_root = xmpp_root or "OMF_5.4"
+
+        self._logger = logging.getLogger("neco.resources.omf")
+
+        # OMF xmpp client
+        self._client = None
+        # message handler
+        self._message = None
+
+        if sys.version_info < (3, 0):
+            reload(sys)
+            sys.setdefaultencoding('utf8')
+
+        # instantiate the xmpp client
+        self._init_client()
+
+        # register xmpp nodes for the experiment
+        self._enroll_experiment()
+        self._enroll_newexperiment()
+
+        # register xmpp logger for the experiment
+        self._enroll_logger()
+
+    def _init_client(self):
+        jid = "%s@%s" % (self._user, self._host)
+        xmpp = OMFClient(jid, self._password)
+        # PROTOCOL_SSLv3 required for compatibility with OpenFire
+        xmpp.ssl_version = ssl.PROTOCOL_SSLv3
+
+        if xmpp.connect((self._host, self._port)):
+            xmpp.process(threaded=True)
+            while not xmpp.ready:
+                time.sleep(1)
+            self._client = xmpp
+            self._message = MessageHandler(self._slice, self._user)
+        else:
+            msg = "Unable to connect to the XMPP server."
+            self._logger.error(msg)
+            raise RuntimeError(msg)
+
+    def _enroll_experiment(self):
+        xmpp_node = self._exp_session_id
+        self._client.create(xmpp_node)
+        #print "Create experiment sesion id topics !!" 
+        self._client.subscribe(xmpp_node)
+        #print "Subscribe to experiment sesion id topics !!" 
+
+
+    def _enroll_newexperiment(self):
+        address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
+        print address
+        payload = self._message.newexpfunction(self._user, address)
+        slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
+        self._client.publish(payload, slice_sid)
+
+    def _enroll_logger(self):
+        xmpp_node = self._logger_session_id
+        self._client.create(xmpp_node)
+        self._client.subscribe(xmpp_node)
+
+        payload = self._message.logfunction("2", 
+                "nodeHandler::NodeHandler", 
+                "INFO", 
+                "OMF Experiment Controller 5.4 (git 529a626)")
+        self._client.publish(payload, xmpp_node)
+
+    def _host_session_id(self, hostname):
+        return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
+
+    def _host_resource_id(self, hostname):
+        return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
+
+    @property
+    def _exp_session_id(self):
+        return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
+
+    @property
+    def _logger_session_id(self):
+        return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
+
+    def delete(self, hostname):
+        if not hostname in self._hostnames:
+            return
+
+        self._hostnames.remove(hostname)
+
+        xmpp_node = self._host_session_id(hostname)
+        self._client.delete(xmpp_node)
+
+    def enroll_host(self, hostname):
+        if hostname in self._hostnames:
+            return 
+
+        self._hostnames.append(hostname)
+
+        xmpp_node =  self._host_session_id(hostname)
+        self._client.create(xmpp_node)
+        self._client.subscribe(xmpp_node)
+
+        xmpp_node =  self._host_resource_id(hostname)
+        self._client.subscribe(xmpp_node)
+
+        payload = self._message.enrollfunction("1", "*", "1", hostname)
+        self._client.publish(payload, xmpp_node)
+
+    def configure(self, hostname, attribute, value): 
+        payload = self._message.configurefunction(hostname, value, attribute)
+        xmpp_node =  self._host_session_id(hostname)
+        self._client.publish(payload, xmpp_node)
+
+    def execute(self, hostname, app_id, arguments, path, env):
+        payload = self._message.executefunction(hostname, app_id, arguments, path, env)
+        xmpp_node =  self._host_session_id(hostname)
+        self._client.publish(payload, xmpp_node)
+
+    def exit(self, hostname, app_id):
+        payload = self._message.exitfunction(hostname, app_id)
+        xmpp_node =  self._host_session_id(hostname)
+        self._client.publish(payload, xmpp_node)
+
+    def disconnect(self):
+        self._client.delete(self._exp_session_id)
+        self._client.delete(self._logger_session_id)
+
+        for hostname in self._hostnames[:]:
+            self.delete(hostname)
+
+        time.sleep(1)
+        self._client.disconnect()
+
+
+class OMFAPIFactory(object):
+    _Api = dict()
+
+    @classmethod 
+    def get_api(cls, slice, host, port, password):
+        if slice and host and port and password:
+            key = cls._hash_api(slice, host, port)
+            if key in cls._Api:
+                return cls._Api[key]
+            else :
+                return cls.create_api(slice, host, port, password)
+        return None
+
+    @classmethod 
+    def create_api(cls, slice, host, port, password):
+        OmfApi = OMFAPI(slice, host, port, password)
+        key = cls._hash_api(slice, host, port)      
+        cls._Api[key] = OmfApi
+        return OmfApi
+
+    @classmethod 
+    def _hash_api(cls, slice, host, port):
+        res = slice + "_" + host + "_" + port
+        return res
+
+
+
+
+
diff --git a/src/neco/resources/omf/omf_application.py b/src/neco/resources/omf/omf_application.py
new file mode 100644 (file)
index 0000000..8b6625a
--- /dev/null
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFApplication(Resource):
+    _rtype = "OMFApplication"
+    _authorized_connections = ["OMFNode"]
+
+    @classmethod
+    def _register_attributes(cls):
+        appid = Attribute("appid", "Name of the application")
+        path = Attribute("path", "Path of the application")
+        args = Attribute("args", "Argument of the application")
+        env = Attribute("env", "Environnement variable of the application")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        cls._register_attribute(appid)
+        cls._register_attribute(path)
+        cls._register_attribute(args)
+        cls._register_attribute(env)
+        cls._register_attribute(xmppSlice)
+        cls._register_attribute(xmppHost)
+        cls._register_attribute(xmppPort)
+        cls._register_attribute(xmppPassword)
+
+
+    def __init__(self, ec, guid, creds):
+        super(OMFApplication, self).__init__(ec, guid)
+        self.set('xmppSlice', creds['xmppSlice'])
+        self.set('xmppHost', creds['xmppHost'])
+        self.set('xmppPort', creds['xmppPort'])
+        self.set('xmppPassword', creds['xmppPassword'])
+
+        self.set('appid', "")
+        self.set('path', "")
+        self.set('args', "")
+        self.set('env', "")
+
+        self._node = None
+
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+        self._logger = logging.getLogger("neco.omf.omfApp    ")
+        self._logger.setLevel(neco.LOGLEVEL)
+
+    def _validate_connection(self, guid):
+        rm = self.ec.resource(guid)
+        if rm.rtype() not in self._authorized_connections:
+            self._logger.debug("Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid))
+            return False
+        elif len(self.connections) != 0 :
+            self._logger.debug("Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid))
+            return False
+        else :
+            self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+            return True
+
+    def _get_nodes(self, conn_set):
+        for elt in conn_set:
+            rm = self.ec.resource(elt)
+            if rm.rtype() == "OMFNode":
+                return rm
+        return None
+
+    def start(self):
+        self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env'))
+        #try:
+        if self.get('appid') and self.get('path') and self.get('args') and self.get('env'):
+            rm_node = self._get_nodes(self._connections)
+            self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
+
+    def stop(self):
+        rm_node = self._get_nodes(self._connections)
+        self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
+
+
+
diff --git a/src/neco/resources/omf/omf_channel.py b/src/neco/resources/omf/omf_channel.py
new file mode 100644 (file)
index 0000000..bd7f0f1
--- /dev/null
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFChannel(Resource):
+    _rtype = "OMFChannel"
+    _authorized_connections = ["OMFWifiInterface"]
+
+    @classmethod
+    def _register_attributes(cls):
+        channel = Attribute("channel", "Name of the application")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        cls._register_attribute(channel)
+        cls._register_attribute(xmppSlice)
+        cls._register_attribute(xmppHost)
+        cls._register_attribute(xmppPort)
+        cls._register_attribute(xmppPassword)
+
+    def __init__(self, ec, guid, creds):
+        super(OMFChannel, self).__init__(ec, guid)
+        self.set('xmppSlice', creds['xmppSlice'])
+        self.set('xmppHost', creds['xmppHost'])
+        self.set('xmppPort', creds['xmppPort'])
+        self.set('xmppPassword', creds['xmppPassword'])
+
+        self._nodes_guid = list()
+
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+        self._logger = logging.getLogger("neco.omf.omfChannel")
+        self._logger.setLevel(neco.LOGLEVEL)
+
+    def _validate_connection(self, guid):
+        rm = self.ec.resource(guid)
+        if rm.rtype() in self._authorized_connections:
+            self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+            return True
+        self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+        return False
+
+    def _get_nodes(self, conn_set):
+        for elt in conn_set:
+            rm_iface = self.ec.resource(elt)
+            for conn in rm_iface._connections:
+                rm_node = self.ec.resource(conn)
+                if rm_node.rtype() == "OMFNode":
+                    couple = [rm_node.get('hostname'), rm_iface.get('alias')]
+                    #print couple
+                    self._nodes_guid.append(couple)
+        return self._nodes_guid
+
+    def discover(self):
+        pass
+     
+    def provision(self, credential):
+        pass
+
+    def start(self):
+        if self.get('channel'):
+            set_nodes = self._get_nodes(self._connections) 
+            #print set_nodes
+            for couple in set_nodes:
+                #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
+                attrval = self.get('channel')
+                attrname = "net/%s/%s" % (couple[1], 'channel')
+                #print "Send the configure message"
+                self._omf_api.configure(couple[0], attrname, attrval)
+
+    def xstart(self):
+        try:
+            if self.get('channel'):
+                node = self.tc.elements.get(self._node_guid)    
+                attrval = self.get('channel')
+                attrname = "net/%s/%s" % (self._alias, 'channel')
+                self._omf_api.configure('omf.plexus.wlab17', attrname, attrval)
+        except AttributeError:
+            # If the attribute is not yet defined, ignore the error
+            pass
+
+
diff --git a/src/neco/resources/omf/omf_client.py b/src/neco/resources/omf/omf_client.py
new file mode 100644 (file)
index 0000000..7557a2c
--- /dev/null
@@ -0,0 +1,202 @@
+import logging
+import sleekxmpp
+from sleekxmpp.exceptions import IqError, IqTimeout
+import traceback
+import xml.etree.ElementTree as ET
+
+import neco
+
+class OMFClient(sleekxmpp.ClientXMPP):
+    def __init__(self, jid, password):
+        sleekxmpp.ClientXMPP.__init__(self, jid, password)
+        self._ready = False
+        self._registered = False
+        self._server = None
+
+        self.register_plugin('xep_0077') # In-band registration
+        self.register_plugin('xep_0030')
+        self.register_plugin('xep_0059')
+        self.register_plugin('xep_0060') # PubSub 
+
+        self.add_event_handler("session_start", self.start)
+        self.add_event_handler("register", self.register)
+        self.add_event_handler("pubsub_publish", self.handle_omf_message)
+        
+        self._logger = logging.getLogger("neco.omf.xmppClient")
+        self._logger.setLevel(neco.LOGLEVEL)
+
+    @property
+    def ready(self):
+        return self._ready
+
+    def start(self, event):
+        self.send_presence()
+        self._ready = True
+        self._server = "pubsub.%s" % self.boundjid.domain
+
+    def register(self, iq):
+        if self._registered:
+            self._logger.info("%s already registered!" % self.boundjid)
+            return 
+
+        resp = self.Iq()
+        resp['type'] = 'set'
+        resp['register']['username'] = self.boundjid.user
+        resp['register']['password'] = self.password
+
+        try:
+            resp.send(now=True)
+            self._logger.info("Account created for %s!" % self.boundjid)
+            self._registered = True
+        except IqError as e:
+            self._logger.error("Could not register account: %s" %
+                    e.iq['error']['text'])
+        except IqTimeout:
+            self._logger.error("No response from server.")
+
+    def unregister(self):
+        try:
+            self.plugin['xep_0077'].cancel_registration(
+                ifrom=self.boundjid.full)
+            self._logger.info("Account unregistered for %s!" % self.boundjid)
+        except IqError as e:
+            self._logger.error("Could not unregister account: %s" %
+                    e.iq['error']['text'])
+        except IqTimeout:
+            self._logger.error("No response from server.")
+
+    def nodes(self):
+        try:
+            result = self['xep_0060'].get_nodes(self._server)
+            for item in result['disco_items']['items']:
+                self._logger.info(' - %s' % str(item))
+            return result
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error)
+
+    def subscriptions(self):
+        try:
+            result = self['xep_0060'].get_subscriptions(self._server)
+                #self.boundjid.full)
+            for node in result['node']:
+                self._logger.info(' - %s' % str(node))
+            return result
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error)
+
+    def create(self, node):
+        self._logger.debug(" Create Topic : " + node)
+   
+        config = self['xep_0004'].makeForm('submit')
+        config.add_field(var='pubsub#node_type', value='leaf')
+        config.add_field(var='pubsub#notify_retract', value='0')
+        config.add_field(var='pubsub#publish_model', value='open')
+        config.add_field(var='pubsub#persist_items', value='1')
+        config.add_field(var='pubsub#max_items', value='1')
+        config.add_field(var='pubsub#title', value=node)
+
+        try:
+            self['xep_0060'].create_node(self._server, node, config = config)
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not create topic: %s\ntraceback:\n%s' % (node, error))
+
+    def delete(self, node):
+        try:
+            self['xep_0060'].delete_node(self._server, node)
+            self._logger.info('Deleted node: %s' % node)
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not delete topic: %s\ntraceback:\n%s' % (node, error))
+    
+    def publish(self, data, node):
+        self._logger.debug(" Publish to Topic :" + node)
+        try:
+            result = self['xep_0060'].publish(self._server,node,payload=data)
+            # id = result['pubsub']['publish']['item']['id']
+            # print('Published at item id: %s' % id)
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not publish to: %s\ntraceback:\n%s' \
+                    % (node, error))
+
+    def get(self, data):
+        try:
+            result = self['xep_0060'].get_item(self._server, self.boundjid,
+                data)
+            for item in result['pubsub']['items']['substanzas']:
+                self._logger.info('Retrieved item %s: %s' % (item['id'], 
+                    tostring(item['payload'])))
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not retrieve item %s from topic %s\ntraceback:\n%s' \
+                    % (data, self.boundjid, error))
+
+    def retract(self, data):
+        try:
+            result = self['xep_0060'].retract(self._server, self.boundjid, data)
+            self._logger.info('Retracted item %s from topic %s' % (data, self.boundjid))
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not retract item %s from topic %s\ntraceback:\n%s' \
+                    % (data, self.boundjid, error))
+
+    def purge(self):
+        try:
+            result = self['xep_0060'].purge(self._server, self.boundjid)
+            self._logger.info('Purged all items from topic %s' % self.boundjid)
+        except:
+            error = traceback.format_exc()
+            self._logger.error('Could not purge items from topic %s\ntraceback:\n%s' \
+                    % (self.boundjid, error))
+
+    def subscribe(self, node):
+        try:
+            result = self['xep_0060'].subscribe(self._server, node)
+            #self._logger.debug('Subscribed %s to node %s' \
+                    #% (self.boundjid.bare, node))
+            self._logger.info(' Subscribed %s to topic %s' \
+                    % (self.boundjid.user, node))
+        except:
+            error = traceback.format_exc()
+            self._logger.error(' Could not subscribe %s to topic %s\ntraceback:\n%s' \
+                    % (self.boundjid.bare, node, error))
+
+    def unsubscribe(self, node):
+        try:
+            result = self['xep_0060'].unsubscribe(self._server, node)
+            self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node))
+        except:
+            error = traceback.format_exc()
+            self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
+                    % (self.boundjid.bare, node, error))
+
+    def _check_for_tag(self, treeroot, namespaces, tag):
+        for element in treeroot.iter(namespaces+tag):
+            if element.text:
+                return element
+            else : 
+                return None    
+
+    def _check_output(self, treeroot, namespaces):
+        output_param = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
+        response = ""
+        for elt in output_param:
+            msg = self._check_for_tag(treeroot, namespaces, elt)
+            if msg is not None:
+                response = response + " " + msg.text + " :"
+        deb = self._check_for_tag(treeroot, namespaces, "MESSAGE")
+        if deb is not None:
+            self._logger.debug(response + " " + deb.text)
+        else :
+            self._logger.info(response)
+
+    def handle_omf_message(self, iq):
+        namespaces = "{http://jabber.org/protocol/pubsub}"
+        for i in iq['pubsub_event']['items']:
+            root = ET.fromstring(str(i))
+            self._check_output(root, namespaces)
+
+
diff --git a/src/neco/resources/omf/omf_interface.py b/src/neco/resources/omf/omf_interface.py
new file mode 100644 (file)
index 0000000..07d6446
--- /dev/null
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFWifiInterface(Resource):
+    _rtype = "OMFWifiInterface"
+    _authorized_connections = ["OMFNode" , "OMFChannel"]
+
+    #alias2name = dict({'w0':'wlan0', 'w1':'wlan1'})
+
+    @classmethod
+    def _register_attributes(cls):
+        alias = Attribute("alias","Alias of the interface", default_value = "w0")  
+        mode = Attribute("mode","Mode of the interface")
+        type = Attribute("type","Type of the interface")
+        essid = Attribute("essid","Essid of the interface")
+        ip = Attribute("ip","IP of the interface")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        cls._register_attribute(alias)
+        cls._register_attribute(xmppSlice)
+        cls._register_attribute(xmppHost)
+        cls._register_attribute(xmppPort)
+        cls._register_attribute(xmppPassword)
+        cls._register_attribute(mode)
+        cls._register_attribute(type)
+        cls._register_attribute(essid)
+        cls._register_attribute(ip)
+
+    def __init__(self, ec, guid, creds):
+        super(OMFWifiInterface, self).__init__(ec, guid)
+        self.set('xmppSlice', creds['xmppSlice'])
+        self.set('xmppHost', creds['xmppHost'])
+        self.set('xmppPort', creds['xmppPort'])
+        self.set('xmppPassword', creds['xmppPassword'])
+
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+        self._alias = self.get('alias')
+
+        self._logger = logging.getLogger("neco.omf.omfIface  ")
+        self._logger.setLevel(neco.LOGLEVEL)
+
+    def _validate_connection(self, guid):
+        rm = self.ec.resource(guid)
+        if rm.rtype() in self._authorized_connections:
+            self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+            return True
+        self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+        return False
+
+    def _get_nodes(self, conn_set):
+        for elt in conn_set:
+            rm = self.ec.resource(elt)
+            if rm.rtype() == "OMFNode":
+                return rm
+        return None
+
+
+    def start(self):
+        self._logger.debug(self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip'))
+        #try:
+        if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'):
+            rm_node = self._get_nodes(self._connections)    
+            for attrname in ["mode", "type", "essid", "ip"]:
+                attrval = self.get(attrname)
+                attrname = "net/%s/%s" % (self._alias, attrname)
+                #print "Send the configure message"
+                self._omf_api.configure(rm_node.get('hostname'), attrname, attrval)
+
+    def stop(self):
+        self._omf_api.disconnect()
+
+
+
diff --git a/src/neco/resources/omf/omf_messages_5_4.py b/src/neco/resources/omf/omf_messages_5_4.py
new file mode 100644 (file)
index 0000000..77c53dc
--- /dev/null
@@ -0,0 +1,125 @@
+from xml.etree import cElementTree as ET
+
+EXECUTE = "EXECUTE"
+KILL = "KILL"
+STDIN = "STDIN"
+NOOP = "NOOP"
+PM_INSTALL = "PM_INSTALL"
+APT_INSTALL = "APT_INSTALL"
+RPM_INSTALL = "RPM_INSTALL"
+RESET = "RESET"
+REBOOT = "REBOOT"
+MODPROBE = "MODPROBE"
+CONFIGURE = "CONFIGURE"
+LOAD_IMAGE = "LOAD_IMAGE"
+SAVE_IMAGE = "SAVE_IMAGE"
+LOAD_DATA = "LOAD_DATA"
+SET_LINK = "SET_LINK"
+ALIAS = "ALIAS"
+SET_DISCONNECTION = "SET_DISCONNECTION"
+RESTART = "RESTART"
+ENROLL = "ENROLL"
+EXIT = "EXIT" 
+
+class MessageHandler():
+    SliceID = ""
+    ExpID = ""
+
+    def __init__(self, sliceid, expid ):
+        self.SliceID = sliceid
+        self.ExpID = expid
+        print "init" + self.ExpID +"  "+ self.SliceID
+        pass
+
+    def Mid(self, parent, keyword):
+        mid = ET.SubElement(parent, keyword)
+        mid.set("id", "\'omf-payload\'")
+        return mid
+
+    def Mtext(self, parent, keyword, text):
+        mtext = ET.SubElement(parent, keyword)
+        mtext.text = text
+        return mtext
+
+    def executefunction(self, target, appid, cmdlineargs, path, env):
+        payload = ET.Element("omf-message")
+        execute = self.Mid(payload,"EXECUTE")
+        env = self.Mtext(execute, "ENV", env)
+        sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
+        expid = self.Mtext(execute,"EXPID",self.ExpID)
+        target = self.Mtext(execute,"TARGET",target)
+        appid = self.Mtext(execute,"APPID",appid)
+        cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs)
+        path = self.Mtext(execute,"PATH",path)
+        return payload
+
+    def exitfunction(self, target, appid):
+        payload = ET.Element("omf-message")
+        execute = self.Mid(payload,"EXIT")
+        sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
+        expid = self.Mtext(execute,"EXPID",self.ExpID)
+        target = self.Mtext(execute,"TARGET",target)
+        appid = self.Mtext(execute,"APPID",appid)
+        return payload
+
+    def configurefunction(self, target, value, path):
+        payload = ET.Element("omf-message")
+        config = self.Mid(payload, "CONFIGURE")
+        sliceid = self.Mtext(config,"SLICEID",self.SliceID)
+        expid = self.Mtext(config,"EXPID",self.ExpID)
+        target = self.Mtext(config,"TARGET",target)
+        value = self.Mtext(config,"VALUE",value)
+        path = self.Mtext(config,"PATH",path)
+        return payload
+
+    def logfunction(self,level, logger, level_name, data):
+        payload = ET.Element("omf-message")
+        log = self.Mid(payload, "LOGGING")
+        level = self.Mtext(log,"LEVEL",level)
+        sliceid = self.Mtext(log,"SLICEID",self.SliceID)
+        logger = self.Mtext(log,"LOGGER",logger)
+        expid = self.Mtext(log,"EXPID",self.ExpID)
+        level_name = self.Mtext(log,"LEVEL_NAME",level_name)
+        data = self.Mtext(log,"DATA",data)
+        return payload
+
+    def aliasfunction(self, name, target):
+        payload = ET.Element("omf-message")
+        alias = self.Mid(payload,"ALIAS")
+        sliceid = self.Mtext(alias,"SLICEID",self.SliceID)
+        expid = self.Mtext(alias,"EXPID",self.ExpID)
+        name = self.Mtext(alias,"NAME",name)
+        target = self.Mtext(alias,"TARGET",target)
+        return payload
+
+    def enrollfunction(self, enrollkey, image, index, target ):
+        payload = ET.Element("omf-message")
+        enroll = self.Mid(payload,"ENROLL")
+        enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
+        sliceid = self.Mtext(enroll,"SLICEID",self.SliceID)
+        image = self.Mtext(enroll,"IMAGE",image)
+        expid = self.Mtext(enroll,"EXPID",self.ExpID)
+        index = self.Mtext(enroll,"INDEX",index)
+        target = self.Mtext(enroll,"TARGET",target)
+        return payload
+
+    def noopfunction(self,target):
+        payload = ET.Element("omf-message")
+        noop = self.Mid(payload,"NOOP")
+        sliceid = self.Mtext(noop,"SLICEID",self.SliceID)
+        expid = self.Mtext(noop,"EXPID",self.ExpID)
+        target = self.Mtext(noop,"TARGET",target)
+        return payload
+
+    def newexpfunction(self, experimentid, address):
+        payload = ET.Element("omf-message")
+        newexp = self.Mid(payload,"EXPERIMENT_NEW")
+        experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid)
+        sliceid = self.Mtext(newexp,"SLICEID",self.SliceID)
+        expid = self.Mtext(newexp,"EXPID",self.ExpID)
+        address = self.Mtext(newexp,"ADDRESS",address)
+        return payload
+
+    def handle_message(self, msg):
+        # Do something!!!
+        return msg
diff --git a/src/neco/resources/omf/omf_node.py b/src/neco/resources/omf/omf_node.py
new file mode 100644 (file)
index 0000000..1a98b93
--- /dev/null
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFNode(Resource):
+    _rtype = "OMFNode"
+    _authorized_connections = ["OMFApplication" , "OMFWifiInterface"]
+
+    @classmethod
+    def _register_attributes(cls):
+        hostname = Attribute("hostname", "Hostname of the machine")
+        cpu = Attribute("cpu", "CPU of the node")
+        ram = Attribute("ram", "RAM of the node")
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        cls._register_attribute(hostname)
+        cls._register_attribute(ram)
+        cls._register_attribute(cpu)
+        cls._register_attribute(xmppSlice)
+        cls._register_attribute(xmppHost)
+        cls._register_attribute(xmppPort)
+        cls._register_attribute(xmppPassword)
+
+    @classmethod
+    def _register_filters(cls):
+        hostname = Attribute("hostname", "Hostname of the machine")
+        gateway = Attribute("gateway", "Gateway")
+        granularity = Attribute("granularity", "Granularity of the reservation time")
+        hardware_type = Attribute("hardware_type", "Hardware type of the machine")
+        cls._register_filter(hostname)
+        cls._register_filter(gateway)
+        cls._register_filter(granularity)
+        cls._register_filter(hardware_type)
+
+    def __init__(self, ec, guid, creds):
+        super(OMFNode, self).__init__(ec, guid)
+        self.set('xmppSlice', creds['xmppSlice'])
+        self.set('xmppHost', creds['xmppHost'])
+        self.set('xmppPort', creds['xmppPort'])
+        self.set('xmppPassword', creds['xmppPassword'])
+
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+        self._logger = logging.getLogger("neco.omf.omfNode   ")
+        self._logger.setLevel(neco.LOGLEVEL)
+
+    def _validate_connection(self, guid):
+        rm = self.ec.resource(guid)
+        if rm.rtype() in self._authorized_connections:
+            self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+            return True
+        self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+        return False
+
+    def discover(self):
+        pass
+     
+    def provision(self, credential):
+        pass
+
+    def start(self):
+        self._omf_api.enroll_host(self.get('hostname'))
+
+    def stop(self):
+        self._omf_api.disconnect()
+
+    def configure(self):
+        #routes = self.tc._add_route.get(self.guid, [])
+        #iface_guids = self.tc.get_connected(self.guid, "devs", "node")
+       
+        for route in routes:
+            (destination, netprefix, nexthop, metric, device) = route
+            netmask = ipaddr2.ipv4_mask2dot(netprefix)
+
+            # Validate that the interface is associated to the node
+            for iface_guid in iface_guids:
+                iface = self.tc.elements.get(iface_guid)
+                if iface.devname == device:
+                    self._omf_api.execute(self.get('hostname'), 
+                        "Id#%s" % str(random.getrandbits(128)), 
+                        "add -net %s netmask %s dev %s" % (destination, netmask, iface.devname), 
+                        "/sbin/route", # path
+                        None, # env
+                     )
+                    break
diff --git a/src/neco/resources/omf/xx_omf_resource.py b/src/neco/resources/omf/xx_omf_resource.py
new file mode 100644 (file)
index 0000000..784513e
--- /dev/null
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+@clsinit
+class OMFResource(Resource):
+    _rtype = "OMFResource"
+
+    @classmethod
+    def _register_attributes(cls):
+        xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+        xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+        xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+        xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+        cls._register_attribute(xmppSlice)
+        cls._register_attribute(xmppHost)
+        cls._register_attribute(xmppPort)
+        cls._register_attribute(xmppPassword)
+
+    def __init__(self, ec, guid, creds):
+        super(OMFNode, self).__init__(ec, guid)
+        self.set('xmppSlice', creds['xmppSlice'])
+        self.set('xmppHost', creds['xmppHost'])
+        self.set('xmppPort', creds['xmppPort'])
+        self.set('xmppPassword', creds['xmppPassword'])
+
+        self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+    def discover(self):
+        pass
+     
+    def provision(self, credential):
+        pass
+
+
old mode 100644 (file)
new mode 100755 (executable)
index cbe2f95..3616554
@@ -41,6 +41,9 @@ class ResourceTestCase(unittest.TestCase):
         self.assertEquals(AnotherResource.rtype(), "AnotherResource")
         self.assertEquals(len(AnotherResource._attributes), 0)
 
+        #self.assertEquals(OmfNode.rtype(), "OmfNode")
+        #self.assertEquals(len(OmfNode._attributes), 0)
+
         self.assertEquals(len(ResourceFactory.resource_types()), 2)
 
 if __name__ == '__main__':
diff --git a/test/resources/omf/omf_vlc_exp.py b/test/resources/omf/omf_vlc_exp.py
new file mode 100755 (executable)
index 0000000..799046f
--- /dev/null
@@ -0,0 +1,154 @@
+#!/usr/bin/env python
+from neco.execution.resource import Resource, ResourceFactory
+from neco.execution.ec import ExperimentController
+
+from neco.resources.omf.omf_node import OMFNode
+from neco.resources.omf.omf_application import OMFApplication
+from neco.resources.omf.omf_interface import OMFWifiInterface
+from neco.resources.omf.omf_channel import OMFChannel
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+from neco.util import guid
+
+import time
+import unittest
+import logging
+
+logging.basicConfig()
+
+
+class DummyEC(ExperimentController):
+    pass
+
+class OMFVLCTestCase(unittest.TestCase):
+
+    def setUp(self):
+        #self.guid_generator = guid.GuidGenerator()
+        self._creds = {'xmppSlice' : 'nepi' , 'xmppHost' : 'xmpp-plexus.onelab.eu' , 'xmppPort' : '5222', 'xmppPassword' : '1234'  }
+
+    def tearDown(self):
+        pass
+
+    def test_creation_phase(self):
+        ec = DummyEC()
+
+        ResourceFactory.register_type(OMFNode)
+        ResourceFactory.register_type(OMFWifiInterface)
+        ResourceFactory.register_type(OMFChannel)
+        ResourceFactory.register_type(OMFApplication)
+
+        self.assertEquals(OMFNode.rtype(), "OMFNode")
+        self.assertEquals(len(OMFNode._attributes), 7)
+
+        self.assertEquals(OMFWifiInterface.rtype(), "OMFWifiInterface")
+        self.assertEquals(len(OMFWifiInterface._attributes), 9)
+
+        self.assertEquals(OMFChannel.rtype(), "OMFChannel")
+        self.assertEquals(len(OMFChannel._attributes), 5)
+
+        self.assertEquals(OMFApplication.rtype(), "OMFApplication")
+        self.assertEquals(len(OMFApplication._attributes), 8)
+
+        self.assertEquals(len(ResourceFactory.resource_types()), 4)
+
+    #def xtest_creation_and_configuration_node(self):
+        guid = ec.register_resource("OMFNode", creds =  self._creds)
+        node1 = ec._resources[guid]
+        node1.set('hostname', 'omf.plexus.wlab17')
+
+        guid = ec.register_resource("OMFNode", creds =  self._creds)
+        node2 = ec._resources[guid]
+        node2.set('hostname', "omf.plexus.wlab37")
+
+    #def xtest_creation_and_configuration_interface(self):
+        guid = ec.register_resource("OMFWifiInterface", creds =  self._creds)
+        iface1 = ec._resources[guid]
+        iface1.set('alias', "w0")
+        iface1.set('mode', "adhoc")
+        iface1.set('type', "g")
+        iface1.set('essid', "helloworld")
+        iface1.set('ip', "10.0.0.17")
+
+        guid = ec.register_resource("OMFWifiInterface", creds =  self._creds)
+        iface2 = ec._resources[guid]
+        iface2.set('alias', "w0")
+        iface2.set('mode', "adhoc")
+        iface2.set('type', 'g')
+        iface2.set('essid', "helloworld")
+        iface2.set('ip', "10.0.0.37")  
+
+    #def xtest_creation_and_configuration_channel(self):
+        guid = ec.register_resource("OMFChannel", creds =  self._creds)
+        channel = ec._resources[guid]
+        channel.set('channel', "6")
+
+    #def xtest_creation_and_configuration_application(self):
+        guid = ec.register_resource("OMFApplication", creds =  self._creds)
+        app1 = ec._resources[guid]
+        app1.set('appid', 'Vlc#1')
+        app1.set('path', "/opt/vlc-1.1.13/cvlc")
+        app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+        app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+
+        guid = ec.register_resource("OMFApplication", creds =  self._creds)
+        app2 = ec._resources[guid]
+        app2.set('appid', 'Vlc#2')
+        app2.set('path', "/opt/vlc-1.1.13/cvlc")
+        app2.set('args', "rtp://10.0.0.37:1234")
+        app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+        self.assertEquals(len(OMFAPIFactory._Api), 1)   
+
+    #def test_connection(self):
+        app1.connect(node1._guid)
+        node1.connect(app1._guid)
+
+        node1.connect(iface1._guid)
+        iface1.connect(node1._guid)
+
+        iface1.connect(channel._guid)
+        channel.connect(iface1._guid)
+
+        channel.connect(iface2._guid)
+        iface2.connect(channel._guid)
+
+        iface2.connect(node2._guid)
+        node2.connect(iface2._guid)
+
+        node2.connect(app2._guid)
+        app2.connect(node2._guid)
+
+    #def test_start_node(self):
+        node1.start()
+        node2.start()
+        time.sleep(1)
+        #pass
+
+    #def test_start_interface(self):
+        iface1.start()
+        iface2.start()
+
+    #def test_start_channel(self):
+        channel.start()
+        time.sleep(1)
+
+    #def test_start_application(self):
+        app1.start()
+        time.sleep(2)
+        app2.start()
+
+        time.sleep(10)
+    
+    #def test_stop_application(self):
+        app1.stop()
+        app2.stop()
+        time.sleep(2)
+
+
+    #def test_stop_nodes(self):
+        node1.stop()
+        #node2.stop()
+
+
+if __name__ == '__main__':
+    unittest.main()
+