Adding baskend for OMF
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 20 Aug 2012 17:46:17 +0000 (19:46 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 20 Aug 2012 17:46:17 +0000 (19:46 +0200)
examples/omf_vlc.py [new file with mode: 0644]
src/nepi/testbeds/omf/__init__.py [new file with mode: 0644]
src/nepi/testbeds/omf/constants.py [new file with mode: 0644]
src/nepi/testbeds/omf/execute.py [new file with mode: 0644]
src/nepi/testbeds/omf/metadata.py [new file with mode: 0644]
src/nepi/testbeds/omf/omf_client.py [new file with mode: 0644]
src/nepi/testbeds/omf/omf_messages.py [new file with mode: 0644]

diff --git a/examples/omf_vlc.py b/examples/omf_vlc.py
new file mode 100644 (file)
index 0000000..e5aac6c
--- /dev/null
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+#
+# Experiment Topology:
+#
+#  n1 --- n2
+#  0.1   0.2 
+#    
+
+from nepi.core.design import ExperimentDescription, FactoriesProvider
+from nepi.core.execute import ExperimentController
+import getpass
+import tempfile
+import time
+
+root_dir = tempfile.mkdtemp()
+
+exp_desc = ExperimentDescription()
+
+testbed_id = "omf"
+omf_provider = FactoriesProvider(testbed_id)
+omf_desc = exp_desc.add_testbed_description(omf_provider)
+omf_desc.set_attribute_value("homeDirectory", root_dir)
+omf_desc.set_attribute_value("enableDebug", True)
+omf_desc.set_attribute_value("xmppSlice", "default_slice")
+omf_desc.set_attribute_value("xmppHost", "xmpp-omf.onelab.eu")
+omf_desc.set_attribute_value("xmppPort", 5222)
+omf_desc.set_attribute_value("xmppPassword", "******")
+
+node1 = omf_desc.create("Node")
+node1.set_attribute_value("hostname", "omf.my.wlab18")
+node2 = omf_desc.create("Node")
+node2.set_attribute_value("hostname", "omf.my.wlab49")
+
+iface12 = omf_desc.create("WifiInterface")
+iface12.set_attribute_value("mode", "adhoc")
+iface12.set_attribute_value("channel", "g")
+iface12.set_attribute_value("type", "g")
+iface12.set_attribute_value("essid", "cvlcmode")
+node1.connector("devs").connect(iface12.connector("node"))
+
+iface21 = omf_desc.create("WifiInterface")
+iface21.set_attribute_value("mode", "adhoc")
+iface21.set_attribute_value("channel", "g")
+iface21.set_attribute_value("type", "g")
+iface21.set_attribute_value("essid", "cvlcmode")
+node2.connector("devs").connect(iface21.connector("node"))
+
+ip12 = iface12.add_address()
+ip12.set_attribute_value("Address", "192.168.0.18")
+
+ip21 = iface21.add_address()
+ip21.set_attribute_value("Address", "192.168.0.49")
+
+channel = omf_desc.create("Channel")
+channel.set_attribute_value("mode", "adhoc")
+channel.set_attribute_value("channel", "g")
+channel.set_attribute_value("type", "g")
+channel.set_attribute_value("essid", "cvlcmode")
+channel.connector("devs").connect(iface12.connector("chan"))
+channel.connector("devs").connect(iface21.connector("chan"))
+
+app2 = omf_desc.create("Application")
+app2.set_attribute_value("appId", "Vlc#2")
+app2.set_attribute_value("arguments", "rtp://239.255.0.1:1234")
+app2.set_attribute_value("path", "/opt/vlc-1.1.13/vlc")
+app2.connector("node").connect(node2.connector("apps"))
+
+app1 = omf_desc.create("Application")
+app1.set_attribute_value("appId", "Vlc#1")
+app1.set_attribute_value("arguments", "/opt/10-by-p0d.avi --sout '#duplicate{dst=display,dst=rtp{mux=ts,dst=239.255.0.1,port=1234}}'")
+app1.set_attribute_value("path", "/opt/vlc-1.1.13/vlc")
+app1.connector("node").connect(node1.connector("apps"))
+
+xml = exp_desc.to_xml()
+
+controller = ExperimentController(xml, root_dir)
+controller.start()
+#while not (controller.is_finished(app1.guid) and \
+#        controller.is_finished(app2.guid)):
+#    time.sleep(0.5)
+
+time.sleep(30)
+
+controller.stop()
+controller.shutdown()
+
diff --git a/src/nepi/testbeds/omf/__init__.py b/src/nepi/testbeds/omf/__init__.py
new file mode 100644 (file)
index 0000000..d3ef5dd
--- /dev/null
@@ -0,0 +1,5 @@
+# -*- coding: utf-8 -*-
+
+from constants import TESTBED_ID, TESTBED_ID
+from execute import TestbedController 
+
diff --git a/src/nepi/testbeds/omf/constants.py b/src/nepi/testbeds/omf/constants.py
new file mode 100644 (file)
index 0000000..dc96b01
--- /dev/null
@@ -0,0 +1,5 @@
+# -*- coding: utf-8 -*-
+
+TESTBED_ID = "omf"
+TESTBED_VERSION = "5.4"
+
diff --git a/src/nepi/testbeds/omf/execute.py b/src/nepi/testbeds/omf/execute.py
new file mode 100644 (file)
index 0000000..dccb80c
--- /dev/null
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+
+from constants import TESTBED_ID, TESTBED_VERSION
+from nepi.core import testbed_impl
+from nepi.util.constants import TIME_NOW
+
+import datetime
+import logging
+import os
+import sys
+import ssl
+import time
+
+from nepi.testbeds.omf.omf_client import OMFClient
+from nepi.testbeds.omf.omf_messages import MessageHandler
+
+
+class TestbedController(testbed_impl.TestbedController):
+    def __init__(self):
+        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
+        self._slice = None
+        self._user = None
+        self._host = None
+        self._xmpp = None
+        self._message = None
+        self._home = None
+        
+        self._logger = logging.getLogger('nepi.testbeds.omf')
+    def do_setup(self):
+        if self._attributes.get_attribute_value("enableDebug") == True:
+            self._logger.setLevel(logging.DEBUG)
+
+        # create home
+        self._home = self._attributes.\
+            get_attribute_value("homeDirectory")
+        home = os.path.normpath(self._home)
+        if not os.path.exists(home):
+            os.makedirs(home, 0755)
+    
+        # instantiate the xmpp client
+        self._init_client()
+        # register xmpp nodes for the experiment
+        self._publish_and_enroll_experiment()
+        # register xmpp logger for the experiment
+        self._publish_and_enroll_logger()
+
+        super(TestbedController, self).do_setup()
+
+    def set(self, guid, name, value, time = TIME_NOW):
+        super(TestbedController, self).set(guid, name, value, time)
+        pass
+
+    def get(self, guid, name, time = TIME_NOW):
+        value = super(TestbedController, self).get(guid, name, time)
+        return "MISS"
+
+    def shutdown(self):
+        node_sid = "/OMF/%s/%s" % (self._slice, self._user)
+        self._clean_up(node_sid)
+        logger = "/OMF/%s/%s/LOGGER" % (self._slice, self._user)
+        self._clean_up(logger)
+
+        for hostname in self._elements.values():
+            if not hostname:
+                continue
+            node_sid = self._host_sid(hostname)
+            self._clean_up(node_sid)
+            #node_res = self._host_res(hostname)
+            #self._clean_up(node_res)
+
+        time.sleep(5)
+        self._xmpp.disconnect()
+
+    def _host_sid(self, hostname):
+        return "/OMF/%s/%s/%s" % (self._slice, self._user, hostname)
+
+    def _host_res(self, hostname):
+        return "/OMF/%s/resources/%s" % (self._slice, hostname)
+
+    def _init_client(self):
+        self._slice = self._attributes.get_attribute_value("xmppSlice")
+        self._host = self._attributes.get_attribute_value("xmppHost")
+        port = self._attributes.get_attribute_value("xmppPort")
+        password = self._attributes.get_attribute_value("xmppPassword")
+       
+        #date = "2012-04-18t16.06.34+02.00"
+        date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S+02.00")
+        self._user = "%s-%s" % (self._slice, date)
+        jid = "%s@%s" % (self._user, self._host)
+
+        xmpp = OMFClient(jid, password)
+        # PROTOCOL_SSLv3 required for compatibility with OpenFire
+        xmpp.ssl_version = ssl.PROTOCOL_SSLv3
+
+        if xmpp.connect((self._host, port)):
+            xmpp.process(threaded=True)
+            while not xmpp.ready:
+                time.sleep(1)
+            self._xmpp = xmpp
+            self._message = MessageHandler(self._slice, self._user)
+        else:
+            msg = "Unable to connect to the XMPP server."
+            self._logger.error(msg)
+            raise RuntimeError(msg)
+
+    def _publish_and_enroll_experiment(self):
+        node_sid = "/OMF/%s/%s" % (self._slice, self._user)
+        self._create_and_subscribe(node_sid)  
+
+        node_slice = "/OMF/%s" % (self._slice)
+        address = "/%s/OMF/%s/%s" % (self._host, self._slice, self._user)
+        payload = self._message.newexpfunction(self._user, address)
+        self._xmpp.publish(payload, node_slice)
+   
+    def _publish_and_enroll_logger(self):
+        logger = "/OMF/%s/%s/LOGGER" % (self._slice, self._user)
+        self._create_and_subscribe(logger)
+
+        payload = self._message.logfunction("2", 
+                "nodeHandler::NodeHandler", 
+                "INFO", 
+                "OMF Experiment Controller 5.4 (git 529a626)")
+        self._xmpp.publish(payload, logger)
+
+    def _clean_up(self, xmpp_node):
+        self._xmpp.delete(xmpp_node)
+
+        if sys.version_info < (3, 0):
+            reload(sys)
+            sys.setdefaultencoding('utf8')
+
+    def _create_and_subscribe(self, xmpp_node):
+        self._xmpp.suscriptions()
+        self._xmpp.create(xmpp_node)
+        self._xmpp.subscribe(xmpp_node)
+        self._xmpp.nodes()
+
+    def _publish_and_enroll_host(self, hostname):
+        node_sid =  self._host_sid(hostname)
+        self._create_and_subscribe(node_sid)  
+        
+        node_res =  self._host_res(hostname)
+        self._create_and_subscribe(node_res)  
+
+        payload = self._message.enrollfunction("1", "*", "1", hostname)
+        self._xmpp.publish(payload, node_res)
+
+    def _publish_configure(self, hostname, attribute, value): 
+        payload = self._message.configurefunction(hostname, value, attribute)
+        node_sid =  self._host_sid(hostname)
+        self._xmpp.publish(payload, node_sid)
+
+    def _publish_execute(self, hostname, app_id, arguments, path):
+        payload = self._message.executefunction(hostname, app_id, arguments, path)
+        node_sid =  self._host_sid(hostname)
+        self._xmpp.publish(payload, node_sid)
+
+
+
diff --git a/src/nepi/testbeds/omf/metadata.py b/src/nepi/testbeds/omf/metadata.py
new file mode 100644 (file)
index 0000000..3a86db0
--- /dev/null
@@ -0,0 +1,347 @@
+# -*- coding: utf-8 -*-
+
+from constants import TESTBED_ID, TESTBED_VERSION
+from nepi.core import metadata
+from nepi.core.attributes import Attribute
+from nepi.util import tags, validation
+from nepi.util.constants import ApplicationStatus as AS, \
+        FactoryCategories as FC, DeploymentConfiguration as DC
+
+# Factories
+NODE = "Node"
+WIFIIFACE = "WifiInterface"
+ETHIFACE = "EthInterface"
+CHANNEL = "Channel"
+APPLICATION = "Application"
+
+### Connection functions ####
+
+### Creation functions ###
+
+def create_node(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    hostname = parameters['hostname']
+    testbed_instance._elements[guid] = hostname
+    testbed_instance._publish_and_enroll_host(hostname)
+
+def create_wifiiface(testbed_instance, guid):
+    pass
+
+def create_ethiface(testbed_instance, guid):
+    pass
+
+def create_channel(testbed_instance, guid):
+    pass
+
+def create_application(testbed_instance, guid):
+    pass
+
+### Start/Stop functions ###
+
+def start_application(testbed_instance, guid):
+    # search for the node asociated with the device
+    node_guids = testbed_instance.get_connected(guid, "node", "apps")
+    if len(node_guids) == 0:
+        raise RuntimeError("Can't instantiate interface %d outside node" % guid)
+
+    # node attributes
+    node_parameters = testbed_instance._get_parameters(node_guids[0])
+    hostname = node_parameters['hostname']
+
+    # application attributes
+    parameters = testbed_instance._get_parameters(guid)
+    app_id = parameters.get("appId")
+    arguments = parameters.get("arguments")
+    path = parameters.get("path")
+    testbed_instance._publish_execute(hostname, app_id, arguments, path)
+
+def stop_application(testbed_instance, guid):
+    pass
+
+### Status functions ###
+
+def status_application(testbed_instance, guid):
+    if guid not in testbed_instance.elements.keys():
+        return AS.STATUS_NOT_STARTED
+    return AS.STATUS_RUNNING
+    # TODO!!!!
+    #return AS.STATUS_FINISHED
+
+### Configure functions ###
+
+def configure_wifiiface(testbed_instance, guid):
+    # search for the node asociated with the device
+    node_guids = testbed_instance.get_connected(guid, "node", "devs")
+    if len(node_guids) == 0:
+        raise RuntimeError("Can't instantiate interface %d outside node" % guid)
+
+    # node attributes
+    node_parameters = testbed_instance._get_parameters(node_guids[0])
+    hostname = node_parameters['hostname']
+
+    # wifi iface attributes
+    parameters = testbed_instance._get_parameters(guid)
+
+    for attr in ["mode", "type", "channel", "essid"]: 
+        attribute = "net/w0/%s" % attr
+        value = parameters.get(attr)
+        if value:
+            testbed_instance._publish_configure(hostname, attribute, value)
+
+    if guid in testbed_instance._add_address: 
+        attribute = "net/w0/ip"
+        addresses = testbed_instance._add_address[guid]
+        (value, netprefix, broadcast) = addresses[0]
+        testbed_instance._publish_configure(hostname, attribute, value)
+
+### Factory information ###
+
+connector_types = dict({
+    "apps": dict({
+                "help": "Connector from node to applications", 
+                "name": "apps",
+                "max": -1, 
+                "min": 0
+            }),
+    "devs": dict({
+                "help": "Connector to network interfaces", 
+                "name": "devs",
+                "max": -1, 
+                "min": 0
+            }),
+    "chan": dict({
+                "help": "Connector from a device to a channel", 
+                "name": "chan",
+                "max": 1, 
+                "min": 1
+            }),
+    "node": dict({
+                "help": "Connector to a Node", 
+                "name": "node",
+                "max": 1, 
+                "min": 1
+            }),
+   })
+
+connections = [
+    dict({
+        "from": (TESTBED_ID, NODE, "devs"),
+        "to":   (TESTBED_ID, WIFIIFACE, "node"),
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, NODE, "devs"),
+        "to":   (TESTBED_ID, ETHIFACE, "node"),
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, WIFIIFACE, "chan"),
+        "to":   (TESTBED_ID, CHANNEL, "devs"),
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, NODE, "apps"),
+        "to":   (TESTBED_ID, APPLICATION, "node"),
+        "can_cross": False
+    }),
+ ]
+
+attributes = dict({
+    "appId": dict({
+                "name": "appId",
+                "help": "Application id",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "arguments": dict({
+                "name": "arguments",
+                "help": "Application arguments",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "path": dict({
+                "name": "path",
+                "help": "Path to binary (e.g '/opt/vlc-1.1.13/vlc')",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "hostname": dict({
+                "name": "hostname",
+                "help": "Hostname for the target OMF node",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "mode": dict({
+                "name": "mode",
+                "help": "Corresponds to the OMF attributes net/w0/mode",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "type": dict({
+                "name": "type",
+                "help": "Corresponds to the OMF attributes net/w0/type",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "channel": dict({
+                "name": "channel",
+                "help": "Corresponds to the OMF attributes net/w0/channel",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "essid": dict({
+                "name": "essid",
+                "help": "Corresponds to the OMF attributes net/w0/essid",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+
+
+    })
+
+traces = dict()
+
+create_order = [ NODE, WIFIIFACE, ETHIFACE, CHANNEL, APPLICATION ]
+configure_order = [ WIFIIFACE, ETHIFACE, NODE, CHANNEL, APPLICATION ]
+
+factories_info = dict({
+    NODE: dict({
+            "help": "OMF Node",
+            "category": FC.CATEGORY_NODES,
+            "create_function": create_node,
+            "box_attributes": ["hostname"],
+            "connector_types": ["devs", "apps"],
+            "tags": [tags.NODE, tags.ALLOW_ROUTES],
+       }),
+    WIFIIFACE: dict({
+            "help": "Wireless network interface",
+            "category": FC.CATEGORY_DEVICES,
+            "create_function": create_wifiiface,
+            "configure_function": configure_wifiiface,
+            "box_attributes": ["mode", "type", "channel", "essid"],
+            "connector_types": ["node", "chan"],
+            "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES],
+       }),
+    ETHIFACE: dict({
+            "help": "Ethernet network interface",
+            "category": FC.CATEGORY_DEVICES,
+            "create_function": create_ethiface,
+            #"box_attributes": [""],
+            "connector_types": ["node"],
+            "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES],
+       }),
+    CHANNEL: dict({
+            "help": "Wireless channel",
+            "category": FC.CATEGORY_DEVICES,
+            "create_function": create_channel,
+            "box_attributes": ["mode", "type", "channel", "essid"],
+            "connector_types": ["devs"],
+       }),
+    APPLICATION: dict({
+            "help": "Generic executable command line application",
+            "category": FC.CATEGORY_APPLICATIONS,
+            "create_function": create_application,
+            "start_function": start_application,
+            "stop_function": stop_application,
+            "status_function": status_application,
+            "box_attributes": ["appId", "arguments", "path"],
+            "connector_types": ["node"],
+            "tags": [tags.APPLICATION],
+        }),
+})
+
+testbed_attributes = dict({
+    "enable_debug": dict({
+            "name": "enableDebug",
+            "help": "Enable netns debug output",
+            "type": Attribute.BOOL,
+            "value": False,
+            "validation_function": validation.is_bool
+        }),
+    "xmppSlice": dict({
+                "name": "xmppSlice",
+                "help": "OMF slice",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "xmppHost": dict({
+                "name": "xmppHost",
+                "help": "OMF XMPP server host",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    "xmppPort": dict({
+                "name": "xmppPort",
+                "help": "OMF XMPP service port",
+                "type": Attribute.INTEGER,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_integer
+            }),
+    "xmppPassword": dict({
+                "name": "xmppPassword",
+                "help": "OMF XMPP slice password",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
+    })
+
+supported_recovery_policies = [
+        DC.POLICY_FAIL,
+    ]
+
+class MetadataInfo(metadata.MetadataInfo):
+    @property
+    def connector_types(self):
+        return connector_types
+
+    @property
+    def connections(self):
+        return connections
+
+    @property
+    def attributes(self):
+        return attributes
+
+    @property
+    def traces(self):
+        return traces
+
+    @property
+    def create_order(self):
+        return create_order
+
+    @property
+    def configure_order(self):
+        return configure_order
+
+    @property
+    def factories_info(self):
+        return factories_info
+
+    @property
+    def testbed_attributes(self):
+        return testbed_attributes
+
+    @property
+    def testbed_id(self):
+        return TESTBED_ID
+
+    @property
+    def testbed_version(self):
+        return TESTBED_VERSION
+    
+    @property
+    def supported_recover_policies(self):
+        return supported_recovery_policies
+
diff --git a/src/nepi/testbeds/omf/omf_client.py b/src/nepi/testbeds/omf/omf_client.py
new file mode 100644 (file)
index 0000000..3100a46
--- /dev/null
@@ -0,0 +1,159 @@
+import logging
+import sleekxmpp
+from sleekxmpp.exceptions import IqError, IqTimeout
+import traceback
+from xml.etree import cElementTree as ET
+
+class OMFClient(sleekxmpp.ClientXMPP):
+    def __init__(self, jid, password):
+        sleekxmpp.ClientXMPP.__init__(self, jid, password)
+        self._ready = False
+        self._registered = False
+        self._server = None
+
+        self.register_plugin('xep_0077') # In-band registration
+        self.register_plugin('xep_0030')
+        self.register_plugin('xep_0059')
+        self.register_plugin('xep_0060') # PubSub 
+
+        self.add_event_handler("session_start", self.start)
+        self.add_event_handler("register", self.register)
+    
+    @property
+    def ready(self):
+        return self._ready
+
+    def start(self, event):
+        self.send_presence()
+        self._ready = True
+        self._server = "pubsub.%s" % self.boundjid.domain
+
+    def register(self, iq):
+        if self._registered:
+            logging.info("%s already registered!" % self.boundjid)
+            return 
+
+        resp = self.Iq()
+        resp['type'] = 'set'
+        resp['register']['username'] = self.boundjid.user
+        resp['register']['password'] = self.password
+
+        try:
+            resp.send(now=True)
+            logging.info("Account created for %s!" % self.boundjid)
+            self._registered = True
+        except IqError as e:
+            logging.error("Could not register account: %s" %
+                    e.iq['error']['text'])
+        except IqTimeout:
+            logging.error("No response from server.")
+
+    def unregister(self):
+        try:
+            self.plugin['xep_0077'].cancel_registration(
+                ifrom=self.boundjid.full)
+            logging.info("Account unregistered for %s!" % self.boundjid)
+        except IqError as e:
+            logging.error("Could not unregister account: %s" %
+                    e.iq['error']['text'])
+        except IqTimeout:
+            logging.error("No response from server.")
+
+    def nodes(self):
+        try:
+            result = self['xep_0060'].get_nodes(self._server)
+            for item in result['disco_items']['items']:
+                print(' - %s' % str(item))
+            return result
+        except:
+            print traceback.format_exc()
+            logging.error('Could not retrieve node list.')
+
+    def suscriptions(self):
+        try:
+            result = self['xep_0060'].get_subscriptions(self._server)
+                #self.boundjid.full)
+            for node in result['node']:
+                print(' - %s' % str(node))
+            return result
+        except:
+            print traceback.format_exc()
+            logging.error('Could not retrieve suscriptions.')
+
+
+    def create(self, node):
+        config = self['xep_0004'].makeForm('submit')
+        config.add_field(var='pubsub#node_type', value='leaf')
+        config.add_field(var='pubsub#notify_retract', value='0')
+        config.add_field(var='pubsub#publish_model', value='open')
+        config.add_field(var='pubsub#persist_items', value='1')
+        config.add_field(var='pubsub#max_items', value='1')
+        config.add_field(var='pubsub#title', value=node)
+
+        try:
+            self['xep_0060'].create_node(self._server, node, config = config)
+        except:
+            print traceback.format_exc()
+            logging.error('Could not create node: %s' % node)
+
+    def delete(self, node):
+        try:
+            self['xep_0060'].delete_node(self._server, node)
+            print('Deleted node: %s' % node)
+        except:
+            print traceback.format_exc()
+            logging.error('Could not delete node: %s' % node)
+
+    
+    def publish(self, data, node):
+        try:
+            result = self['xep_0060'].publish(self._server,node,payload=data)
+            id = result['pubsub']['publish']['item']['id']
+            print('Published at item id: %s' % id)
+        except:
+            print traceback.format_exc()
+            logging.error('Could not publish to: %s' % self.boundjid)
+
+    def get(self, data):
+        try:
+            result = self['xep_0060'].get_item(self._server, self.boundjid,
+                data)
+            for item in result['pubsub']['items']['substanzas']:
+                print('Retrieved item %s: %s' % (item['id'], tostring(item['payload'])))
+        except:
+            print traceback.format_exc()
+            logging.error('Could not retrieve item %s from node %s' % (data, self.boundjid))
+
+    def retract(self, data):
+        try:
+            result = self['xep_0060'].retract(self._server, self.boundjid, data)
+            print('Retracted item %s from node %s' % (data, self.boundjid))
+        except:
+            print traceback.format_exc()
+            logging.error('Could not retract item %s from node %s' % (data, self.boundjid))
+
+    def purge(self):
+        try:
+            result = self['xep_0060'].purge(self._server, self.boundjid)
+            print('Purged all items from node %s' % self.boundjid)
+        except:
+            print traceback.format_exc()
+            logging.error('Could not purge items from node %s' % self.boundjid)
+
+    def subscribe(self, node):
+        try:
+            result = self['xep_0060'].subscribe(self._server, node)
+            print('Subscribed %s to node %s' % (self.boundjid.bare, self.boundjid))
+        except:
+            print traceback.format_exc()
+            logging.error('Could not subscribe %s to node %s' % (self.boundjid.bare, node))
+
+    def unsubscribe(self, node):
+        try:
+            result = self['xep_0060'].unsubscribe(self._server, node)
+            print('Unsubscribed %s from node %s' % (self.boundjid.bare, node))
+        except:
+            print traceback.format_exc()
+            logging.error('Could not unsubscribe %s from node %s' % (self.boundjid.bare, node))
+
+
diff --git a/src/nepi/testbeds/omf/omf_messages.py b/src/nepi/testbeds/omf/omf_messages.py
new file mode 100644 (file)
index 0000000..f4c99c5
--- /dev/null
@@ -0,0 +1,129 @@
+from xml.etree import cElementTree as ET
+
+EXECUTE = "EXECUTE"
+KILL = "KILL"
+STDIN = "STDIN"
+NOOP = "NOOP"
+PM_INSTALL = "PM_INSTALL"
+APT_INSTALL = "APT_INSTALL"
+RPM_INSTALL = "RPM_INSTALL"
+RESET = "RESET"
+REBOOT = "REBOOT"
+MODPROBE = "MODPROBE"
+CONFIGURE = "CONFIGURE"
+LOAD_IMAGE = "LOAD_IMAGE"
+SAVE_IMAGE = "SAVE_IMAGE"
+LOAD_DATA = "LOAD_DATA"
+SET_LINK = "SET_LINK"
+ALIAS = "ALIAS"
+SET_DISCONNECTION = "SET_DISCONNECTION"
+RESTART = "RESTART"
+ENROLL = "ENROLL"
+EXIT = "EXIT" 
+
+class MessageHandler():
+    SliceID = ""
+    ExpID = ""
+
+    def __init__(self, sliceid, expid ):
+        self.SliceID = sliceid
+        self.ExpID = expid
+        print "init" + self.ExpID +"  "+ self.SliceID
+        pass
+
+
+    def Mid(self, parent, keyword):
+        mid = ET.SubElement(parent, keyword)
+        mid.set("id", "\'omf-payload\'")
+        return mid
+
+    def Mtext(self, parent, keyword, text):
+        mtext = ET.SubElement(parent, keyword)
+        mtext.text = text
+        return mtext
+
+
+    def executefunction(self, target, appid, cmdlineargs, path):
+        payload = ET.Element("omf-message")
+        execute = self.Mid(payload,"EXECUTE")
+        env = self.Mtext(execute, "ENV", "")
+        sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
+        expid = self.Mtext(execute,"EXPID",self.ExpID)
+        target = self.Mtext(execute,"TARGET",target)
+        appid = self.Mtext(execute,"APPID",appid)
+        cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs)
+        path = self.Mtext(execute,"PATH",path)
+        return payload
+
+    def configurefunction(self, target, value, path):
+        payload = ET.Element("omf-message")
+        config = self.Mid(payload, "CONFIGURE")
+        sliceid = self.Mtext(config,"SLICEID",self.SliceID)
+        expid = self.Mtext(config,"EXPID",self.ExpID)
+        target = self.Mtext(config,"TARGET",target)
+        value = self.Mtext(config,"VALUE",value)
+        path = self.Mtext(config,"PATH",path)
+        return payload
+
+    def logfunction(self,level, logger, level_name, data):
+        payload = ET.Element("omf-message")
+        log = self.Mid(payload, "LOGGING")
+        level = self.Mtext(log,"LEVEL",level)
+        sliceid = self.Mtext(log,"SLICEID",self.SliceID)
+        logger = self.Mtext(log,"LOGGER",logger)
+        expid = self.Mtext(log,"EXPID",self.ExpID)
+        level_name = self.Mtext(log,"LEVEL_NAME",level_name)
+        data = self.Mtext(log,"DATA",data)
+        return payload
+
+    def aliasfunction(self, name, target):
+        payload = ET.Element("omf-message")
+        alias = self.Mid(payload,"ALIAS")
+        sliceid = self.Mtext(alias,"SLICEID",self.SliceID)
+        expid = self.Mtext(alias,"EXPID",self.ExpID)
+        name = self.Mtext(alias,"NAME",name)
+        target = self.Mtext(alias,"TARGET",target)
+        return payload
+
+    def enrollfunction(self, enrollkey, image, index, target ):
+        payload = ET.Element("omf-message")
+        enroll = self.Mid(payload,"ENROLL")
+        enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
+        sliceid = self.Mtext(enroll,"SLICEID",self.SliceID)
+        image = self.Mtext(enroll,"IMAGE",image)
+        expid = self.Mtext(enroll,"EXPID",self.ExpID)
+        index = self.Mtext(enroll,"INDEX",index)
+        target = self.Mtext(enroll,"TARGET",target)
+        return payload
+
+    def noopfunction(self,target):
+        payload = ET.Element("omf-message")
+        noop = self.Mid(payload,"NOOP")
+        sliceid = self.Mtext(noop,"SLICEID",self.SliceID)
+        expid = self.Mtext(noop,"EXPID",self.ExpID)
+        target = self.Mtext(noop,"TARGET",target)
+        return payload
+
+    def enrollfunction(self, enrollkey, image, index, target ):
+        payload = ET.Element("omf-message")
+        enroll = self.Mid(payload,"ENROLL")
+        enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
+        sliceid = self.Mtext(enroll,"SLICEID",self.SliceID)
+        image = self.Mtext(enroll,"IMAGE",image)
+        expid = self.Mtext(enroll,"EXPID",self.ExpID)
+        index = self.Mtext(enroll,"INDEX",index)
+        target = self.Mtext(enroll,"TARGET",target)
+        return payload
+
+    def newexpfunction(self, experimentid, address):
+        payload = ET.Element("omf-message")
+        newexp = self.Mid(payload,"EXPERIMENT_NEW")
+        experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid)
+        sliceid = self.Mtext(newexp,"SLICEID",self.SliceID)
+        expid = self.Mtext(newexp,"EXPID",self.ExpID)
+        address = self.Mtext(newexp,"ADDRESS",address)
+        return payload
+
+    def handle_message(self, msg):
+        # Do something!!!
+        return msg