From: Barış Metin
Date: Mon, 8 Mar 2010 15:08:18 +0000 (+0000)
Subject: OMF integration for plcapi, only activated when PLC_OMF_ENABLED is true.
X-Git-Tag: PLCAPI-5.0-3^2~30
X-Git-Url: http://git.onelab.eu/?p=plcapi.git;a=commitdiff_plain;h=1cda98c34563ed3e90a9d2219730ae81f9705dc6

OMF integration for plcapi, only activated when PLC_OMF_ENABLED is true.

omfaspects.py wraps PLCAPI method calls and manages XMPP PubSub groups
for slices and for the nodes added to them.  (Sketches of the weaving,
the hook mapping, the pubsub naming scheme, and the XML-RPC interface
follow the relevant diffs below.)
---
diff --git a/PLC/API.py b/PLC/API.py
index 9d61ed9..55a0349 100644
--- a/PLC/API.py
+++ b/PLC/API.py
@@ -126,10 +126,16 @@ class PLCAPI:
         if self.config.PLC_DB_TYPE == "postgresql":
             from PLC.PostgreSQL import PostgreSQL
             self.db = PostgreSQL(self)
-
         else:
             raise PLCAPIError, "Unsupported database type " + self.config.PLC_DB_TYPE
 
+        # Aspects modify the API by injecting code before/after method
+        # calls.  As of now the only aspects are for OMF integration,
+        # so aspects are enabled only if PLC_OMF_ENABLED is set to true.
+        if self.config.PLC_OMF_ENABLED:
+            from aspects import apply_aspects; apply_aspects()
+
+
     def callable(self, method):
         """
         Return a new instance of the specified method.
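
(A sketch, not part of the commit, of the weaving this relies on.
TraceAspect and Widget are made-up names; the pyaspects imports and the
weave_class_method() call mirror aspects/__init__.py below, and the
sketch assumes pyaspects passes the wrapped instance as the wobj
argument, which is what the aspect code relies on for wobj.name and
wobj.api.)

    from pyaspects.meta import MetaAspect
    from pyaspects.weaver import weave_class_method

    class TraceAspect(object):
        __metaclass__ = MetaAspect
        name = "traceaspect"

        # called around every invocation of the woven method
        def before(self, wobj, data, *args, **kwargs):
            print "before %s" % wobj.name

        def after(self, wobj, data, *args, **kwargs):
            print "after %s" % wobj.name

    class Widget(object):
        name = "AddSlice"
        def __call__(self):
            print "call body"

    weave_class_method(TraceAspect(), Widget, "__call__")
    Widget()()  # -> before AddSlice / call body / after AddSlice
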
diff --git a/PLCAPI.spec b/PLCAPI.spec
index 081c5d3..b3a28b1 100644
--- a/PLCAPI.spec
+++ b/PLCAPI.spec
@@ -38,6 +38,10 @@ Requires: mod_ssl
 Requires: SOAPpy
 # for the RebootNodeWithPCU method
 Requires: pcucontrol
+# for OMF integration
+Requires: pyaspects
+Requires: python-twisted-words
+Requires: python-twisted-web
 
 ### avoid having yum complain about updates, as stuff is moving around
 # plc.d/api
diff --git a/aspects/__init__.py b/aspects/__init__.py
new file mode 100644
index 0000000..bcce3bc
--- /dev/null
+++ b/aspects/__init__.py
@@ -0,0 +1,16 @@
+
+
+from pyaspects.weaver import weave_class_method
+
+from PLC.Method import Method
+from aspects.omfaspects import OMFAspect
+
+
+def apply_aspects():
+
+    # TEST
+    #from pyaspects.debuggeraspect import DebuggerAspect
+    #weave_class_method(DebuggerAspect(out=open("/tmp/all_method_calls.log", "a")), Method, "__call__")
+
+    # track all PLC methods to add OMF hooks
+    weave_class_method(OMFAspect(), Method, "__call__")
diff --git a/aspects/omfaspects.py b/aspects/omfaspects.py
new file mode 100644
index 0000000..b0882a3
--- /dev/null
+++ b/aspects/omfaspects.py
@@ -0,0 +1,184 @@
+# Baris Metin
+
+import os
+import xmlrpclib
+
+from PLC.Slices import Slices
+from PLC.SliceTags import SliceTags
+from PLC.Nodes import Nodes
+from PLC.Config import Config
+from pyaspects.meta import MetaAspect
+
+
+class BaseOMF(object):
+
+    def __init__(self):
+        self.config = Config("/etc/planetlab/plc_config")
+        self.log = open("/var/log/omf/plc_slice_calls.log", "a")
+
+    def logit(self, call, args, kwargs, data, slice):
+        self.log.write("%s : args: %s kwargs: %s\n" % (call, args, kwargs))
+        self.log.write("data: %s\n" % data)
+        self.log.write("%s\n\n" % slice)
+        self.log.flush()
+
+    def get_slice(self, api, id_or_name):
+        slice_filter = {}
+        if isinstance(id_or_name, str):
+            slice_filter['name'] = id_or_name
+        else:
+            slice_filter['slice_id'] = id_or_name
+        slice = Slices(api, slice_filter = slice_filter)[0]
+# don't bother to check for slice tags for the moment; we simply
+# create XMPP pubsub groups for all slices
+#
+#         slice_tags = SliceTags(api, slice_tag_filter = { 'slice_id': slice['slice_id'] })
+#         omf_tag = [tag for tag in slice_tags if tag['name'] == 'omf']
+#         if omf_tag and omf_tag[0]['value'] not in ('false', '0', 'no'):
+#             # OK, slice has the "omf" tag set.
+#             return slice
+#         return None
+        return slice
+
+    def get_node_hostname(self, api, node_id):
+        node_filter = {'node_id': node_id}
+        try:
+            node = Nodes(api, node_filter = node_filter)[0]
+            return node['hostname']
+        except IndexError:
+            return None
+
+    def get_slice_tags(self, api, slice_id):
+        return SliceTags(api, slice_tag_filter = {'slice_id': slice_id})
+
+    # no-op hooks; the concrete aspects below override these
+    def create_slice(self, slice):
+        pass
+
+    def add_resource(self, slice, resource):
+        pass
+
+    def delete_slice(self, slice):
+        pass
+
+    def delete_resource(self, slice, resource):
+        pass
+
+    # aspect method
+    def before(self, wobj, data, *args, **kwargs):
+        api_method_name = wobj.name
+        slice_name_or_id = None
+
+        # DeleteSlice has to be handled before the actual method call;
+        # after the call we can no longer access the slice.
+        if api_method_name == "DeleteSlice":
+            slice_name_or_id = args[1]
+        else: # ignore the rest
+            return
+
+        slice = self.get_slice(wobj.api, slice_name_or_id)
+        if not slice:
+            return
+
+        if api_method_name == "DeleteSlice":
+            self.delete_slice(slice['name'])
+
+        self.logit(wobj.name, args, kwargs, data, slice)
+
+    # aspect method
+    def after(self, wobj, data, *args, **kwargs):
+        api_method_name = wobj.name
+        slice_name_or_id = None
+        node_ids = None
+        if api_method_name == "AddSlice":
+            slice_name_or_id = args[1]['name']
+        elif api_method_name in ("AddSliceToNodes", "DeleteSliceFromNodes"):
+            slice_name_or_id = args[1]
+            node_ids = args[2]
+        else: # ignore the rest
+            #self.logit(wobj.name, args, kwargs, data, "SLICE")
+            return
+
+        slice = self.get_slice(wobj.api, slice_name_or_id)
+        if not slice:
+            return
+
+        if api_method_name == "AddSlice":
+            self.create_slice(slice['name'])
+        elif api_method_name == "AddSliceToNodes":
+            for node_id in node_ids:
+                node_hostname = self.get_node_hostname(wobj.api, node_id)
+                self.add_resource(slice['name'], node_hostname)
+        elif api_method_name == "DeleteSliceFromNodes":
+            for node_id in node_ids:
+                node_hostname = self.get_node_hostname(wobj.api, node_id)
+                self.delete_resource(slice['name'], node_hostname)
+
+        self.logit(wobj.name, args, kwargs, data, slice)
+
+
+class OMFAspect_xmpp(BaseOMF):
+    __metaclass__ = MetaAspect
+    name = "omfaspect_xmpp"
+
+    def __init__(self):
+        BaseOMF.__init__(self)
+        xmppserver = self.config.PLC_OMF_XMPP_SERVER
+        xmppuser = "@".join([self.config.PLC_OMF_XMPP_USER, xmppserver])
+        xmpppass = self.config.PLC_OMF_XMPP_PASSWORD
+        # NOTE: Slicemgr lives in omf/omf-slicemgr.py and is not
+        # importable from here as-is; this variant is unused, the
+        # XML-RPC one below is the one wired up (see the bottom of
+        # this file).
+        self.slicemgr = Slicemgr(xmppuser, xmpppass,
+                                 log=open("/var/log/omf/pubsub_client.log", "a"),
+                                 verbose=True)
+
+    def create_slice(self, slice):
+        self.slicemgr.create_slice(slice)
+
+    def add_resource(self, slice, resource):
+        self.slicemgr.add_resource(slice, resource)
+
+    def delete_slice(self, slice):
+        self.slicemgr.delete_slice(slice)
+
+    def delete_resource(self, slice, resource):
+        self.slicemgr.delete_resource(slice, resource)
+
+    def before(self, wobj, data, *args, **kwargs):
+        BaseOMF.before(self, wobj, data, *args, **kwargs)
+
+    def after(self, wobj, data, *args, **kwargs):
+        BaseOMF.after(self, wobj, data, *args, **kwargs)
+
+
+class OMFAspect_xmlrpc(BaseOMF):
+    __metaclass__ = MetaAspect
+    name = "omfaspect_xmlrpc"
+
+    def __init__(self):
+        BaseOMF.__init__(self)
+        slicemgr_url = self.config.PLC_OMF_SLICEMGR_URL
+        self.server = xmlrpclib.ServerProxy(slicemgr_url)
+
+    def create_slice(self, slice):
+        self.server.slicemgr.createSlice(slice)
+
+    def add_resource(self, slice, resource):
+        self.server.slicemgr.addResource(slice, resource)
+
+    def delete_slice(self, slice):
+        self.server.slicemgr.deleteSlice(slice)
+
+    def delete_resource(self, slice, resource):
+        self.server.slicemgr.removeResource(slice, resource)
+
+    def before(self, wobj, data, *args, **kwargs):
+        BaseOMF.before(self, wobj, data, *args, **kwargs)
+
+    def after(self, wobj, data, *args, **kwargs):
+        BaseOMF.after(self, wobj, data, *args, **kwargs)
+
+
+# the XML-RPC variant is the one in use
+OMFAspect = OMFAspect_xmlrpc
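
(For reference, a sketch -- not part of the commit -- of the mapping
implemented by BaseOMF.before/after above: only four PLCAPI methods are
acted upon, everything else falls through untouched.  DeleteSlice is
handled in before() because the slice is no longer accessible once the
call has run.)

    # PLCAPI method          ->  OMF hook invoked by BaseOMF
    mapping = [
        ("AddSlice",             "create_slice(name)          # in after()"),
        ("AddSliceToNodes",      "add_resource(name, host)    # in after(), per node"),
        ("DeleteSliceFromNodes", "delete_resource(name, host) # in after(), per node"),
        ("DeleteSlice",          "delete_slice(name)          # in before()"),
    ]
    for method, hook in mapping:
        print "%-22s -> %s" % (method, hook)
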
diff --git a/db-config.d/010-slice_tags b/db-config.d/010-slice_tags
index 850dce5..20a1e3c 100644
--- a/db-config.d/010-slice_tags
+++ b/db-config.d/010-slice_tags
@@ -153,6 +153,12 @@ slicetag_types = \
      'description': "Is a default Distributed Rate Limiting slice (1) or not (0 or unset)",
      'category' : 'slice/general',
      'min_role_id': 10},
+
+    # OMF controlled slice
+    {'tagname': "omf",
+     'description': "Enable OMF (cOntrol and Management Framework) for the slice",
+     'category' : 'slice/general',
+     'min_role_id': 10},
     ]
 
 # add in the platform supported rlimits to the default_attribute_types
diff --git a/omf/omf-slicemgr.py b/omf/omf-slicemgr.py
new file mode 100644
index 0000000..310e656
--- /dev/null
+++ b/omf/omf-slicemgr.py
@@ -0,0 +1,270 @@
+#!/usr/bin/python
+# Baris Metin
+
+import os
+import sys
+import Queue
+from twisted.words.xish import domish
+from twisted.web import xmlrpc, server
+from twisted.internet import reactor, task
+from twisted.words.protocols.jabber import xmlstream, client, jid
+
+sys.path.append("/usr/share/plc_api/")
+from PLC.Config import Config
+
+
+class BaseClient(object):
+    """ Base XMPP client: handles authentication and basic presence/message requests. """
+
+    def __init__(self, id, secret, verbose = False, log = None):
+        if isinstance(id, (str, unicode)):
+            id = jid.JID(id)
+        x = client.XMPPClientFactory(id, secret)
+        x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
+        x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
+        x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
+        x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)
+        self.id = id
+        self.factory = x
+        self.verbose = verbose
+        self.log = log or sys.stdout
+
+    def __rawDataIN(self, buf):
+        if self.verbose: self.msg("RECV: %s" % buf)
+
+    def __rawDataOUT(self, buf):
+        if self.verbose: self.msg("SEND: %s" % buf)
+
+    def msg(self, msg):
+        self.log.write("%s\n" % msg)
+        self.log.flush()
+
+    def error(self, msg):
+        self.msg("ERROR: %s" % msg)
+
+    def warn(self, msg):
+        self.msg("WARN: %s" % msg)
+
+    def info(self, msg):
+        self.msg("INFO: %s" % msg)
+
+    def event_connected(self, xs):
+        # log all traffic
+        xs.rawDataInFn = self.__rawDataIN
+        xs.rawDataOutFn = self.__rawDataOUT
+        self.xmlstream = xs
+
+    def event_disconnected(self, xs):
+        pass
+
+    def event_init_failed(self, xs):
+        self.error("Init Failed")
+
+    def event_authenticated(self, xs):
+        presence = domish.Element(("jabber:client", "presence"))
+        presence.addElement("show", content="dnd")
+        presence.addElement("status", content="man at work")
+        xs.send(presence)
+
+        # add protocol handlers
+        xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
+        xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
+        xs.addObserver("/presence", self.presence)
+        xs.addObserver("/message[@type='chat']", self.message_chat)
+
+    def presence_subscribe(self, m):
+        self.info("%s requests to add us, granting." % m['from'])
+        p = domish.Element(("jabber:client", "presence"))
+        p['from'], p['to'] = m['to'], m['from']
+        p['type'] = "subscribed"
+        self.xmlstream.send(p)
+
+    def presence_unsubscribe(self, m):
+        # try to re-subscribe
+        self.info("%s removed us, trying to re-subscribe." % m['from'])
+        p = domish.Element(("jabber:client", "presence"))
+        p['from'], p['to'] = m['to'], m['from']
+        p['type'] = "subscribe"
+        self.xmlstream.send(p)
+
+    def presence(self, m):
+        p = domish.Element(("jabber:client", "presence"))
+        p['from'], p['to'] = m['to'], m['from']
+        p.addElement("show", content="dnd")
+        p.addElement("status", content="man at work")
+        self.xmlstream.send(p)
+
+    def message_chat(self, m):
+        n = domish.Element((None, "message"))
+        n['to'], n['from'] = m['from'], m['to']
+        n.addElement("body", content = "don't have time to chat. working!")
+        self.xmlstream.send(n)
+
+
+class PubSubClient(BaseClient):
+    """ PubSub (XEP-0060) implementation """
+
+    def __init__(self, id, secret, verbose = False, log = None):
+        BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
+        self.hooks = {}
+
+    def add_result_hook(self, hook_to, hook):
+        self.hooks[hook_to] = hook
+
+    def delete_result_hook(self, hook_to):
+        if self.hooks.has_key(hook_to):
+            del self.hooks[hook_to]
+
+    def event_authenticated(self, xs):
+        BaseClient.event_authenticated(self, xs)
+        self.requests = {}
+        xs.addObserver("/iq/pubsub/create", self.result_create_node)
+        xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
+        xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
+
+    def __iq(self, t="get"):
+        iq = domish.Element((None, "iq"))
+        iq['from'] = self.id.full()
+        # XXX hardcoded pubsub component; should be derived from the
+        # configured XMPP server
+        iq['to'] = "pubsub.vplc27.inria.fr"
+        iq['type'] = t
+        iq.addUniqueId()
+        return iq
+
+    def discover(self, node = None):
+        iq = self.__iq("get")
+        query = iq.addElement("query")
+        query['xmlns'] = "http://jabber.org/protocol/disco#items"
+        if node:
+            query['node'] = node
+        self.requests[iq['id']] = node
+        self.xmlstream.send(iq)
+
+    def result_discover(self, iq):
+        if self.verbose: self.info("Items for node: %s" % self.requests[iq['id']])
+
+        hook = self.hooks.get('discover', None)
+        for i in iq.query.elements():
+            if self.verbose: self.msg(i.toXml())
+            if hook: hook(i)
+        self.requests.pop(iq['id'])
+
+    def create_node(self, node = None):
+        iq = self.__iq("set")
+        pubsub = iq.addElement("pubsub")
+        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
+        create = pubsub.addElement("create")
+        if node:
+            create['node'] = node
+        configure = pubsub.addElement("configure")
+        self.requests[iq['id']] = node
+        self.xmlstream.send(iq)
+
+    def result_create_node(self, iq):
+#         if hasattr(iq, "error"):
+#             node = self.requests[iq['id']]
+#             if hasattr(iq.error, "conflict"):
+#                 # node is already there, nothing important.
+#                 self.warn("NodeID exists: %s" % node)
+#             else:
+#                 err_type = ""
+#                 err_name = ""
+#                 if iq.error:
+#                     if iq.error.has_key('type'):
+#                         err_type = iq.error['type']
+#                     if iq.error.firstChildElement and hasattr(iq.error.firstChildElement, "name"):
+#                         err_name = iq.error.firstChildElement.name
+#                 self.error("Can not create node: %s (error type: %s, %s)" % (node, err_type, err_name))
+        self.requests.pop(iq['id'])
+
+    def delete_node(self, node):
+        iq = self.__iq("set")
+        pubsub = iq.addElement("pubsub")
+        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub#owner"
+        delete = pubsub.addElement("delete")
+        delete['node'] = node
+        self.requests[iq['id']] = node
+        self.xmlstream.send(iq)
+
+    def result_delete_node(self, iq):
+        self.requests.pop(iq['id'])
+
+
+class Slicemgr(PubSubClient, xmlrpc.XMLRPC):
+
+    DOMAIN = "/OMF"
+    RESOURCES = 'resources'
+
+    def __init__(self, id, secret, verbose = False, log = None):
+        PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
+        # initialize the xmlrpc resource, allowing None in responses
+        xmlrpc.XMLRPC.__init__(self, allowNone = True)
+        # publish our methods under the "slicemgr." prefix used by
+        # OMFAspect_xmlrpc
+        self.putSubHandler("slicemgr", self)
+        self.command_queue = Queue.Queue()
+
+    def xmlrpc_createSlice(self, slice):
+        self.create_slice(slice)
+
+    def xmlrpc_addResource(self, slice, resource):
+        self.add_resource(slice, resource)
+
+    def xmlrpc_deleteSlice(self, slice):
+        self.delete_slice(slice)
+
+    def xmlrpc_removeResource(self, slice, resource):
+        self.delete_resource(slice, resource)
+
+    def flush_commands(self):
+#         self.info("Running %d commands" % self.command_queue.qsize())
+        while not self.command_queue.empty():
+            (meth, param) = self.command_queue.get()
+            meth(param)
+
+    def create_slice(self, slice):
+        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN, slice]) ))
+        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN, slice, self.RESOURCES]) ))
+
+    def add_resource(self, slice, resource):
+        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN, slice, self.RESOURCES, resource]) ))
+
+    def delete_slice(self, slice):
+        slice_prefix = "/".join([self.DOMAIN, slice])
+        resource_prefix = "/".join([self.DOMAIN, slice, self.RESOURCES])
+        def delete_slice_resources(iq):
+            node = iq['node']
+            if node.startswith(resource_prefix):
+                self.command_queue.put(( self.delete_node, node ))
+
+        self.add_result_hook("discover", delete_slice_resources)
+        self.discover()
+        self.delete_result_hook("discover")
+
+        self.command_queue.put(( self.delete_node, resource_prefix ))
+        self.command_queue.put(( self.delete_node, slice_prefix ))
+
+    def delete_resource(self, slice, resource):
+        self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN, slice, self.RESOURCES, resource]) ))
+
+
+if __name__ == "__main__":
+
+    config = Config("/etc/planetlab/plc_config")
+
+    xmppserver = config.PLC_OMF_XMPP_SERVER
+    xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
+    xmpppass = config.PLC_OMF_XMPP_PASSWORD
+    slicemgr = Slicemgr(xmppuser, xmpppass,
+                        log=open("/var/log/omf/pubsub_client.log", "a"),
+                        verbose=True)
+
+    t = task.LoopingCall(slicemgr.flush_commands)
+    t.start(5.0) # check every 5 seconds
+
+    reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
+    reactor.listenTCP(5053, server.Site(slicemgr))
+    reactor.run(installSignalHandlers=True)
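
(The pubsub naming scheme used by Slicemgr, as a runnable sketch -- not
part of the commit; the slice and host names are illustrative.)

    DOMAIN, RESOURCES = "/OMF", "resources"   # Slicemgr.DOMAIN / Slicemgr.RESOURCES
    slice, host = "inria_omf", "node1.example.org"

    print "/".join([DOMAIN, slice])                   # /OMF/inria_omf                 (create_slice)
    print "/".join([DOMAIN, slice, RESOURCES])        # /OMF/inria_omf/resources       (create_slice)
    print "/".join([DOMAIN, slice, RESOURCES, host])  # /OMF/inria_omf/resources/node1.example.org (add_resource)
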
diff --git a/plc.d/omf-slicemgr b/plc.d/omf-slicemgr
new file mode 100755
index 0000000..985cd6d
--- /dev/null
+++ b/plc.d/omf-slicemgr
@@ -0,0 +1,32 @@
+#!/bin/bash
+# $Id$
+# $URL$
+#
+# priority: 800
+#
+
+
+# Source function library and configuration
+. /etc/plc.d/functions
+. /etc/planetlab/plc_config
+local_config=/etc/planetlab/configs/site.xml
+
+# Be verbose
+set -x
+
+case "$1" in
+    start)
+        if [ "$PLC_OMF_ENABLED" != "1" ] ; then
+            exit 0
+        fi
+
+        MESSAGE=$"Starting the OMF Slice Manager"
+        dialog "$MESSAGE"
+
+        daemon /usr/bin/omf-slicemgr.py
+
+        result "$MESSAGE"
+        ;;
+esac
+
+exit $ERRORS
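
(Finally, the usage sketch referred to in the commit message -- not
part of the commit.  It issues by hand the same XML-RPC calls that
OMFAspect_xmlrpc makes.  omf-slicemgr.py listens on port 5053, so the
URL below is a plausible PLC_OMF_SLICEMGR_URL; the slice and host names
are placeholders.)

    import xmlrpclib

    mgr = xmlrpclib.ServerProxy("http://localhost:5053/", allow_none=True)

    # what AddSlice / AddSliceToNodes end up triggering: creation of
    # /OMF/inria_omf, /OMF/inria_omf/resources and
    # /OMF/inria_omf/resources/node1.example.org
    mgr.slicemgr.createSlice("inria_omf")
    mgr.slicemgr.addResource("inria_omf", "node1.example.org")

    # what DeleteSliceFromNodes / DeleteSlice end up triggering
    mgr.slicemgr.removeResource("inria_omf", "node1.example.org")
    mgr.slicemgr.deleteSlice("inria_omf")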