From 53ed35520b540c8c76e0a2266b053f0e30b2a7c8 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Mon, 20 Aug 2012 19:46:17 +0200 Subject: [PATCH] Adding baskend for OMF --- examples/omf_vlc.py | 88 +++++++ src/nepi/testbeds/omf/__init__.py | 5 + src/nepi/testbeds/omf/constants.py | 5 + src/nepi/testbeds/omf/execute.py | 160 ++++++++++++ src/nepi/testbeds/omf/metadata.py | 347 ++++++++++++++++++++++++++ src/nepi/testbeds/omf/omf_client.py | 159 ++++++++++++ src/nepi/testbeds/omf/omf_messages.py | 129 ++++++++++ 7 files changed, 893 insertions(+) create mode 100644 examples/omf_vlc.py create mode 100644 src/nepi/testbeds/omf/__init__.py create mode 100644 src/nepi/testbeds/omf/constants.py create mode 100644 src/nepi/testbeds/omf/execute.py create mode 100644 src/nepi/testbeds/omf/metadata.py create mode 100644 src/nepi/testbeds/omf/omf_client.py create mode 100644 src/nepi/testbeds/omf/omf_messages.py diff --git a/examples/omf_vlc.py b/examples/omf_vlc.py new file mode 100644 index 00000000..e5aac6c2 --- /dev/null +++ b/examples/omf_vlc.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# +# Experiment Topology: +# +# n1 --- n2 +# 0.1 0.2 +# + +from nepi.core.design import ExperimentDescription, FactoriesProvider +from nepi.core.execute import ExperimentController +import getpass +import tempfile +import time + +root_dir = tempfile.mkdtemp() + +exp_desc = ExperimentDescription() + +testbed_id = "omf" +omf_provider = FactoriesProvider(testbed_id) +omf_desc = exp_desc.add_testbed_description(omf_provider) +omf_desc.set_attribute_value("homeDirectory", root_dir) +omf_desc.set_attribute_value("enableDebug", True) +omf_desc.set_attribute_value("xmppSlice", "default_slice") +omf_desc.set_attribute_value("xmppHost", "xmpp-omf.onelab.eu") +omf_desc.set_attribute_value("xmppPort", 5222) +omf_desc.set_attribute_value("xmppPassword", "******") + +node1 = omf_desc.create("Node") +node1.set_attribute_value("hostname", "omf.my.wlab18") +node2 = omf_desc.create("Node") +node2.set_attribute_value("hostname", "omf.my.wlab49") + +iface12 = omf_desc.create("WifiInterface") +iface12.set_attribute_value("mode", "adhoc") +iface12.set_attribute_value("channel", "g") +iface12.set_attribute_value("type", "g") +iface12.set_attribute_value("essid", "cvlcmode") +node1.connector("devs").connect(iface12.connector("node")) + +iface21 = omf_desc.create("WifiInterface") +iface21.set_attribute_value("mode", "adhoc") +iface21.set_attribute_value("channel", "g") +iface21.set_attribute_value("type", "g") +iface21.set_attribute_value("essid", "cvlcmode") +node2.connector("devs").connect(iface21.connector("node")) + +ip12 = iface12.add_address() +ip12.set_attribute_value("Address", "192.168.0.18") + +ip21 = iface21.add_address() +ip21.set_attribute_value("Address", "192.168.0.49") + +channel = omf_desc.create("Channel") +channel.set_attribute_value("mode", "adhoc") +channel.set_attribute_value("channel", "g") +channel.set_attribute_value("type", "g") +channel.set_attribute_value("essid", "cvlcmode") +channel.connector("devs").connect(iface12.connector("chan")) +channel.connector("devs").connect(iface21.connector("chan")) + +app2 = omf_desc.create("Application") +app2.set_attribute_value("appId", "Vlc#2") +app2.set_attribute_value("arguments", "rtp://239.255.0.1:1234") +app2.set_attribute_value("path", "/opt/vlc-1.1.13/vlc") +app2.connector("node").connect(node2.connector("apps")) + +app1 = omf_desc.create("Application") +app1.set_attribute_value("appId", "Vlc#1") +app1.set_attribute_value("arguments", "/opt/10-by-p0d.avi --sout '#duplicate{dst=display,dst=rtp{mux=ts,dst=239.255.0.1,port=1234}}'") +app1.set_attribute_value("path", "/opt/vlc-1.1.13/vlc") +app1.connector("node").connect(node1.connector("apps")) + +xml = exp_desc.to_xml() + +controller = ExperimentController(xml, root_dir) +controller.start() +#while not (controller.is_finished(app1.guid) and \ +# controller.is_finished(app2.guid)): +# time.sleep(0.5) + +time.sleep(30) + +controller.stop() +controller.shutdown() + diff --git a/src/nepi/testbeds/omf/__init__.py b/src/nepi/testbeds/omf/__init__.py new file mode 100644 index 00000000..d3ef5dd6 --- /dev/null +++ b/src/nepi/testbeds/omf/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- + +from constants import TESTBED_ID, TESTBED_ID +from execute import TestbedController + diff --git a/src/nepi/testbeds/omf/constants.py b/src/nepi/testbeds/omf/constants.py new file mode 100644 index 00000000..dc96b015 --- /dev/null +++ b/src/nepi/testbeds/omf/constants.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- + +TESTBED_ID = "omf" +TESTBED_VERSION = "5.4" + diff --git a/src/nepi/testbeds/omf/execute.py b/src/nepi/testbeds/omf/execute.py new file mode 100644 index 00000000..dccb80cc --- /dev/null +++ b/src/nepi/testbeds/omf/execute.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- + +from constants import TESTBED_ID, TESTBED_VERSION +from nepi.core import testbed_impl +from nepi.util.constants import TIME_NOW + +import datetime +import logging +import os +import sys +import ssl +import time + +from nepi.testbeds.omf.omf_client import OMFClient +from nepi.testbeds.omf.omf_messages import MessageHandler + + +class TestbedController(testbed_impl.TestbedController): + def __init__(self): + super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION) + self._slice = None + self._user = None + self._host = None + self._xmpp = None + self._message = None + self._home = None + + self._logger = logging.getLogger('nepi.testbeds.omf') + + def do_setup(self): + if self._attributes.get_attribute_value("enableDebug") == True: + self._logger.setLevel(logging.DEBUG) + + # create home + self._home = self._attributes.\ + get_attribute_value("homeDirectory") + home = os.path.normpath(self._home) + if not os.path.exists(home): + os.makedirs(home, 0755) + + # instantiate the xmpp client + self._init_client() + # register xmpp nodes for the experiment + self._publish_and_enroll_experiment() + # register xmpp logger for the experiment + self._publish_and_enroll_logger() + + super(TestbedController, self).do_setup() + + def set(self, guid, name, value, time = TIME_NOW): + super(TestbedController, self).set(guid, name, value, time) + pass + + def get(self, guid, name, time = TIME_NOW): + value = super(TestbedController, self).get(guid, name, time) + return "MISS" + + def shutdown(self): + node_sid = "/OMF/%s/%s" % (self._slice, self._user) + self._clean_up(node_sid) + logger = "/OMF/%s/%s/LOGGER" % (self._slice, self._user) + self._clean_up(logger) + + for hostname in self._elements.values(): + if not hostname: + continue + node_sid = self._host_sid(hostname) + self._clean_up(node_sid) + #node_res = self._host_res(hostname) + #self._clean_up(node_res) + + time.sleep(5) + self._xmpp.disconnect() + + def _host_sid(self, hostname): + return "/OMF/%s/%s/%s" % (self._slice, self._user, hostname) + + def _host_res(self, hostname): + return "/OMF/%s/resources/%s" % (self._slice, hostname) + + def _init_client(self): + self._slice = self._attributes.get_attribute_value("xmppSlice") + self._host = self._attributes.get_attribute_value("xmppHost") + port = self._attributes.get_attribute_value("xmppPort") + password = self._attributes.get_attribute_value("xmppPassword") + + #date = "2012-04-18t16.06.34+02.00" + date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S+02.00") + self._user = "%s-%s" % (self._slice, date) + jid = "%s@%s" % (self._user, self._host) + + xmpp = OMFClient(jid, password) + # PROTOCOL_SSLv3 required for compatibility with OpenFire + xmpp.ssl_version = ssl.PROTOCOL_SSLv3 + + if xmpp.connect((self._host, port)): + xmpp.process(threaded=True) + while not xmpp.ready: + time.sleep(1) + self._xmpp = xmpp + self._message = MessageHandler(self._slice, self._user) + else: + msg = "Unable to connect to the XMPP server." + self._logger.error(msg) + raise RuntimeError(msg) + + def _publish_and_enroll_experiment(self): + node_sid = "/OMF/%s/%s" % (self._slice, self._user) + self._create_and_subscribe(node_sid) + + node_slice = "/OMF/%s" % (self._slice) + address = "/%s/OMF/%s/%s" % (self._host, self._slice, self._user) + payload = self._message.newexpfunction(self._user, address) + self._xmpp.publish(payload, node_slice) + + def _publish_and_enroll_logger(self): + logger = "/OMF/%s/%s/LOGGER" % (self._slice, self._user) + self._create_and_subscribe(logger) + + payload = self._message.logfunction("2", + "nodeHandler::NodeHandler", + "INFO", + "OMF Experiment Controller 5.4 (git 529a626)") + self._xmpp.publish(payload, logger) + + def _clean_up(self, xmpp_node): + self._xmpp.delete(xmpp_node) + + if sys.version_info < (3, 0): + reload(sys) + sys.setdefaultencoding('utf8') + + def _create_and_subscribe(self, xmpp_node): + self._xmpp.suscriptions() + self._xmpp.create(xmpp_node) + self._xmpp.subscribe(xmpp_node) + self._xmpp.nodes() + + def _publish_and_enroll_host(self, hostname): + node_sid = self._host_sid(hostname) + self._create_and_subscribe(node_sid) + + node_res = self._host_res(hostname) + self._create_and_subscribe(node_res) + + payload = self._message.enrollfunction("1", "*", "1", hostname) + self._xmpp.publish(payload, node_res) + + def _publish_configure(self, hostname, attribute, value): + payload = self._message.configurefunction(hostname, value, attribute) + node_sid = self._host_sid(hostname) + self._xmpp.publish(payload, node_sid) + + def _publish_execute(self, hostname, app_id, arguments, path): + payload = self._message.executefunction(hostname, app_id, arguments, path) + node_sid = self._host_sid(hostname) + self._xmpp.publish(payload, node_sid) + + + diff --git a/src/nepi/testbeds/omf/metadata.py b/src/nepi/testbeds/omf/metadata.py new file mode 100644 index 00000000..3a86db05 --- /dev/null +++ b/src/nepi/testbeds/omf/metadata.py @@ -0,0 +1,347 @@ +# -*- coding: utf-8 -*- + +from constants import TESTBED_ID, TESTBED_VERSION +from nepi.core import metadata +from nepi.core.attributes import Attribute +from nepi.util import tags, validation +from nepi.util.constants import ApplicationStatus as AS, \ + FactoryCategories as FC, DeploymentConfiguration as DC + +# Factories +NODE = "Node" +WIFIIFACE = "WifiInterface" +ETHIFACE = "EthInterface" +CHANNEL = "Channel" +APPLICATION = "Application" + +### Connection functions #### + +### Creation functions ### + +def create_node(testbed_instance, guid): + parameters = testbed_instance._get_parameters(guid) + hostname = parameters['hostname'] + testbed_instance._elements[guid] = hostname + testbed_instance._publish_and_enroll_host(hostname) + +def create_wifiiface(testbed_instance, guid): + pass + +def create_ethiface(testbed_instance, guid): + pass + +def create_channel(testbed_instance, guid): + pass + +def create_application(testbed_instance, guid): + pass + +### Start/Stop functions ### + +def start_application(testbed_instance, guid): + # search for the node asociated with the device + node_guids = testbed_instance.get_connected(guid, "node", "apps") + if len(node_guids) == 0: + raise RuntimeError("Can't instantiate interface %d outside node" % guid) + + # node attributes + node_parameters = testbed_instance._get_parameters(node_guids[0]) + hostname = node_parameters['hostname'] + + # application attributes + parameters = testbed_instance._get_parameters(guid) + app_id = parameters.get("appId") + arguments = parameters.get("arguments") + path = parameters.get("path") + testbed_instance._publish_execute(hostname, app_id, arguments, path) + +def stop_application(testbed_instance, guid): + pass + +### Status functions ### + +def status_application(testbed_instance, guid): + if guid not in testbed_instance.elements.keys(): + return AS.STATUS_NOT_STARTED + return AS.STATUS_RUNNING + # TODO!!!! + #return AS.STATUS_FINISHED + +### Configure functions ### + +def configure_wifiiface(testbed_instance, guid): + # search for the node asociated with the device + node_guids = testbed_instance.get_connected(guid, "node", "devs") + if len(node_guids) == 0: + raise RuntimeError("Can't instantiate interface %d outside node" % guid) + + # node attributes + node_parameters = testbed_instance._get_parameters(node_guids[0]) + hostname = node_parameters['hostname'] + + # wifi iface attributes + parameters = testbed_instance._get_parameters(guid) + + for attr in ["mode", "type", "channel", "essid"]: + attribute = "net/w0/%s" % attr + value = parameters.get(attr) + if value: + testbed_instance._publish_configure(hostname, attribute, value) + + if guid in testbed_instance._add_address: + attribute = "net/w0/ip" + addresses = testbed_instance._add_address[guid] + (value, netprefix, broadcast) = addresses[0] + testbed_instance._publish_configure(hostname, attribute, value) + +### Factory information ### + +connector_types = dict({ + "apps": dict({ + "help": "Connector from node to applications", + "name": "apps", + "max": -1, + "min": 0 + }), + "devs": dict({ + "help": "Connector to network interfaces", + "name": "devs", + "max": -1, + "min": 0 + }), + "chan": dict({ + "help": "Connector from a device to a channel", + "name": "chan", + "max": 1, + "min": 1 + }), + "node": dict({ + "help": "Connector to a Node", + "name": "node", + "max": 1, + "min": 1 + }), + }) + +connections = [ + dict({ + "from": (TESTBED_ID, NODE, "devs"), + "to": (TESTBED_ID, WIFIIFACE, "node"), + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, NODE, "devs"), + "to": (TESTBED_ID, ETHIFACE, "node"), + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, WIFIIFACE, "chan"), + "to": (TESTBED_ID, CHANNEL, "devs"), + "can_cross": False + }), + dict({ + "from": (TESTBED_ID, NODE, "apps"), + "to": (TESTBED_ID, APPLICATION, "node"), + "can_cross": False + }), + ] + +attributes = dict({ + "appId": dict({ + "name": "appId", + "help": "Application id", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "arguments": dict({ + "name": "arguments", + "help": "Application arguments", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "path": dict({ + "name": "path", + "help": "Path to binary (e.g '/opt/vlc-1.1.13/vlc')", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "hostname": dict({ + "name": "hostname", + "help": "Hostname for the target OMF node", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "mode": dict({ + "name": "mode", + "help": "Corresponds to the OMF attributes net/w0/mode", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "type": dict({ + "name": "type", + "help": "Corresponds to the OMF attributes net/w0/type", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "channel": dict({ + "name": "channel", + "help": "Corresponds to the OMF attributes net/w0/channel", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "essid": dict({ + "name": "essid", + "help": "Corresponds to the OMF attributes net/w0/essid", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + + + }) + +traces = dict() + +create_order = [ NODE, WIFIIFACE, ETHIFACE, CHANNEL, APPLICATION ] +configure_order = [ WIFIIFACE, ETHIFACE, NODE, CHANNEL, APPLICATION ] + +factories_info = dict({ + NODE: dict({ + "help": "OMF Node", + "category": FC.CATEGORY_NODES, + "create_function": create_node, + "box_attributes": ["hostname"], + "connector_types": ["devs", "apps"], + "tags": [tags.NODE, tags.ALLOW_ROUTES], + }), + WIFIIFACE: dict({ + "help": "Wireless network interface", + "category": FC.CATEGORY_DEVICES, + "create_function": create_wifiiface, + "configure_function": configure_wifiiface, + "box_attributes": ["mode", "type", "channel", "essid"], + "connector_types": ["node", "chan"], + "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES], + }), + ETHIFACE: dict({ + "help": "Ethernet network interface", + "category": FC.CATEGORY_DEVICES, + "create_function": create_ethiface, + #"box_attributes": [""], + "connector_types": ["node"], + "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES], + }), + CHANNEL: dict({ + "help": "Wireless channel", + "category": FC.CATEGORY_DEVICES, + "create_function": create_channel, + "box_attributes": ["mode", "type", "channel", "essid"], + "connector_types": ["devs"], + }), + APPLICATION: dict({ + "help": "Generic executable command line application", + "category": FC.CATEGORY_APPLICATIONS, + "create_function": create_application, + "start_function": start_application, + "stop_function": stop_application, + "status_function": status_application, + "box_attributes": ["appId", "arguments", "path"], + "connector_types": ["node"], + "tags": [tags.APPLICATION], + }), +}) + +testbed_attributes = dict({ + "enable_debug": dict({ + "name": "enableDebug", + "help": "Enable netns debug output", + "type": Attribute.BOOL, + "value": False, + "validation_function": validation.is_bool + }), + "xmppSlice": dict({ + "name": "xmppSlice", + "help": "OMF slice", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "xmppHost": dict({ + "name": "xmppHost", + "help": "OMF XMPP server host", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + "xmppPort": dict({ + "name": "xmppPort", + "help": "OMF XMPP service port", + "type": Attribute.INTEGER, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_integer + }), + "xmppPassword": dict({ + "name": "xmppPassword", + "help": "OMF XMPP slice password", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), + }) + +supported_recovery_policies = [ + DC.POLICY_FAIL, + ] + +class MetadataInfo(metadata.MetadataInfo): + @property + def connector_types(self): + return connector_types + + @property + def connections(self): + return connections + + @property + def attributes(self): + return attributes + + @property + def traces(self): + return traces + + @property + def create_order(self): + return create_order + + @property + def configure_order(self): + return configure_order + + @property + def factories_info(self): + return factories_info + + @property + def testbed_attributes(self): + return testbed_attributes + + @property + def testbed_id(self): + return TESTBED_ID + + @property + def testbed_version(self): + return TESTBED_VERSION + + @property + def supported_recover_policies(self): + return supported_recovery_policies + diff --git a/src/nepi/testbeds/omf/omf_client.py b/src/nepi/testbeds/omf/omf_client.py new file mode 100644 index 00000000..3100a46c --- /dev/null +++ b/src/nepi/testbeds/omf/omf_client.py @@ -0,0 +1,159 @@ +import logging +import sleekxmpp +from sleekxmpp.exceptions import IqError, IqTimeout +import traceback +from xml.etree import cElementTree as ET + +class OMFClient(sleekxmpp.ClientXMPP): + def __init__(self, jid, password): + sleekxmpp.ClientXMPP.__init__(self, jid, password) + self._ready = False + self._registered = False + self._server = None + + self.register_plugin('xep_0077') # In-band registration + self.register_plugin('xep_0030') + self.register_plugin('xep_0059') + self.register_plugin('xep_0060') # PubSub + + self.add_event_handler("session_start", self.start) + self.add_event_handler("register", self.register) + + @property + def ready(self): + return self._ready + + def start(self, event): + self.send_presence() + self._ready = True + self._server = "pubsub.%s" % self.boundjid.domain + + def register(self, iq): + if self._registered: + logging.info("%s already registered!" % self.boundjid) + return + + resp = self.Iq() + resp['type'] = 'set' + resp['register']['username'] = self.boundjid.user + resp['register']['password'] = self.password + + try: + resp.send(now=True) + logging.info("Account created for %s!" % self.boundjid) + self._registered = True + except IqError as e: + logging.error("Could not register account: %s" % + e.iq['error']['text']) + except IqTimeout: + logging.error("No response from server.") + + def unregister(self): + try: + self.plugin['xep_0077'].cancel_registration( + ifrom=self.boundjid.full) + logging.info("Account unregistered for %s!" % self.boundjid) + except IqError as e: + logging.error("Could not unregister account: %s" % + e.iq['error']['text']) + except IqTimeout: + logging.error("No response from server.") + + def nodes(self): + try: + result = self['xep_0060'].get_nodes(self._server) + for item in result['disco_items']['items']: + print(' - %s' % str(item)) + return result + except: + print traceback.format_exc() + logging.error('Could not retrieve node list.') + + def suscriptions(self): + try: + result = self['xep_0060'].get_subscriptions(self._server) + #self.boundjid.full) + for node in result['node']: + print(' - %s' % str(node)) + return result + except: + print traceback.format_exc() + logging.error('Could not retrieve suscriptions.') + + + def create(self, node): + config = self['xep_0004'].makeForm('submit') + config.add_field(var='pubsub#node_type', value='leaf') + config.add_field(var='pubsub#notify_retract', value='0') + config.add_field(var='pubsub#publish_model', value='open') + config.add_field(var='pubsub#persist_items', value='1') + config.add_field(var='pubsub#max_items', value='1') + config.add_field(var='pubsub#title', value=node) + + try: + self['xep_0060'].create_node(self._server, node, config = config) + except: + print traceback.format_exc() + logging.error('Could not create node: %s' % node) + + def delete(self, node): + try: + self['xep_0060'].delete_node(self._server, node) + print('Deleted node: %s' % node) + except: + print traceback.format_exc() + logging.error('Could not delete node: %s' % node) + + + def publish(self, data, node): + try: + result = self['xep_0060'].publish(self._server,node,payload=data) + id = result['pubsub']['publish']['item']['id'] + print('Published at item id: %s' % id) + except: + print traceback.format_exc() + logging.error('Could not publish to: %s' % self.boundjid) + + def get(self, data): + try: + result = self['xep_0060'].get_item(self._server, self.boundjid, + data) + for item in result['pubsub']['items']['substanzas']: + print('Retrieved item %s: %s' % (item['id'], tostring(item['payload']))) + except: + print traceback.format_exc() + logging.error('Could not retrieve item %s from node %s' % (data, self.boundjid)) + + def retract(self, data): + try: + result = self['xep_0060'].retract(self._server, self.boundjid, data) + print('Retracted item %s from node %s' % (data, self.boundjid)) + except: + print traceback.format_exc() + logging.error('Could not retract item %s from node %s' % (data, self.boundjid)) + + def purge(self): + try: + result = self['xep_0060'].purge(self._server, self.boundjid) + print('Purged all items from node %s' % self.boundjid) + except: + print traceback.format_exc() + logging.error('Could not purge items from node %s' % self.boundjid) + + def subscribe(self, node): + try: + result = self['xep_0060'].subscribe(self._server, node) + print('Subscribed %s to node %s' % (self.boundjid.bare, self.boundjid)) + except: + print traceback.format_exc() + logging.error('Could not subscribe %s to node %s' % (self.boundjid.bare, node)) + + def unsubscribe(self, node): + try: + result = self['xep_0060'].unsubscribe(self._server, node) + print('Unsubscribed %s from node %s' % (self.boundjid.bare, node)) + except: + print traceback.format_exc() + logging.error('Could not unsubscribe %s from node %s' % (self.boundjid.bare, node)) + + diff --git a/src/nepi/testbeds/omf/omf_messages.py b/src/nepi/testbeds/omf/omf_messages.py new file mode 100644 index 00000000..f4c99c59 --- /dev/null +++ b/src/nepi/testbeds/omf/omf_messages.py @@ -0,0 +1,129 @@ +from xml.etree import cElementTree as ET + +EXECUTE = "EXECUTE" +KILL = "KILL" +STDIN = "STDIN" +NOOP = "NOOP" +PM_INSTALL = "PM_INSTALL" +APT_INSTALL = "APT_INSTALL" +RPM_INSTALL = "RPM_INSTALL" +RESET = "RESET" +REBOOT = "REBOOT" +MODPROBE = "MODPROBE" +CONFIGURE = "CONFIGURE" +LOAD_IMAGE = "LOAD_IMAGE" +SAVE_IMAGE = "SAVE_IMAGE" +LOAD_DATA = "LOAD_DATA" +SET_LINK = "SET_LINK" +ALIAS = "ALIAS" +SET_DISCONNECTION = "SET_DISCONNECTION" +RESTART = "RESTART" +ENROLL = "ENROLL" +EXIT = "EXIT" + +class MessageHandler(): + SliceID = "" + ExpID = "" + + def __init__(self, sliceid, expid ): + self.SliceID = sliceid + self.ExpID = expid + print "init" + self.ExpID +" "+ self.SliceID + pass + + + def Mid(self, parent, keyword): + mid = ET.SubElement(parent, keyword) + mid.set("id", "\'omf-payload\'") + return mid + + def Mtext(self, parent, keyword, text): + mtext = ET.SubElement(parent, keyword) + mtext.text = text + return mtext + + + def executefunction(self, target, appid, cmdlineargs, path): + payload = ET.Element("omf-message") + execute = self.Mid(payload,"EXECUTE") + env = self.Mtext(execute, "ENV", "") + sliceid = self.Mtext(execute,"SLICEID",self.SliceID) + expid = self.Mtext(execute,"EXPID",self.ExpID) + target = self.Mtext(execute,"TARGET",target) + appid = self.Mtext(execute,"APPID",appid) + cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs) + path = self.Mtext(execute,"PATH",path) + return payload + + def configurefunction(self, target, value, path): + payload = ET.Element("omf-message") + config = self.Mid(payload, "CONFIGURE") + sliceid = self.Mtext(config,"SLICEID",self.SliceID) + expid = self.Mtext(config,"EXPID",self.ExpID) + target = self.Mtext(config,"TARGET",target) + value = self.Mtext(config,"VALUE",value) + path = self.Mtext(config,"PATH",path) + return payload + + def logfunction(self,level, logger, level_name, data): + payload = ET.Element("omf-message") + log = self.Mid(payload, "LOGGING") + level = self.Mtext(log,"LEVEL",level) + sliceid = self.Mtext(log,"SLICEID",self.SliceID) + logger = self.Mtext(log,"LOGGER",logger) + expid = self.Mtext(log,"EXPID",self.ExpID) + level_name = self.Mtext(log,"LEVEL_NAME",level_name) + data = self.Mtext(log,"DATA",data) + return payload + + def aliasfunction(self, name, target): + payload = ET.Element("omf-message") + alias = self.Mid(payload,"ALIAS") + sliceid = self.Mtext(alias,"SLICEID",self.SliceID) + expid = self.Mtext(alias,"EXPID",self.ExpID) + name = self.Mtext(alias,"NAME",name) + target = self.Mtext(alias,"TARGET",target) + return payload + + def enrollfunction(self, enrollkey, image, index, target ): + payload = ET.Element("omf-message") + enroll = self.Mid(payload,"ENROLL") + enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey) + sliceid = self.Mtext(enroll,"SLICEID",self.SliceID) + image = self.Mtext(enroll,"IMAGE",image) + expid = self.Mtext(enroll,"EXPID",self.ExpID) + index = self.Mtext(enroll,"INDEX",index) + target = self.Mtext(enroll,"TARGET",target) + return payload + + def noopfunction(self,target): + payload = ET.Element("omf-message") + noop = self.Mid(payload,"NOOP") + sliceid = self.Mtext(noop,"SLICEID",self.SliceID) + expid = self.Mtext(noop,"EXPID",self.ExpID) + target = self.Mtext(noop,"TARGET",target) + return payload + + def enrollfunction(self, enrollkey, image, index, target ): + payload = ET.Element("omf-message") + enroll = self.Mid(payload,"ENROLL") + enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey) + sliceid = self.Mtext(enroll,"SLICEID",self.SliceID) + image = self.Mtext(enroll,"IMAGE",image) + expid = self.Mtext(enroll,"EXPID",self.ExpID) + index = self.Mtext(enroll,"INDEX",index) + target = self.Mtext(enroll,"TARGET",target) + return payload + + def newexpfunction(self, experimentid, address): + payload = ET.Element("omf-message") + newexp = self.Mid(payload,"EXPERIMENT_NEW") + experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid) + sliceid = self.Mtext(newexp,"SLICEID",self.SliceID) + expid = self.Mtext(newexp,"EXPID",self.ExpID) + address = self.Mtext(newexp,"ADDRESS",address) + return payload + + def handle_message(self, msg): + # Do something!!! + return msg -- 2.43.0