From c54dadfd927979b7f27e2581a1603ec28d6d9146 Mon Sep 17 00:00:00 2001 From: Julien Tribino Date: Tue, 26 Mar 2013 10:16:32 +0100 Subject: [PATCH] Add OMF Classes using XMPP and Protocol 5.4 --- src/neco/__init__.py | 3 + src/neco/execution/ec.py | 4 +- src/neco/execution/resource.py | 12 +- src/neco/resources/omf/omf_api.py | 180 ++++++++++++++++++ src/neco/resources/omf/omf_application.py | 84 +++++++++ src/neco/resources/omf/omf_channel.py | 89 +++++++++ src/neco/resources/omf/omf_client.py | 202 +++++++++++++++++++++ src/neco/resources/omf/omf_interface.py | 82 +++++++++ src/neco/resources/omf/omf_messages_5_4.py | 125 +++++++++++++ src/neco/resources/omf/omf_node.py | 93 ++++++++++ src/neco/resources/omf/xx_omf_resource.py | 37 ++++ test/execution/resource.py | 3 + test/resources/omf/omf_vlc_exp.py | 154 ++++++++++++++++ 13 files changed, 1061 insertions(+), 7 deletions(-) create mode 100644 src/neco/resources/omf/omf_api.py create mode 100644 src/neco/resources/omf/omf_application.py create mode 100644 src/neco/resources/omf/omf_channel.py create mode 100644 src/neco/resources/omf/omf_client.py create mode 100644 src/neco/resources/omf/omf_interface.py create mode 100644 src/neco/resources/omf/omf_messages_5_4.py create mode 100644 src/neco/resources/omf/omf_node.py create mode 100644 src/neco/resources/omf/xx_omf_resource.py mode change 100644 => 100755 test/execution/resource.py create mode 100755 test/resources/omf/omf_vlc_exp.py diff --git a/src/neco/__init__.py b/src/neco/__init__.py index df19bd55..f926e6de 100644 --- a/src/neco/__init__.py +++ b/src/neco/__init__.py @@ -1,2 +1,5 @@ import logging logging.basicConfig() + +LOGLEVEL = logging.DEBUG + diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index 0ac35e73..cb1df2bd 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -39,12 +39,12 @@ class ExperimentController(object): # TODO pass - def register_resource(self, rtype, guid = None): + def register_resource(self, rtype, guid = None, creds = None): # Get next available guid guid = self._guid_generator.next(guid) # Instantiate RM - rm = ResourceFactory.create(rtype, self, guid) + rm = ResourceFactory.create(rtype, self, guid,creds) # Store RM self._resources[guid] = rm diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index 39f96c68..d3a1dc44 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -68,10 +68,8 @@ class Resource(object): self._attrs = copy.deepcopy(self._attributes) # Logging - loglevel = "debug" self._logger = logging.getLogger("neco.execution.resource.Resource.%s" % self.guid) - self._logger.setLevel(getattr(logging, loglevel.upper())) @property def guid(self): @@ -85,6 +83,10 @@ class Resource(object): if (self._validate_connection(guid)): self._connections.add(guid) + @property + def connections(self): + return self._connections + def discover(self, filters): pass @@ -130,7 +132,7 @@ class ResourceFactory(object): cls._resource_types[rclass.rtype()] = rclass @classmethod - def create(cls, rtype, ec, guid): - rclass = cls._resource[rtype] - return rclass(ec, guid) + def create(cls, rtype, ec, guid, creds): + rclass = cls._resource_types[rtype] + return rclass(ec, guid, creds) diff --git a/src/neco/resources/omf/omf_api.py b/src/neco/resources/omf/omf_api.py new file mode 100644 index 00000000..aca53e51 --- /dev/null +++ b/src/neco/resources/omf/omf_api.py @@ -0,0 +1,180 @@ +import datetime +import logging +import ssl +import sys +import time + +from neco.resources.omf.omf_client import OMFClient +from neco.resources.omf.omf_messages_5_4 import MessageHandler + +class OMFAPI(object): + def __init__(self, slice, host, port, password, xmpp_root = None): + date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S") + tz = -time.altzone if time.daylight != 0 else -time.timezone + date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds + self._user = "%s-%s" % (slice, date) + self._slice = slice + self._host = host + self._port = port + self._password = password + self._hostnames = [] + self._xmpp_root = xmpp_root or "OMF_5.4" + + self._logger = logging.getLogger("neco.resources.omf") + + # OMF xmpp client + self._client = None + # message handler + self._message = None + + if sys.version_info < (3, 0): + reload(sys) + sys.setdefaultencoding('utf8') + + # instantiate the xmpp client + self._init_client() + + # register xmpp nodes for the experiment + self._enroll_experiment() + self._enroll_newexperiment() + + # register xmpp logger for the experiment + self._enroll_logger() + + def _init_client(self): + jid = "%s@%s" % (self._user, self._host) + xmpp = OMFClient(jid, self._password) + # PROTOCOL_SSLv3 required for compatibility with OpenFire + xmpp.ssl_version = ssl.PROTOCOL_SSLv3 + + if xmpp.connect((self._host, self._port)): + xmpp.process(threaded=True) + while not xmpp.ready: + time.sleep(1) + self._client = xmpp + self._message = MessageHandler(self._slice, self._user) + else: + msg = "Unable to connect to the XMPP server." + self._logger.error(msg) + raise RuntimeError(msg) + + def _enroll_experiment(self): + xmpp_node = self._exp_session_id + self._client.create(xmpp_node) + #print "Create experiment sesion id topics !!" + self._client.subscribe(xmpp_node) + #print "Subscribe to experiment sesion id topics !!" + + + def _enroll_newexperiment(self): + address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user) + print address + payload = self._message.newexpfunction(self._user, address) + slice_sid = "/%s/%s" % (self._xmpp_root, self._slice) + self._client.publish(payload, slice_sid) + + def _enroll_logger(self): + xmpp_node = self._logger_session_id + self._client.create(xmpp_node) + self._client.subscribe(xmpp_node) + + payload = self._message.logfunction("2", + "nodeHandler::NodeHandler", + "INFO", + "OMF Experiment Controller 5.4 (git 529a626)") + self._client.publish(payload, xmpp_node) + + def _host_session_id(self, hostname): + return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname) + + def _host_resource_id(self, hostname): + return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname) + + @property + def _exp_session_id(self): + return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user) + + @property + def _logger_session_id(self): + return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user) + + def delete(self, hostname): + if not hostname in self._hostnames: + return + + self._hostnames.remove(hostname) + + xmpp_node = self._host_session_id(hostname) + self._client.delete(xmpp_node) + + def enroll_host(self, hostname): + if hostname in self._hostnames: + return + + self._hostnames.append(hostname) + + xmpp_node = self._host_session_id(hostname) + self._client.create(xmpp_node) + self._client.subscribe(xmpp_node) + + xmpp_node = self._host_resource_id(hostname) + self._client.subscribe(xmpp_node) + + payload = self._message.enrollfunction("1", "*", "1", hostname) + self._client.publish(payload, xmpp_node) + + def configure(self, hostname, attribute, value): + payload = self._message.configurefunction(hostname, value, attribute) + xmpp_node = self._host_session_id(hostname) + self._client.publish(payload, xmpp_node) + + def execute(self, hostname, app_id, arguments, path, env): + payload = self._message.executefunction(hostname, app_id, arguments, path, env) + xmpp_node = self._host_session_id(hostname) + self._client.publish(payload, xmpp_node) + + def exit(self, hostname, app_id): + payload = self._message.exitfunction(hostname, app_id) + xmpp_node = self._host_session_id(hostname) + self._client.publish(payload, xmpp_node) + + def disconnect(self): + self._client.delete(self._exp_session_id) + self._client.delete(self._logger_session_id) + + for hostname in self._hostnames[:]: + self.delete(hostname) + + time.sleep(1) + self._client.disconnect() + + +class OMFAPIFactory(object): + _Api = dict() + + @classmethod + def get_api(cls, slice, host, port, password): + if slice and host and port and password: + key = cls._hash_api(slice, host, port) + if key in cls._Api: + return cls._Api[key] + else : + return cls.create_api(slice, host, port, password) + return None + + @classmethod + def create_api(cls, slice, host, port, password): + OmfApi = OMFAPI(slice, host, port, password) + key = cls._hash_api(slice, host, port) + cls._Api[key] = OmfApi + return OmfApi + + @classmethod + def _hash_api(cls, slice, host, port): + res = slice + "_" + host + "_" + port + return res + + + + + diff --git a/src/neco/resources/omf/omf_application.py b/src/neco/resources/omf/omf_application.py new file mode 100644 index 00000000..8b6625af --- /dev/null +++ b/src/neco/resources/omf/omf_application.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python +from neco.execution.resource import Resource, clsinit +from neco.execution.attribute import Attribute +from neco.resources.omf.omf_api import OMFAPIFactory + +import neco +import logging + +@clsinit +class OMFApplication(Resource): + _rtype = "OMFApplication" + _authorized_connections = ["OMFNode"] + + @classmethod + def _register_attributes(cls): + appid = Attribute("appid", "Name of the application") + path = Attribute("path", "Path of the application") + args = Attribute("args", "Argument of the application") + env = Attribute("env", "Environnement variable of the application") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + cls._register_attribute(appid) + cls._register_attribute(path) + cls._register_attribute(args) + cls._register_attribute(env) + cls._register_attribute(xmppSlice) + cls._register_attribute(xmppHost) + cls._register_attribute(xmppPort) + cls._register_attribute(xmppPassword) + + + def __init__(self, ec, guid, creds): + super(OMFApplication, self).__init__(ec, guid) + self.set('xmppSlice', creds['xmppSlice']) + self.set('xmppHost', creds['xmppHost']) + self.set('xmppPort', creds['xmppPort']) + self.set('xmppPassword', creds['xmppPassword']) + + self.set('appid', "") + self.set('path', "") + self.set('args', "") + self.set('env', "") + + self._node = None + + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + + self._logger = logging.getLogger("neco.omf.omfApp ") + self._logger.setLevel(neco.LOGLEVEL) + + def _validate_connection(self, guid): + rm = self.ec.resource(guid) + if rm.rtype() not in self._authorized_connections: + self._logger.debug("Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid)) + return False + elif len(self.connections) != 0 : + self._logger.debug("Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid)) + return False + else : + self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)) + return True + + def _get_nodes(self, conn_set): + for elt in conn_set: + rm = self.ec.resource(elt) + if rm.rtype() == "OMFNode": + return rm + return None + + def start(self): + self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env')) + #try: + if self.get('appid') and self.get('path') and self.get('args') and self.get('env'): + rm_node = self._get_nodes(self._connections) + self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env')) + + def stop(self): + rm_node = self._get_nodes(self._connections) + self._omf_api.exit(rm_node.get('hostname'),self.get('appid')) + + + diff --git a/src/neco/resources/omf/omf_channel.py b/src/neco/resources/omf/omf_channel.py new file mode 100644 index 00000000..bd7f0f11 --- /dev/null +++ b/src/neco/resources/omf/omf_channel.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +from neco.execution.resource import Resource, clsinit +from neco.execution.attribute import Attribute + +from neco.resources.omf.omf_api import OMFAPIFactory + +import neco +import logging + +@clsinit +class OMFChannel(Resource): + _rtype = "OMFChannel" + _authorized_connections = ["OMFWifiInterface"] + + @classmethod + def _register_attributes(cls): + channel = Attribute("channel", "Name of the application") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + cls._register_attribute(channel) + cls._register_attribute(xmppSlice) + cls._register_attribute(xmppHost) + cls._register_attribute(xmppPort) + cls._register_attribute(xmppPassword) + + def __init__(self, ec, guid, creds): + super(OMFChannel, self).__init__(ec, guid) + self.set('xmppSlice', creds['xmppSlice']) + self.set('xmppHost', creds['xmppHost']) + self.set('xmppPort', creds['xmppPort']) + self.set('xmppPassword', creds['xmppPassword']) + + self._nodes_guid = list() + + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + + self._logger = logging.getLogger("neco.omf.omfChannel") + self._logger.setLevel(neco.LOGLEVEL) + + def _validate_connection(self, guid): + rm = self.ec.resource(guid) + if rm.rtype() in self._authorized_connections: + self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)) + return True + self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)) + return False + + def _get_nodes(self, conn_set): + for elt in conn_set: + rm_iface = self.ec.resource(elt) + for conn in rm_iface._connections: + rm_node = self.ec.resource(conn) + if rm_node.rtype() == "OMFNode": + couple = [rm_node.get('hostname'), rm_iface.get('alias')] + #print couple + self._nodes_guid.append(couple) + return self._nodes_guid + + def discover(self): + pass + + def provision(self, credential): + pass + + def start(self): + if self.get('channel'): + set_nodes = self._get_nodes(self._connections) + #print set_nodes + for couple in set_nodes: + #print "Couple node/alias : " + couple[0] + " , " + couple[1] + attrval = self.get('channel') + attrname = "net/%s/%s" % (couple[1], 'channel') + #print "Send the configure message" + self._omf_api.configure(couple[0], attrname, attrval) + + def xstart(self): + try: + if self.get('channel'): + node = self.tc.elements.get(self._node_guid) + attrval = self.get('channel') + attrname = "net/%s/%s" % (self._alias, 'channel') + self._omf_api.configure('omf.plexus.wlab17', attrname, attrval) + except AttributeError: + # If the attribute is not yet defined, ignore the error + pass + + diff --git a/src/neco/resources/omf/omf_client.py b/src/neco/resources/omf/omf_client.py new file mode 100644 index 00000000..7557a2c6 --- /dev/null +++ b/src/neco/resources/omf/omf_client.py @@ -0,0 +1,202 @@ +import logging +import sleekxmpp +from sleekxmpp.exceptions import IqError, IqTimeout +import traceback +import xml.etree.ElementTree as ET + +import neco + +class OMFClient(sleekxmpp.ClientXMPP): + def __init__(self, jid, password): + sleekxmpp.ClientXMPP.__init__(self, jid, password) + self._ready = False + self._registered = False + self._server = None + + self.register_plugin('xep_0077') # In-band registration + self.register_plugin('xep_0030') + self.register_plugin('xep_0059') + self.register_plugin('xep_0060') # PubSub + + self.add_event_handler("session_start", self.start) + self.add_event_handler("register", self.register) + self.add_event_handler("pubsub_publish", self.handle_omf_message) + + self._logger = logging.getLogger("neco.omf.xmppClient") + self._logger.setLevel(neco.LOGLEVEL) + + @property + def ready(self): + return self._ready + + def start(self, event): + self.send_presence() + self._ready = True + self._server = "pubsub.%s" % self.boundjid.domain + + def register(self, iq): + if self._registered: + self._logger.info("%s already registered!" % self.boundjid) + return + + resp = self.Iq() + resp['type'] = 'set' + resp['register']['username'] = self.boundjid.user + resp['register']['password'] = self.password + + try: + resp.send(now=True) + self._logger.info("Account created for %s!" % self.boundjid) + self._registered = True + except IqError as e: + self._logger.error("Could not register account: %s" % + e.iq['error']['text']) + except IqTimeout: + self._logger.error("No response from server.") + + def unregister(self): + try: + self.plugin['xep_0077'].cancel_registration( + ifrom=self.boundjid.full) + self._logger.info("Account unregistered for %s!" % self.boundjid) + except IqError as e: + self._logger.error("Could not unregister account: %s" % + e.iq['error']['text']) + except IqTimeout: + self._logger.error("No response from server.") + + def nodes(self): + try: + result = self['xep_0060'].get_nodes(self._server) + for item in result['disco_items']['items']: + self._logger.info(' - %s' % str(item)) + return result + except: + error = traceback.format_exc() + self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error) + + def subscriptions(self): + try: + result = self['xep_0060'].get_subscriptions(self._server) + #self.boundjid.full) + for node in result['node']: + self._logger.info(' - %s' % str(node)) + return result + except: + error = traceback.format_exc() + self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error) + + def create(self, node): + self._logger.debug(" Create Topic : " + node) + + config = self['xep_0004'].makeForm('submit') + config.add_field(var='pubsub#node_type', value='leaf') + config.add_field(var='pubsub#notify_retract', value='0') + config.add_field(var='pubsub#publish_model', value='open') + config.add_field(var='pubsub#persist_items', value='1') + config.add_field(var='pubsub#max_items', value='1') + config.add_field(var='pubsub#title', value=node) + + try: + self['xep_0060'].create_node(self._server, node, config = config) + except: + error = traceback.format_exc() + self._logger.error('Could not create topic: %s\ntraceback:\n%s' % (node, error)) + + def delete(self, node): + try: + self['xep_0060'].delete_node(self._server, node) + self._logger.info('Deleted node: %s' % node) + except: + error = traceback.format_exc() + self._logger.error('Could not delete topic: %s\ntraceback:\n%s' % (node, error)) + + def publish(self, data, node): + self._logger.debug(" Publish to Topic :" + node) + try: + result = self['xep_0060'].publish(self._server,node,payload=data) + # id = result['pubsub']['publish']['item']['id'] + # print('Published at item id: %s' % id) + except: + error = traceback.format_exc() + self._logger.error('Could not publish to: %s\ntraceback:\n%s' \ + % (node, error)) + + def get(self, data): + try: + result = self['xep_0060'].get_item(self._server, self.boundjid, + data) + for item in result['pubsub']['items']['substanzas']: + self._logger.info('Retrieved item %s: %s' % (item['id'], + tostring(item['payload']))) + except: + error = traceback.format_exc() + self._logger.error('Could not retrieve item %s from topic %s\ntraceback:\n%s' \ + % (data, self.boundjid, error)) + + def retract(self, data): + try: + result = self['xep_0060'].retract(self._server, self.boundjid, data) + self._logger.info('Retracted item %s from topic %s' % (data, self.boundjid)) + except: + error = traceback.format_exc() + self._logger.error('Could not retract item %s from topic %s\ntraceback:\n%s' \ + % (data, self.boundjid, error)) + + def purge(self): + try: + result = self['xep_0060'].purge(self._server, self.boundjid) + self._logger.info('Purged all items from topic %s' % self.boundjid) + except: + error = traceback.format_exc() + self._logger.error('Could not purge items from topic %s\ntraceback:\n%s' \ + % (self.boundjid, error)) + + def subscribe(self, node): + try: + result = self['xep_0060'].subscribe(self._server, node) + #self._logger.debug('Subscribed %s to node %s' \ + #% (self.boundjid.bare, node)) + self._logger.info(' Subscribed %s to topic %s' \ + % (self.boundjid.user, node)) + except: + error = traceback.format_exc() + self._logger.error(' Could not subscribe %s to topic %s\ntraceback:\n%s' \ + % (self.boundjid.bare, node, error)) + + def unsubscribe(self, node): + try: + result = self['xep_0060'].unsubscribe(self._server, node) + self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node)) + except: + error = traceback.format_exc() + self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \ + % (self.boundjid.bare, node, error)) + + def _check_for_tag(self, treeroot, namespaces, tag): + for element in treeroot.iter(namespaces+tag): + if element.text: + return element + else : + return None + + def _check_output(self, treeroot, namespaces): + output_param = ["TARGET", "REASON", "PATH", "APPID", "VALUE"] + response = "" + for elt in output_param: + msg = self._check_for_tag(treeroot, namespaces, elt) + if msg is not None: + response = response + " " + msg.text + " :" + deb = self._check_for_tag(treeroot, namespaces, "MESSAGE") + if deb is not None: + self._logger.debug(response + " " + deb.text) + else : + self._logger.info(response) + + def handle_omf_message(self, iq): + namespaces = "{http://jabber.org/protocol/pubsub}" + for i in iq['pubsub_event']['items']: + root = ET.fromstring(str(i)) + self._check_output(root, namespaces) + + diff --git a/src/neco/resources/omf/omf_interface.py b/src/neco/resources/omf/omf_interface.py new file mode 100644 index 00000000..07d64460 --- /dev/null +++ b/src/neco/resources/omf/omf_interface.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +from neco.execution.resource import Resource, clsinit +from neco.execution.attribute import Attribute + +from neco.resources.omf.omf_api import OMFAPIFactory + +import neco +import logging + +@clsinit +class OMFWifiInterface(Resource): + _rtype = "OMFWifiInterface" + _authorized_connections = ["OMFNode" , "OMFChannel"] + + #alias2name = dict({'w0':'wlan0', 'w1':'wlan1'}) + + @classmethod + def _register_attributes(cls): + alias = Attribute("alias","Alias of the interface", default_value = "w0") + mode = Attribute("mode","Mode of the interface") + type = Attribute("type","Type of the interface") + essid = Attribute("essid","Essid of the interface") + ip = Attribute("ip","IP of the interface") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + cls._register_attribute(alias) + cls._register_attribute(xmppSlice) + cls._register_attribute(xmppHost) + cls._register_attribute(xmppPort) + cls._register_attribute(xmppPassword) + cls._register_attribute(mode) + cls._register_attribute(type) + cls._register_attribute(essid) + cls._register_attribute(ip) + + def __init__(self, ec, guid, creds): + super(OMFWifiInterface, self).__init__(ec, guid) + self.set('xmppSlice', creds['xmppSlice']) + self.set('xmppHost', creds['xmppHost']) + self.set('xmppPort', creds['xmppPort']) + self.set('xmppPassword', creds['xmppPassword']) + + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + self._alias = self.get('alias') + + self._logger = logging.getLogger("neco.omf.omfIface ") + self._logger.setLevel(neco.LOGLEVEL) + + def _validate_connection(self, guid): + rm = self.ec.resource(guid) + if rm.rtype() in self._authorized_connections: + self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)) + return True + self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)) + return False + + def _get_nodes(self, conn_set): + for elt in conn_set: + rm = self.ec.resource(elt) + if rm.rtype() == "OMFNode": + return rm + return None + + + def start(self): + self._logger.debug(self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip')) + #try: + if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'): + rm_node = self._get_nodes(self._connections) + for attrname in ["mode", "type", "essid", "ip"]: + attrval = self.get(attrname) + attrname = "net/%s/%s" % (self._alias, attrname) + #print "Send the configure message" + self._omf_api.configure(rm_node.get('hostname'), attrname, attrval) + + def stop(self): + self._omf_api.disconnect() + + + diff --git a/src/neco/resources/omf/omf_messages_5_4.py b/src/neco/resources/omf/omf_messages_5_4.py new file mode 100644 index 00000000..77c53dc0 --- /dev/null +++ b/src/neco/resources/omf/omf_messages_5_4.py @@ -0,0 +1,125 @@ +from xml.etree import cElementTree as ET + +EXECUTE = "EXECUTE" +KILL = "KILL" +STDIN = "STDIN" +NOOP = "NOOP" +PM_INSTALL = "PM_INSTALL" +APT_INSTALL = "APT_INSTALL" +RPM_INSTALL = "RPM_INSTALL" +RESET = "RESET" +REBOOT = "REBOOT" +MODPROBE = "MODPROBE" +CONFIGURE = "CONFIGURE" +LOAD_IMAGE = "LOAD_IMAGE" +SAVE_IMAGE = "SAVE_IMAGE" +LOAD_DATA = "LOAD_DATA" +SET_LINK = "SET_LINK" +ALIAS = "ALIAS" +SET_DISCONNECTION = "SET_DISCONNECTION" +RESTART = "RESTART" +ENROLL = "ENROLL" +EXIT = "EXIT" + +class MessageHandler(): + SliceID = "" + ExpID = "" + + def __init__(self, sliceid, expid ): + self.SliceID = sliceid + self.ExpID = expid + print "init" + self.ExpID +" "+ self.SliceID + pass + + def Mid(self, parent, keyword): + mid = ET.SubElement(parent, keyword) + mid.set("id", "\'omf-payload\'") + return mid + + def Mtext(self, parent, keyword, text): + mtext = ET.SubElement(parent, keyword) + mtext.text = text + return mtext + + def executefunction(self, target, appid, cmdlineargs, path, env): + payload = ET.Element("omf-message") + execute = self.Mid(payload,"EXECUTE") + env = self.Mtext(execute, "ENV", env) + sliceid = self.Mtext(execute,"SLICEID",self.SliceID) + expid = self.Mtext(execute,"EXPID",self.ExpID) + target = self.Mtext(execute,"TARGET",target) + appid = self.Mtext(execute,"APPID",appid) + cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs) + path = self.Mtext(execute,"PATH",path) + return payload + + def exitfunction(self, target, appid): + payload = ET.Element("omf-message") + execute = self.Mid(payload,"EXIT") + sliceid = self.Mtext(execute,"SLICEID",self.SliceID) + expid = self.Mtext(execute,"EXPID",self.ExpID) + target = self.Mtext(execute,"TARGET",target) + appid = self.Mtext(execute,"APPID",appid) + return payload + + def configurefunction(self, target, value, path): + payload = ET.Element("omf-message") + config = self.Mid(payload, "CONFIGURE") + sliceid = self.Mtext(config,"SLICEID",self.SliceID) + expid = self.Mtext(config,"EXPID",self.ExpID) + target = self.Mtext(config,"TARGET",target) + value = self.Mtext(config,"VALUE",value) + path = self.Mtext(config,"PATH",path) + return payload + + def logfunction(self,level, logger, level_name, data): + payload = ET.Element("omf-message") + log = self.Mid(payload, "LOGGING") + level = self.Mtext(log,"LEVEL",level) + sliceid = self.Mtext(log,"SLICEID",self.SliceID) + logger = self.Mtext(log,"LOGGER",logger) + expid = self.Mtext(log,"EXPID",self.ExpID) + level_name = self.Mtext(log,"LEVEL_NAME",level_name) + data = self.Mtext(log,"DATA",data) + return payload + + def aliasfunction(self, name, target): + payload = ET.Element("omf-message") + alias = self.Mid(payload,"ALIAS") + sliceid = self.Mtext(alias,"SLICEID",self.SliceID) + expid = self.Mtext(alias,"EXPID",self.ExpID) + name = self.Mtext(alias,"NAME",name) + target = self.Mtext(alias,"TARGET",target) + return payload + + def enrollfunction(self, enrollkey, image, index, target ): + payload = ET.Element("omf-message") + enroll = self.Mid(payload,"ENROLL") + enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey) + sliceid = self.Mtext(enroll,"SLICEID",self.SliceID) + image = self.Mtext(enroll,"IMAGE",image) + expid = self.Mtext(enroll,"EXPID",self.ExpID) + index = self.Mtext(enroll,"INDEX",index) + target = self.Mtext(enroll,"TARGET",target) + return payload + + def noopfunction(self,target): + payload = ET.Element("omf-message") + noop = self.Mid(payload,"NOOP") + sliceid = self.Mtext(noop,"SLICEID",self.SliceID) + expid = self.Mtext(noop,"EXPID",self.ExpID) + target = self.Mtext(noop,"TARGET",target) + return payload + + def newexpfunction(self, experimentid, address): + payload = ET.Element("omf-message") + newexp = self.Mid(payload,"EXPERIMENT_NEW") + experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid) + sliceid = self.Mtext(newexp,"SLICEID",self.SliceID) + expid = self.Mtext(newexp,"EXPID",self.ExpID) + address = self.Mtext(newexp,"ADDRESS",address) + return payload + + def handle_message(self, msg): + # Do something!!! + return msg diff --git a/src/neco/resources/omf/omf_node.py b/src/neco/resources/omf/omf_node.py new file mode 100644 index 00000000..1a98b939 --- /dev/null +++ b/src/neco/resources/omf/omf_node.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +from neco.execution.resource import Resource, clsinit +from neco.execution.attribute import Attribute + +from neco.resources.omf.omf_api import OMFAPIFactory + +import neco +import logging + +@clsinit +class OMFNode(Resource): + _rtype = "OMFNode" + _authorized_connections = ["OMFApplication" , "OMFWifiInterface"] + + @classmethod + def _register_attributes(cls): + hostname = Attribute("hostname", "Hostname of the machine") + cpu = Attribute("cpu", "CPU of the node") + ram = Attribute("ram", "RAM of the node") + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + cls._register_attribute(hostname) + cls._register_attribute(ram) + cls._register_attribute(cpu) + cls._register_attribute(xmppSlice) + cls._register_attribute(xmppHost) + cls._register_attribute(xmppPort) + cls._register_attribute(xmppPassword) + + @classmethod + def _register_filters(cls): + hostname = Attribute("hostname", "Hostname of the machine") + gateway = Attribute("gateway", "Gateway") + granularity = Attribute("granularity", "Granularity of the reservation time") + hardware_type = Attribute("hardware_type", "Hardware type of the machine") + cls._register_filter(hostname) + cls._register_filter(gateway) + cls._register_filter(granularity) + cls._register_filter(hardware_type) + + def __init__(self, ec, guid, creds): + super(OMFNode, self).__init__(ec, guid) + self.set('xmppSlice', creds['xmppSlice']) + self.set('xmppHost', creds['xmppHost']) + self.set('xmppPort', creds['xmppPort']) + self.set('xmppPassword', creds['xmppPassword']) + + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + + self._logger = logging.getLogger("neco.omf.omfNode ") + self._logger.setLevel(neco.LOGLEVEL) + + def _validate_connection(self, guid): + rm = self.ec.resource(guid) + if rm.rtype() in self._authorized_connections: + self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid)) + return True + self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid)) + return False + + def discover(self): + pass + + def provision(self, credential): + pass + + def start(self): + self._omf_api.enroll_host(self.get('hostname')) + + def stop(self): + self._omf_api.disconnect() + + def configure(self): + #routes = self.tc._add_route.get(self.guid, []) + #iface_guids = self.tc.get_connected(self.guid, "devs", "node") + + for route in routes: + (destination, netprefix, nexthop, metric, device) = route + netmask = ipaddr2.ipv4_mask2dot(netprefix) + + # Validate that the interface is associated to the node + for iface_guid in iface_guids: + iface = self.tc.elements.get(iface_guid) + if iface.devname == device: + self._omf_api.execute(self.get('hostname'), + "Id#%s" % str(random.getrandbits(128)), + "add -net %s netmask %s dev %s" % (destination, netmask, iface.devname), + "/sbin/route", # path + None, # env + ) + break diff --git a/src/neco/resources/omf/xx_omf_resource.py b/src/neco/resources/omf/xx_omf_resource.py new file mode 100644 index 00000000..784513e2 --- /dev/null +++ b/src/neco/resources/omf/xx_omf_resource.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +from neco.execution.resource import Resource, clsinit +from neco.execution.attribute import Attribute + +from neco.resources.omf.omf_api import OMFAPIFactory + +@clsinit +class OMFResource(Resource): + _rtype = "OMFResource" + + @classmethod + def _register_attributes(cls): + xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02") + xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02") + xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02") + xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02") + cls._register_attribute(xmppSlice) + cls._register_attribute(xmppHost) + cls._register_attribute(xmppPort) + cls._register_attribute(xmppPassword) + + def __init__(self, ec, guid, creds): + super(OMFNode, self).__init__(ec, guid) + self.set('xmppSlice', creds['xmppSlice']) + self.set('xmppHost', creds['xmppHost']) + self.set('xmppPort', creds['xmppPort']) + self.set('xmppPassword', creds['xmppPassword']) + + self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword')) + + def discover(self): + pass + + def provision(self, credential): + pass + + diff --git a/test/execution/resource.py b/test/execution/resource.py old mode 100644 new mode 100755 index cbe2f951..36165546 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -41,6 +41,9 @@ class ResourceTestCase(unittest.TestCase): self.assertEquals(AnotherResource.rtype(), "AnotherResource") self.assertEquals(len(AnotherResource._attributes), 0) + #self.assertEquals(OmfNode.rtype(), "OmfNode") + #self.assertEquals(len(OmfNode._attributes), 0) + self.assertEquals(len(ResourceFactory.resource_types()), 2) if __name__ == '__main__': diff --git a/test/resources/omf/omf_vlc_exp.py b/test/resources/omf/omf_vlc_exp.py new file mode 100755 index 00000000..799046fc --- /dev/null +++ b/test/resources/omf/omf_vlc_exp.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +from neco.execution.resource import Resource, ResourceFactory +from neco.execution.ec import ExperimentController + +from neco.resources.omf.omf_node import OMFNode +from neco.resources.omf.omf_application import OMFApplication +from neco.resources.omf.omf_interface import OMFWifiInterface +from neco.resources.omf.omf_channel import OMFChannel +from neco.resources.omf.omf_api import OMFAPIFactory + +from neco.util import guid + +import time +import unittest +import logging + +logging.basicConfig() + + +class DummyEC(ExperimentController): + pass + +class OMFVLCTestCase(unittest.TestCase): + + def setUp(self): + #self.guid_generator = guid.GuidGenerator() + self._creds = {'xmppSlice' : 'nepi' , 'xmppHost' : 'xmpp-plexus.onelab.eu' , 'xmppPort' : '5222', 'xmppPassword' : '1234' } + + def tearDown(self): + pass + + def test_creation_phase(self): + ec = DummyEC() + + ResourceFactory.register_type(OMFNode) + ResourceFactory.register_type(OMFWifiInterface) + ResourceFactory.register_type(OMFChannel) + ResourceFactory.register_type(OMFApplication) + + self.assertEquals(OMFNode.rtype(), "OMFNode") + self.assertEquals(len(OMFNode._attributes), 7) + + self.assertEquals(OMFWifiInterface.rtype(), "OMFWifiInterface") + self.assertEquals(len(OMFWifiInterface._attributes), 9) + + self.assertEquals(OMFChannel.rtype(), "OMFChannel") + self.assertEquals(len(OMFChannel._attributes), 5) + + self.assertEquals(OMFApplication.rtype(), "OMFApplication") + self.assertEquals(len(OMFApplication._attributes), 8) + + self.assertEquals(len(ResourceFactory.resource_types()), 4) + + #def xtest_creation_and_configuration_node(self): + guid = ec.register_resource("OMFNode", creds = self._creds) + node1 = ec._resources[guid] + node1.set('hostname', 'omf.plexus.wlab17') + + guid = ec.register_resource("OMFNode", creds = self._creds) + node2 = ec._resources[guid] + node2.set('hostname', "omf.plexus.wlab37") + + #def xtest_creation_and_configuration_interface(self): + guid = ec.register_resource("OMFWifiInterface", creds = self._creds) + iface1 = ec._resources[guid] + iface1.set('alias', "w0") + iface1.set('mode', "adhoc") + iface1.set('type', "g") + iface1.set('essid', "helloworld") + iface1.set('ip', "10.0.0.17") + + guid = ec.register_resource("OMFWifiInterface", creds = self._creds) + iface2 = ec._resources[guid] + iface2.set('alias', "w0") + iface2.set('mode', "adhoc") + iface2.set('type', 'g') + iface2.set('essid', "helloworld") + iface2.set('ip', "10.0.0.37") + + #def xtest_creation_and_configuration_channel(self): + guid = ec.register_resource("OMFChannel", creds = self._creds) + channel = ec._resources[guid] + channel.set('channel', "6") + + #def xtest_creation_and_configuration_application(self): + guid = ec.register_resource("OMFApplication", creds = self._creds) + app1 = ec._resources[guid] + app1.set('appid', 'Vlc#1') + app1.set('path', "/opt/vlc-1.1.13/cvlc") + app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'") + app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") + + guid = ec.register_resource("OMFApplication", creds = self._creds) + app2 = ec._resources[guid] + app2.set('appid', 'Vlc#2') + app2.set('path', "/opt/vlc-1.1.13/cvlc") + app2.set('args', "rtp://10.0.0.37:1234") + app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority") + self.assertEquals(len(OMFAPIFactory._Api), 1) + + #def test_connection(self): + app1.connect(node1._guid) + node1.connect(app1._guid) + + node1.connect(iface1._guid) + iface1.connect(node1._guid) + + iface1.connect(channel._guid) + channel.connect(iface1._guid) + + channel.connect(iface2._guid) + iface2.connect(channel._guid) + + iface2.connect(node2._guid) + node2.connect(iface2._guid) + + node2.connect(app2._guid) + app2.connect(node2._guid) + + #def test_start_node(self): + node1.start() + node2.start() + time.sleep(1) + #pass + + #def test_start_interface(self): + iface1.start() + iface2.start() + + #def test_start_channel(self): + channel.start() + time.sleep(1) + + #def test_start_application(self): + app1.start() + time.sleep(2) + app2.start() + + time.sleep(10) + + #def test_stop_application(self): + app1.stop() + app2.stop() + time.sleep(2) + + + #def test_stop_nodes(self): + node1.stop() + #node2.stop() + + +if __name__ == '__main__': + unittest.main() + -- 2.47.0