import logging
logging.basicConfig()
+
+LOGLEVEL = logging.DEBUG
+
# TODO
pass
- def register_resource(self, rtype, guid = None):
+ def register_resource(self, rtype, guid = None, creds = None):
# Get next available guid
guid = self._guid_generator.next(guid)
# Instantiate RM
- rm = ResourceFactory.create(rtype, self, guid)
+ rm = ResourceFactory.create(rtype, self, guid,creds)
# Store RM
self._resources[guid] = rm
self._attrs = copy.deepcopy(self._attributes)
# Logging
- loglevel = "debug"
self._logger = logging.getLogger("neco.execution.resource.Resource.%s" %
self.guid)
- self._logger.setLevel(getattr(logging, loglevel.upper()))
@property
def guid(self):
if (self._validate_connection(guid)):
self._connections.add(guid)
+ @property
+ def connections(self):
+ return self._connections
+
def discover(self, filters):
pass
cls._resource_types[rclass.rtype()] = rclass
@classmethod
- def create(cls, rtype, ec, guid):
- rclass = cls._resource[rtype]
- return rclass(ec, guid)
+ def create(cls, rtype, ec, guid, creds):
+ rclass = cls._resource_types[rtype]
+ return rclass(ec, guid, creds)
--- /dev/null
+import datetime
+import logging
+import ssl
+import sys
+import time
+
+from neco.resources.omf.omf_client import OMFClient
+from neco.resources.omf.omf_messages_5_4 import MessageHandler
+
+class OMFAPI(object):
+ def __init__(self, slice, host, port, password, xmpp_root = None):
+ date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
+ tz = -time.altzone if time.daylight != 0 else -time.timezone
+ date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
+ self._user = "%s-%s" % (slice, date)
+ self._slice = slice
+ self._host = host
+ self._port = port
+ self._password = password
+ self._hostnames = []
+ self._xmpp_root = xmpp_root or "OMF_5.4"
+
+ self._logger = logging.getLogger("neco.resources.omf")
+
+ # OMF xmpp client
+ self._client = None
+ # message handler
+ self._message = None
+
+ if sys.version_info < (3, 0):
+ reload(sys)
+ sys.setdefaultencoding('utf8')
+
+ # instantiate the xmpp client
+ self._init_client()
+
+ # register xmpp nodes for the experiment
+ self._enroll_experiment()
+ self._enroll_newexperiment()
+
+ # register xmpp logger for the experiment
+ self._enroll_logger()
+
+ def _init_client(self):
+ jid = "%s@%s" % (self._user, self._host)
+ xmpp = OMFClient(jid, self._password)
+ # PROTOCOL_SSLv3 required for compatibility with OpenFire
+ xmpp.ssl_version = ssl.PROTOCOL_SSLv3
+
+ if xmpp.connect((self._host, self._port)):
+ xmpp.process(threaded=True)
+ while not xmpp.ready:
+ time.sleep(1)
+ self._client = xmpp
+ self._message = MessageHandler(self._slice, self._user)
+ else:
+ msg = "Unable to connect to the XMPP server."
+ self._logger.error(msg)
+ raise RuntimeError(msg)
+
+ def _enroll_experiment(self):
+ xmpp_node = self._exp_session_id
+ self._client.create(xmpp_node)
+ #print "Create experiment sesion id topics !!"
+ self._client.subscribe(xmpp_node)
+ #print "Subscribe to experiment sesion id topics !!"
+
+
+ def _enroll_newexperiment(self):
+ address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
+ print address
+ payload = self._message.newexpfunction(self._user, address)
+ slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
+ self._client.publish(payload, slice_sid)
+
+ def _enroll_logger(self):
+ xmpp_node = self._logger_session_id
+ self._client.create(xmpp_node)
+ self._client.subscribe(xmpp_node)
+
+ payload = self._message.logfunction("2",
+ "nodeHandler::NodeHandler",
+ "INFO",
+ "OMF Experiment Controller 5.4 (git 529a626)")
+ self._client.publish(payload, xmpp_node)
+
+ def _host_session_id(self, hostname):
+ return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
+
+ def _host_resource_id(self, hostname):
+ return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
+
+ @property
+ def _exp_session_id(self):
+ return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
+
+ @property
+ def _logger_session_id(self):
+ return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
+
+ def delete(self, hostname):
+ if not hostname in self._hostnames:
+ return
+
+ self._hostnames.remove(hostname)
+
+ xmpp_node = self._host_session_id(hostname)
+ self._client.delete(xmpp_node)
+
+ def enroll_host(self, hostname):
+ if hostname in self._hostnames:
+ return
+
+ self._hostnames.append(hostname)
+
+ xmpp_node = self._host_session_id(hostname)
+ self._client.create(xmpp_node)
+ self._client.subscribe(xmpp_node)
+
+ xmpp_node = self._host_resource_id(hostname)
+ self._client.subscribe(xmpp_node)
+
+ payload = self._message.enrollfunction("1", "*", "1", hostname)
+ self._client.publish(payload, xmpp_node)
+
+ def configure(self, hostname, attribute, value):
+ payload = self._message.configurefunction(hostname, value, attribute)
+ xmpp_node = self._host_session_id(hostname)
+ self._client.publish(payload, xmpp_node)
+
+ def execute(self, hostname, app_id, arguments, path, env):
+ payload = self._message.executefunction(hostname, app_id, arguments, path, env)
+ xmpp_node = self._host_session_id(hostname)
+ self._client.publish(payload, xmpp_node)
+
+ def exit(self, hostname, app_id):
+ payload = self._message.exitfunction(hostname, app_id)
+ xmpp_node = self._host_session_id(hostname)
+ self._client.publish(payload, xmpp_node)
+
+ def disconnect(self):
+ self._client.delete(self._exp_session_id)
+ self._client.delete(self._logger_session_id)
+
+ for hostname in self._hostnames[:]:
+ self.delete(hostname)
+
+ time.sleep(1)
+ self._client.disconnect()
+
+
+class OMFAPIFactory(object):
+ _Api = dict()
+
+ @classmethod
+ def get_api(cls, slice, host, port, password):
+ if slice and host and port and password:
+ key = cls._hash_api(slice, host, port)
+ if key in cls._Api:
+ return cls._Api[key]
+ else :
+ return cls.create_api(slice, host, port, password)
+ return None
+
+ @classmethod
+ def create_api(cls, slice, host, port, password):
+ OmfApi = OMFAPI(slice, host, port, password)
+ key = cls._hash_api(slice, host, port)
+ cls._Api[key] = OmfApi
+ return OmfApi
+
+ @classmethod
+ def _hash_api(cls, slice, host, port):
+ res = slice + "_" + host + "_" + port
+ return res
+
+
+
+
+
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFApplication(Resource):
+ _rtype = "OMFApplication"
+ _authorized_connections = ["OMFNode"]
+
+ @classmethod
+ def _register_attributes(cls):
+ appid = Attribute("appid", "Name of the application")
+ path = Attribute("path", "Path of the application")
+ args = Attribute("args", "Argument of the application")
+ env = Attribute("env", "Environnement variable of the application")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ cls._register_attribute(appid)
+ cls._register_attribute(path)
+ cls._register_attribute(args)
+ cls._register_attribute(env)
+ cls._register_attribute(xmppSlice)
+ cls._register_attribute(xmppHost)
+ cls._register_attribute(xmppPort)
+ cls._register_attribute(xmppPassword)
+
+
+ def __init__(self, ec, guid, creds):
+ super(OMFApplication, self).__init__(ec, guid)
+ self.set('xmppSlice', creds['xmppSlice'])
+ self.set('xmppHost', creds['xmppHost'])
+ self.set('xmppPort', creds['xmppPort'])
+ self.set('xmppPassword', creds['xmppPassword'])
+
+ self.set('appid', "")
+ self.set('path', "")
+ self.set('args', "")
+ self.set('env', "")
+
+ self._node = None
+
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+ self._logger = logging.getLogger("neco.omf.omfApp ")
+ self._logger.setLevel(neco.LOGLEVEL)
+
+ def _validate_connection(self, guid):
+ rm = self.ec.resource(guid)
+ if rm.rtype() not in self._authorized_connections:
+ self._logger.debug("Connection between %s %s and %s %s refused : An Application can be connected only to a Node" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return False
+ elif len(self.connections) != 0 :
+ self._logger.debug("Connection between %s %s and %s %s refused : Already Connected" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return False
+ else :
+ self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return True
+
+ def _get_nodes(self, conn_set):
+ for elt in conn_set:
+ rm = self.ec.resource(elt)
+ if rm.rtype() == "OMFNode":
+ return rm
+ return None
+
+ def start(self):
+ self._logger.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('appid') + " : " + self.get('path') + " : " + self.get('args') + " : " + self.get('env'))
+ #try:
+ if self.get('appid') and self.get('path') and self.get('args') and self.get('env'):
+ rm_node = self._get_nodes(self._connections)
+ self._omf_api.execute(rm_node.get('hostname'),self.get('appid'), self.get('args'), self.get('path'), self.get('env'))
+
+ def stop(self):
+ rm_node = self._get_nodes(self._connections)
+ self._omf_api.exit(rm_node.get('hostname'),self.get('appid'))
+
+
+
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFChannel(Resource):
+ _rtype = "OMFChannel"
+ _authorized_connections = ["OMFWifiInterface"]
+
+ @classmethod
+ def _register_attributes(cls):
+ channel = Attribute("channel", "Name of the application")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ cls._register_attribute(channel)
+ cls._register_attribute(xmppSlice)
+ cls._register_attribute(xmppHost)
+ cls._register_attribute(xmppPort)
+ cls._register_attribute(xmppPassword)
+
+ def __init__(self, ec, guid, creds):
+ super(OMFChannel, self).__init__(ec, guid)
+ self.set('xmppSlice', creds['xmppSlice'])
+ self.set('xmppHost', creds['xmppHost'])
+ self.set('xmppPort', creds['xmppPort'])
+ self.set('xmppPassword', creds['xmppPassword'])
+
+ self._nodes_guid = list()
+
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+ self._logger = logging.getLogger("neco.omf.omfChannel")
+ self._logger.setLevel(neco.LOGLEVEL)
+
+ def _validate_connection(self, guid):
+ rm = self.ec.resource(guid)
+ if rm.rtype() in self._authorized_connections:
+ self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return True
+ self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return False
+
+ def _get_nodes(self, conn_set):
+ for elt in conn_set:
+ rm_iface = self.ec.resource(elt)
+ for conn in rm_iface._connections:
+ rm_node = self.ec.resource(conn)
+ if rm_node.rtype() == "OMFNode":
+ couple = [rm_node.get('hostname'), rm_iface.get('alias')]
+ #print couple
+ self._nodes_guid.append(couple)
+ return self._nodes_guid
+
+ def discover(self):
+ pass
+
+ def provision(self, credential):
+ pass
+
+ def start(self):
+ if self.get('channel'):
+ set_nodes = self._get_nodes(self._connections)
+ #print set_nodes
+ for couple in set_nodes:
+ #print "Couple node/alias : " + couple[0] + " , " + couple[1]
+ attrval = self.get('channel')
+ attrname = "net/%s/%s" % (couple[1], 'channel')
+ #print "Send the configure message"
+ self._omf_api.configure(couple[0], attrname, attrval)
+
+ def xstart(self):
+ try:
+ if self.get('channel'):
+ node = self.tc.elements.get(self._node_guid)
+ attrval = self.get('channel')
+ attrname = "net/%s/%s" % (self._alias, 'channel')
+ self._omf_api.configure('omf.plexus.wlab17', attrname, attrval)
+ except AttributeError:
+ # If the attribute is not yet defined, ignore the error
+ pass
+
+
--- /dev/null
+import logging
+import sleekxmpp
+from sleekxmpp.exceptions import IqError, IqTimeout
+import traceback
+import xml.etree.ElementTree as ET
+
+import neco
+
+class OMFClient(sleekxmpp.ClientXMPP):
+ def __init__(self, jid, password):
+ sleekxmpp.ClientXMPP.__init__(self, jid, password)
+ self._ready = False
+ self._registered = False
+ self._server = None
+
+ self.register_plugin('xep_0077') # In-band registration
+ self.register_plugin('xep_0030')
+ self.register_plugin('xep_0059')
+ self.register_plugin('xep_0060') # PubSub
+
+ self.add_event_handler("session_start", self.start)
+ self.add_event_handler("register", self.register)
+ self.add_event_handler("pubsub_publish", self.handle_omf_message)
+
+ self._logger = logging.getLogger("neco.omf.xmppClient")
+ self._logger.setLevel(neco.LOGLEVEL)
+
+ @property
+ def ready(self):
+ return self._ready
+
+ def start(self, event):
+ self.send_presence()
+ self._ready = True
+ self._server = "pubsub.%s" % self.boundjid.domain
+
+ def register(self, iq):
+ if self._registered:
+ self._logger.info("%s already registered!" % self.boundjid)
+ return
+
+ resp = self.Iq()
+ resp['type'] = 'set'
+ resp['register']['username'] = self.boundjid.user
+ resp['register']['password'] = self.password
+
+ try:
+ resp.send(now=True)
+ self._logger.info("Account created for %s!" % self.boundjid)
+ self._registered = True
+ except IqError as e:
+ self._logger.error("Could not register account: %s" %
+ e.iq['error']['text'])
+ except IqTimeout:
+ self._logger.error("No response from server.")
+
+ def unregister(self):
+ try:
+ self.plugin['xep_0077'].cancel_registration(
+ ifrom=self.boundjid.full)
+ self._logger.info("Account unregistered for %s!" % self.boundjid)
+ except IqError as e:
+ self._logger.error("Could not unregister account: %s" %
+ e.iq['error']['text'])
+ except IqTimeout:
+ self._logger.error("No response from server.")
+
+ def nodes(self):
+ try:
+ result = self['xep_0060'].get_nodes(self._server)
+ for item in result['disco_items']['items']:
+ self._logger.info(' - %s' % str(item))
+ return result
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not retrieve node list.\ntraceback:\n%s', error)
+
+ def subscriptions(self):
+ try:
+ result = self['xep_0060'].get_subscriptions(self._server)
+ #self.boundjid.full)
+ for node in result['node']:
+ self._logger.info(' - %s' % str(node))
+ return result
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not retrieve subscriptions.\ntraceback:\n%s', error)
+
+ def create(self, node):
+ self._logger.debug(" Create Topic : " + node)
+
+ config = self['xep_0004'].makeForm('submit')
+ config.add_field(var='pubsub#node_type', value='leaf')
+ config.add_field(var='pubsub#notify_retract', value='0')
+ config.add_field(var='pubsub#publish_model', value='open')
+ config.add_field(var='pubsub#persist_items', value='1')
+ config.add_field(var='pubsub#max_items', value='1')
+ config.add_field(var='pubsub#title', value=node)
+
+ try:
+ self['xep_0060'].create_node(self._server, node, config = config)
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not create topic: %s\ntraceback:\n%s' % (node, error))
+
+ def delete(self, node):
+ try:
+ self['xep_0060'].delete_node(self._server, node)
+ self._logger.info('Deleted node: %s' % node)
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not delete topic: %s\ntraceback:\n%s' % (node, error))
+
+ def publish(self, data, node):
+ self._logger.debug(" Publish to Topic :" + node)
+ try:
+ result = self['xep_0060'].publish(self._server,node,payload=data)
+ # id = result['pubsub']['publish']['item']['id']
+ # print('Published at item id: %s' % id)
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not publish to: %s\ntraceback:\n%s' \
+ % (node, error))
+
+ def get(self, data):
+ try:
+ result = self['xep_0060'].get_item(self._server, self.boundjid,
+ data)
+ for item in result['pubsub']['items']['substanzas']:
+ self._logger.info('Retrieved item %s: %s' % (item['id'],
+ tostring(item['payload'])))
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not retrieve item %s from topic %s\ntraceback:\n%s' \
+ % (data, self.boundjid, error))
+
+ def retract(self, data):
+ try:
+ result = self['xep_0060'].retract(self._server, self.boundjid, data)
+ self._logger.info('Retracted item %s from topic %s' % (data, self.boundjid))
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not retract item %s from topic %s\ntraceback:\n%s' \
+ % (data, self.boundjid, error))
+
+ def purge(self):
+ try:
+ result = self['xep_0060'].purge(self._server, self.boundjid)
+ self._logger.info('Purged all items from topic %s' % self.boundjid)
+ except:
+ error = traceback.format_exc()
+ self._logger.error('Could not purge items from topic %s\ntraceback:\n%s' \
+ % (self.boundjid, error))
+
+ def subscribe(self, node):
+ try:
+ result = self['xep_0060'].subscribe(self._server, node)
+ #self._logger.debug('Subscribed %s to node %s' \
+ #% (self.boundjid.bare, node))
+ self._logger.info(' Subscribed %s to topic %s' \
+ % (self.boundjid.user, node))
+ except:
+ error = traceback.format_exc()
+ self._logger.error(' Could not subscribe %s to topic %s\ntraceback:\n%s' \
+ % (self.boundjid.bare, node, error))
+
+ def unsubscribe(self, node):
+ try:
+ result = self['xep_0060'].unsubscribe(self._server, node)
+ self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node))
+ except:
+ error = traceback.format_exc()
+ self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
+ % (self.boundjid.bare, node, error))
+
+ def _check_for_tag(self, treeroot, namespaces, tag):
+ for element in treeroot.iter(namespaces+tag):
+ if element.text:
+ return element
+ else :
+ return None
+
+ def _check_output(self, treeroot, namespaces):
+ output_param = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
+ response = ""
+ for elt in output_param:
+ msg = self._check_for_tag(treeroot, namespaces, elt)
+ if msg is not None:
+ response = response + " " + msg.text + " :"
+ deb = self._check_for_tag(treeroot, namespaces, "MESSAGE")
+ if deb is not None:
+ self._logger.debug(response + " " + deb.text)
+ else :
+ self._logger.info(response)
+
+ def handle_omf_message(self, iq):
+ namespaces = "{http://jabber.org/protocol/pubsub}"
+ for i in iq['pubsub_event']['items']:
+ root = ET.fromstring(str(i))
+ self._check_output(root, namespaces)
+
+
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFWifiInterface(Resource):
+ _rtype = "OMFWifiInterface"
+ _authorized_connections = ["OMFNode" , "OMFChannel"]
+
+ #alias2name = dict({'w0':'wlan0', 'w1':'wlan1'})
+
+ @classmethod
+ def _register_attributes(cls):
+ alias = Attribute("alias","Alias of the interface", default_value = "w0")
+ mode = Attribute("mode","Mode of the interface")
+ type = Attribute("type","Type of the interface")
+ essid = Attribute("essid","Essid of the interface")
+ ip = Attribute("ip","IP of the interface")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ cls._register_attribute(alias)
+ cls._register_attribute(xmppSlice)
+ cls._register_attribute(xmppHost)
+ cls._register_attribute(xmppPort)
+ cls._register_attribute(xmppPassword)
+ cls._register_attribute(mode)
+ cls._register_attribute(type)
+ cls._register_attribute(essid)
+ cls._register_attribute(ip)
+
+ def __init__(self, ec, guid, creds):
+ super(OMFWifiInterface, self).__init__(ec, guid)
+ self.set('xmppSlice', creds['xmppSlice'])
+ self.set('xmppHost', creds['xmppHost'])
+ self.set('xmppPort', creds['xmppPort'])
+ self.set('xmppPassword', creds['xmppPassword'])
+
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+ self._alias = self.get('alias')
+
+ self._logger = logging.getLogger("neco.omf.omfIface ")
+ self._logger.setLevel(neco.LOGLEVEL)
+
+ def _validate_connection(self, guid):
+ rm = self.ec.resource(guid)
+ if rm.rtype() in self._authorized_connections:
+ self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return True
+ self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return False
+
+ def _get_nodes(self, conn_set):
+ for elt in conn_set:
+ rm = self.ec.resource(elt)
+ if rm.rtype() == "OMFNode":
+ return rm
+ return None
+
+
+ def start(self):
+ self._logger.debug(self.rtype() + " ( Guid : " + str(self._guid) +") : " + self.get('mode') + " : " + self.get('type') + " : " + self.get('essid') + " : " + self.get('ip'))
+ #try:
+ if self.get('mode') and self.get('type') and self.get('essid') and self.get('ip'):
+ rm_node = self._get_nodes(self._connections)
+ for attrname in ["mode", "type", "essid", "ip"]:
+ attrval = self.get(attrname)
+ attrname = "net/%s/%s" % (self._alias, attrname)
+ #print "Send the configure message"
+ self._omf_api.configure(rm_node.get('hostname'), attrname, attrval)
+
+ def stop(self):
+ self._omf_api.disconnect()
+
+
+
--- /dev/null
+from xml.etree import cElementTree as ET
+
+EXECUTE = "EXECUTE"
+KILL = "KILL"
+STDIN = "STDIN"
+NOOP = "NOOP"
+PM_INSTALL = "PM_INSTALL"
+APT_INSTALL = "APT_INSTALL"
+RPM_INSTALL = "RPM_INSTALL"
+RESET = "RESET"
+REBOOT = "REBOOT"
+MODPROBE = "MODPROBE"
+CONFIGURE = "CONFIGURE"
+LOAD_IMAGE = "LOAD_IMAGE"
+SAVE_IMAGE = "SAVE_IMAGE"
+LOAD_DATA = "LOAD_DATA"
+SET_LINK = "SET_LINK"
+ALIAS = "ALIAS"
+SET_DISCONNECTION = "SET_DISCONNECTION"
+RESTART = "RESTART"
+ENROLL = "ENROLL"
+EXIT = "EXIT"
+
+class MessageHandler():
+ SliceID = ""
+ ExpID = ""
+
+ def __init__(self, sliceid, expid ):
+ self.SliceID = sliceid
+ self.ExpID = expid
+ print "init" + self.ExpID +" "+ self.SliceID
+ pass
+
+ def Mid(self, parent, keyword):
+ mid = ET.SubElement(parent, keyword)
+ mid.set("id", "\'omf-payload\'")
+ return mid
+
+ def Mtext(self, parent, keyword, text):
+ mtext = ET.SubElement(parent, keyword)
+ mtext.text = text
+ return mtext
+
+ def executefunction(self, target, appid, cmdlineargs, path, env):
+ payload = ET.Element("omf-message")
+ execute = self.Mid(payload,"EXECUTE")
+ env = self.Mtext(execute, "ENV", env)
+ sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
+ expid = self.Mtext(execute,"EXPID",self.ExpID)
+ target = self.Mtext(execute,"TARGET",target)
+ appid = self.Mtext(execute,"APPID",appid)
+ cmdlineargs = self.Mtext(execute,"CMDLINEARGS",cmdlineargs)
+ path = self.Mtext(execute,"PATH",path)
+ return payload
+
+ def exitfunction(self, target, appid):
+ payload = ET.Element("omf-message")
+ execute = self.Mid(payload,"EXIT")
+ sliceid = self.Mtext(execute,"SLICEID",self.SliceID)
+ expid = self.Mtext(execute,"EXPID",self.ExpID)
+ target = self.Mtext(execute,"TARGET",target)
+ appid = self.Mtext(execute,"APPID",appid)
+ return payload
+
+ def configurefunction(self, target, value, path):
+ payload = ET.Element("omf-message")
+ config = self.Mid(payload, "CONFIGURE")
+ sliceid = self.Mtext(config,"SLICEID",self.SliceID)
+ expid = self.Mtext(config,"EXPID",self.ExpID)
+ target = self.Mtext(config,"TARGET",target)
+ value = self.Mtext(config,"VALUE",value)
+ path = self.Mtext(config,"PATH",path)
+ return payload
+
+ def logfunction(self,level, logger, level_name, data):
+ payload = ET.Element("omf-message")
+ log = self.Mid(payload, "LOGGING")
+ level = self.Mtext(log,"LEVEL",level)
+ sliceid = self.Mtext(log,"SLICEID",self.SliceID)
+ logger = self.Mtext(log,"LOGGER",logger)
+ expid = self.Mtext(log,"EXPID",self.ExpID)
+ level_name = self.Mtext(log,"LEVEL_NAME",level_name)
+ data = self.Mtext(log,"DATA",data)
+ return payload
+
+ def aliasfunction(self, name, target):
+ payload = ET.Element("omf-message")
+ alias = self.Mid(payload,"ALIAS")
+ sliceid = self.Mtext(alias,"SLICEID",self.SliceID)
+ expid = self.Mtext(alias,"EXPID",self.ExpID)
+ name = self.Mtext(alias,"NAME",name)
+ target = self.Mtext(alias,"TARGET",target)
+ return payload
+
+ def enrollfunction(self, enrollkey, image, index, target ):
+ payload = ET.Element("omf-message")
+ enroll = self.Mid(payload,"ENROLL")
+ enrollkey = self.Mtext(enroll,"ENROLLKEY",enrollkey)
+ sliceid = self.Mtext(enroll,"SLICEID",self.SliceID)
+ image = self.Mtext(enroll,"IMAGE",image)
+ expid = self.Mtext(enroll,"EXPID",self.ExpID)
+ index = self.Mtext(enroll,"INDEX",index)
+ target = self.Mtext(enroll,"TARGET",target)
+ return payload
+
+ def noopfunction(self,target):
+ payload = ET.Element("omf-message")
+ noop = self.Mid(payload,"NOOP")
+ sliceid = self.Mtext(noop,"SLICEID",self.SliceID)
+ expid = self.Mtext(noop,"EXPID",self.ExpID)
+ target = self.Mtext(noop,"TARGET",target)
+ return payload
+
+ def newexpfunction(self, experimentid, address):
+ payload = ET.Element("omf-message")
+ newexp = self.Mid(payload,"EXPERIMENT_NEW")
+ experimentid = self.Mtext(newexp,"EXPERIMENT_ID",experimentid)
+ sliceid = self.Mtext(newexp,"SLICEID",self.SliceID)
+ expid = self.Mtext(newexp,"EXPID",self.ExpID)
+ address = self.Mtext(newexp,"ADDRESS",address)
+ return payload
+
+ def handle_message(self, msg):
+ # Do something!!!
+ return msg
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+import neco
+import logging
+
+@clsinit
+class OMFNode(Resource):
+ _rtype = "OMFNode"
+ _authorized_connections = ["OMFApplication" , "OMFWifiInterface"]
+
+ @classmethod
+ def _register_attributes(cls):
+ hostname = Attribute("hostname", "Hostname of the machine")
+ cpu = Attribute("cpu", "CPU of the node")
+ ram = Attribute("ram", "RAM of the node")
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ cls._register_attribute(hostname)
+ cls._register_attribute(ram)
+ cls._register_attribute(cpu)
+ cls._register_attribute(xmppSlice)
+ cls._register_attribute(xmppHost)
+ cls._register_attribute(xmppPort)
+ cls._register_attribute(xmppPassword)
+
+ @classmethod
+ def _register_filters(cls):
+ hostname = Attribute("hostname", "Hostname of the machine")
+ gateway = Attribute("gateway", "Gateway")
+ granularity = Attribute("granularity", "Granularity of the reservation time")
+ hardware_type = Attribute("hardware_type", "Hardware type of the machine")
+ cls._register_filter(hostname)
+ cls._register_filter(gateway)
+ cls._register_filter(granularity)
+ cls._register_filter(hardware_type)
+
+ def __init__(self, ec, guid, creds):
+ super(OMFNode, self).__init__(ec, guid)
+ self.set('xmppSlice', creds['xmppSlice'])
+ self.set('xmppHost', creds['xmppHost'])
+ self.set('xmppPort', creds['xmppPort'])
+ self.set('xmppPassword', creds['xmppPassword'])
+
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+ self._logger = logging.getLogger("neco.omf.omfNode ")
+ self._logger.setLevel(neco.LOGLEVEL)
+
+ def _validate_connection(self, guid):
+ rm = self.ec.resource(guid)
+ if rm.rtype() in self._authorized_connections:
+ self._logger.debug("Connection between %s %s and %s %s accepted" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return True
+ self._logger.debug("Connection between %s %s and %s %s refused" % (self.rtype(), self._guid, rm.rtype(), guid))
+ return False
+
+ def discover(self):
+ pass
+
+ def provision(self, credential):
+ pass
+
+ def start(self):
+ self._omf_api.enroll_host(self.get('hostname'))
+
+ def stop(self):
+ self._omf_api.disconnect()
+
+ def configure(self):
+ #routes = self.tc._add_route.get(self.guid, [])
+ #iface_guids = self.tc.get_connected(self.guid, "devs", "node")
+
+ for route in routes:
+ (destination, netprefix, nexthop, metric, device) = route
+ netmask = ipaddr2.ipv4_mask2dot(netprefix)
+
+ # Validate that the interface is associated to the node
+ for iface_guid in iface_guids:
+ iface = self.tc.elements.get(iface_guid)
+ if iface.devname == device:
+ self._omf_api.execute(self.get('hostname'),
+ "Id#%s" % str(random.getrandbits(128)),
+ "add -net %s netmask %s dev %s" % (destination, netmask, iface.devname),
+ "/sbin/route", # path
+ None, # env
+ )
+ break
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import Resource, clsinit
+from neco.execution.attribute import Attribute
+
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+@clsinit
+class OMFResource(Resource):
+ _rtype = "OMFResource"
+
+ @classmethod
+ def _register_attributes(cls):
+ xmppSlice = Attribute("xmppSlice","Name of the slice", flags = "0x02")
+ xmppHost = Attribute("xmppHost", "Xmpp Server",flags = "0x02")
+ xmppPort = Attribute("xmppPort", "Xmpp Port",flags = "0x02")
+ xmppPassword = Attribute("xmppPassword", "Xmpp Port",flags = "0x02")
+ cls._register_attribute(xmppSlice)
+ cls._register_attribute(xmppHost)
+ cls._register_attribute(xmppPort)
+ cls._register_attribute(xmppPassword)
+
+ def __init__(self, ec, guid, creds):
+ super(OMFNode, self).__init__(ec, guid)
+ self.set('xmppSlice', creds['xmppSlice'])
+ self.set('xmppHost', creds['xmppHost'])
+ self.set('xmppPort', creds['xmppPort'])
+ self.set('xmppPassword', creds['xmppPassword'])
+
+ self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), self.get('xmppHost'), self.get('xmppPort'), self.get('xmppPassword'))
+
+ def discover(self):
+ pass
+
+ def provision(self, credential):
+ pass
+
+
self.assertEquals(AnotherResource.rtype(), "AnotherResource")
self.assertEquals(len(AnotherResource._attributes), 0)
+ #self.assertEquals(OmfNode.rtype(), "OmfNode")
+ #self.assertEquals(len(OmfNode._attributes), 0)
+
self.assertEquals(len(ResourceFactory.resource_types()), 2)
if __name__ == '__main__':
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.resource import Resource, ResourceFactory
+from neco.execution.ec import ExperimentController
+
+from neco.resources.omf.omf_node import OMFNode
+from neco.resources.omf.omf_application import OMFApplication
+from neco.resources.omf.omf_interface import OMFWifiInterface
+from neco.resources.omf.omf_channel import OMFChannel
+from neco.resources.omf.omf_api import OMFAPIFactory
+
+from neco.util import guid
+
+import time
+import unittest
+import logging
+
+logging.basicConfig()
+
+
+class DummyEC(ExperimentController):
+ pass
+
+class OMFVLCTestCase(unittest.TestCase):
+
+ def setUp(self):
+ #self.guid_generator = guid.GuidGenerator()
+ self._creds = {'xmppSlice' : 'nepi' , 'xmppHost' : 'xmpp-plexus.onelab.eu' , 'xmppPort' : '5222', 'xmppPassword' : '1234' }
+
+ def tearDown(self):
+ pass
+
+ def test_creation_phase(self):
+ ec = DummyEC()
+
+ ResourceFactory.register_type(OMFNode)
+ ResourceFactory.register_type(OMFWifiInterface)
+ ResourceFactory.register_type(OMFChannel)
+ ResourceFactory.register_type(OMFApplication)
+
+ self.assertEquals(OMFNode.rtype(), "OMFNode")
+ self.assertEquals(len(OMFNode._attributes), 7)
+
+ self.assertEquals(OMFWifiInterface.rtype(), "OMFWifiInterface")
+ self.assertEquals(len(OMFWifiInterface._attributes), 9)
+
+ self.assertEquals(OMFChannel.rtype(), "OMFChannel")
+ self.assertEquals(len(OMFChannel._attributes), 5)
+
+ self.assertEquals(OMFApplication.rtype(), "OMFApplication")
+ self.assertEquals(len(OMFApplication._attributes), 8)
+
+ self.assertEquals(len(ResourceFactory.resource_types()), 4)
+
+ #def xtest_creation_and_configuration_node(self):
+ guid = ec.register_resource("OMFNode", creds = self._creds)
+ node1 = ec._resources[guid]
+ node1.set('hostname', 'omf.plexus.wlab17')
+
+ guid = ec.register_resource("OMFNode", creds = self._creds)
+ node2 = ec._resources[guid]
+ node2.set('hostname', "omf.plexus.wlab37")
+
+ #def xtest_creation_and_configuration_interface(self):
+ guid = ec.register_resource("OMFWifiInterface", creds = self._creds)
+ iface1 = ec._resources[guid]
+ iface1.set('alias', "w0")
+ iface1.set('mode', "adhoc")
+ iface1.set('type', "g")
+ iface1.set('essid', "helloworld")
+ iface1.set('ip', "10.0.0.17")
+
+ guid = ec.register_resource("OMFWifiInterface", creds = self._creds)
+ iface2 = ec._resources[guid]
+ iface2.set('alias', "w0")
+ iface2.set('mode', "adhoc")
+ iface2.set('type', 'g')
+ iface2.set('essid', "helloworld")
+ iface2.set('ip', "10.0.0.37")
+
+ #def xtest_creation_and_configuration_channel(self):
+ guid = ec.register_resource("OMFChannel", creds = self._creds)
+ channel = ec._resources[guid]
+ channel.set('channel', "6")
+
+ #def xtest_creation_and_configuration_application(self):
+ guid = ec.register_resource("OMFApplication", creds = self._creds)
+ app1 = ec._resources[guid]
+ app1.set('appid', 'Vlc#1')
+ app1.set('path', "/opt/vlc-1.1.13/cvlc")
+ app1.set('args', "/opt/10-by-p0d.avi --sout '#rtp{dst=10.0.0.37,port=1234,mux=ts}'")
+ app1.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+
+ guid = ec.register_resource("OMFApplication", creds = self._creds)
+ app2 = ec._resources[guid]
+ app2.set('appid', 'Vlc#2')
+ app2.set('path', "/opt/vlc-1.1.13/cvlc")
+ app2.set('args', "rtp://10.0.0.37:1234")
+ app2.set('env', "DISPLAY=localhost:10.0 XAUTHORITY=/root/.Xauthority")
+ self.assertEquals(len(OMFAPIFactory._Api), 1)
+
+ #def test_connection(self):
+ app1.connect(node1._guid)
+ node1.connect(app1._guid)
+
+ node1.connect(iface1._guid)
+ iface1.connect(node1._guid)
+
+ iface1.connect(channel._guid)
+ channel.connect(iface1._guid)
+
+ channel.connect(iface2._guid)
+ iface2.connect(channel._guid)
+
+ iface2.connect(node2._guid)
+ node2.connect(iface2._guid)
+
+ node2.connect(app2._guid)
+ app2.connect(node2._guid)
+
+ #def test_start_node(self):
+ node1.start()
+ node2.start()
+ time.sleep(1)
+ #pass
+
+ #def test_start_interface(self):
+ iface1.start()
+ iface2.start()
+
+ #def test_start_channel(self):
+ channel.start()
+ time.sleep(1)
+
+ #def test_start_application(self):
+ app1.start()
+ time.sleep(2)
+ app2.start()
+
+ time.sleep(10)
+
+ #def test_stop_application(self):
+ app1.stop()
+ app2.stop()
+ time.sleep(2)
+
+
+ #def test_stop_nodes(self):
+ node1.stop()
+ #node2.stop()
+
+
+if __name__ == '__main__':
+ unittest.main()
+