OMF integration for plcapi, only activated when PLC_OMF_ENABLED is true.
authorBarış Metin <Talip-Baris.Metin@sophia.inria.fr>
Mon, 8 Mar 2010 15:08:18 +0000 (15:08 +0000)
committerBarış Metin <Talip-Baris.Metin@sophia.inria.fr>
Mon, 8 Mar 2010 15:08:18 +0000 (15:08 +0000)
omfaspects.py wraps PLCAPI method calls and manages XMPP PubSub groups for slices and nodes added to slices.

PLC/API.py
PLCAPI.spec
aspects/__init__.py [new file with mode: 0644]
aspects/omfaspects.py [new file with mode: 0644]
db-config.d/010-slice_tags
omf/omf-slicemgr.py [new file with mode: 0644]
plc.d/omf-slicemgr [new file with mode: 0755]

index 9d61ed9..55a0349 100644 (file)
@@ -126,10 +126,16 @@ class PLCAPI:
         if self.config.PLC_DB_TYPE == "postgresql":
             from PLC.PostgreSQL import PostgreSQL
             self.db = PostgreSQL(self)
-
         else:
             raise PLCAPIError, "Unsupported database type " + self.config.PLC_DB_TYPE
 
+        # Aspects modify the API injecting code before/after method
+        # calls. As of now we only have aspects for OMF integration,
+        # that's why we enable aspects only if PLC_OMF is set to true.
+        if self.config.PLC_OMF:
+            from aspects import apply_aspects; apply_aspects()
+
+
     def callable(self, method):
         """
         Return a new instance of the specified method.
index 081c5d3..b3a28b1 100644 (file)
@@ -38,6 +38,10 @@ Requires: mod_ssl
 Requires: SOAPpy
 # for the RebootNodeWithPCU method
 Requires: pcucontrol
+# for OMF integration
+Requires: pyaspects
+Requires: python-twisted-words
+Requires: python-twisted-web
 
 ### avoid having yum complain about updates, as stuff is moving around
 # plc.d/api
diff --git a/aspects/__init__.py b/aspects/__init__.py
new file mode 100644 (file)
index 0000000..bcce3bc
--- /dev/null
@@ -0,0 +1,16 @@
+
+
+from pyaspects.weaver import weave_class_method
+
+from PLC.Method import Method
+from aspects.omfaspects import OMFAspect
+
+
+def apply_aspects():
+
+    # TEST
+    #from pyaspects.debuggeraspect import DebuggerAspect
+    #weave_class_method(DebuggerAspect(out=open("/tmp/all_method_calls.log", "a")), Method, "__call__")
+
+    # track all PLC methods to add OMF hooks
+    weave_class_method(OMFAspect(), Method, "__call__")
diff --git a/aspects/omfaspects.py b/aspects/omfaspects.py
new file mode 100644 (file)
index 0000000..b0882a3
--- /dev/null
@@ -0,0 +1,184 @@
+# Baris Metin <tmetin@sophia.inria.fr>
+
+import os
+import xmlrpclib
+
+from PLC.Slices import Slices
+from PLC.SliceTags import SliceTags
+from PLC.Nodes import Nodes
+from PLC.Config import Config
+from pyaspects.meta import MetaAspect
+
+
+class BaseOMF(object):
+
+    def __init__(self):
+        self.config = Config("/etc/planetlab/plc_config")
+        self.log = open("/var/log/omf/plc_slice_calls.log", "a")
+
+    def logit(self, call, args, kwargs, data, slice):
+        self.log.write("%s : args: %s  kwargs: %s\n" % (call, args, kwargs))
+        self.log.write("data: %s\n" % data)
+        self.log.write("%s\n\n" % slice)
+        self.log.flush()
+
+           
+    def get_slice(self, api, id_or_name):
+        slice_filter = {}
+        if isinstance(id_or_name, str):
+            slice_filter['name'] = id_or_name
+        else:
+            slice_filter['slice_id']= id_or_name
+        slice = Slices(api, slice_filter = slice_filter)[0]
+# don't bother to check for slice tags for the moment. we'll only
+# create XMPP pubsub groups for all slices
+#
+#         slice_tags = SliceTags(api, slice_tag_filter = { 'slice_id': slice['slice_id'] })
+#         omf_tag = [tag for tag in slice_tags if tag['name'] == 'omf']
+#         if omf_tag and omf_tag['value'] not in ('false','0','no'):
+#             # OK, slice has the "omf" tag set.
+#             return slice
+#         return None
+        return slice
+
+    def get_node_hostname(self, api, node_id):
+        node_filter = {'node_id': node_id }
+        try:
+            node = Nodes(api, node_filter = node_filter)[0]
+            return node['hostname']
+        except IndexError:
+            return None
+        
+    def get_slice_tags(self, api, slice_id):
+        return SliceTags(api, slice_tag_filter = {'slice_id': slice_id})
+
+    def create_slice(self, slice):
+        pass
+
+    def add_resource(self, slice, resource):
+        pass
+
+    def delete_slice(self, slice):
+        pass
+
+    def delete_resource(self, slice, resource):
+        pass
+
+    # aspect method
+    def before(self, wobj, data, *args, **kwargs):
+        api_method_name = wobj.name
+        slice_name_or_id = None
+        node_ids = None
+
+        # DeleteSlice shall be handled before the actual method call;
+        # after the call we won't be able to acess the slice.
+        if api_method_name == "DeleteSlice":
+            slice_name_or_id = args[1]        
+        else: # ignore the rest
+            return
+
+        slice = self.get_slice(wobj.api, slice_name_or_id)
+        if not slice:
+            return
+
+        if api_method_name == "DeleteSlice":
+            self.delete_slice(slice['name'])
+
+        self.logit(wobj.name, args, kwargs, data, slice)
+
+    # aspect method
+    def after(self, wobj, data, *args, **kwargs):
+        api_method_name = wobj.name
+        slice_name_or_id = None
+        node_ids = None
+        if api_method_name == "AddSlice":
+            slice_name_or_id = args[1]['name']
+        elif api_method_name == "AddSliceToNodes" or api_method_name == "DeleteSliceFromNodes":
+            slice_name_or_id = args[1]
+            node_ids = args[2]
+        else: # ignore the rest
+            #self.logit(wobj.name, args, kwargs, data, "SLICE")
+            return
+
+        slice = self.get_slice(wobj.api, slice_name_or_id)
+        if not slice:
+            return
+
+        if api_method_name == "AddSlice":
+            self.create_slice(slice['name'])
+        elif api_method_name == "AddSliceToNodes":
+            for node_id in node_ids:
+                node_hostname = self.get_node_hostname(wobj.api, node_id)
+                self.add_resource(slice['name'], node_hostname)
+        elif api_method_name == "DeleteSliceFromNodes":
+            for node_id in node_ids:
+                node_hostname = self.get_node_hostname(wobj.api, node_id)
+                self.delete_resource(slice['name'], node_hostname)
+
+        self.logit(wobj.name, args, kwargs, data, slice)
+
+
+class OMFAspect_xmpp(BaseOMF):
+    __metaclass__ = MetaAspect
+    name = "omfaspect_xmpp"
+
+    def __init__(self):
+        BaseOMF.__init__(self)
+        xmppserver = self.config.PLC_OMF_XMPP_SERVER
+        xmppuser = "@".join([self.config.PLC_OMF_XMPP_USER, xmppserver])
+        xmpppass = self.config.PLC_OMF_XMPP_PASSWORD
+        self.slicemgr = Slicemgr(xmppuser, xmpppass,
+                                 log=open("/var/log/omf/pubsub_client.log", "a"),
+                                 verbose=True)
+
+    def create_slice(self, slice):
+        self.slicemgr.create_slice(slice)
+
+    def add_resource(self, slice, resource):
+        self.slicemgr.add_resource(slice, resource)
+
+    def delete_slice(self, slice):
+        self.slicemgr.delete_slice(slice)
+
+    def delete_resource(self, slice, resource):
+        self.slicemgr.delete_resource(slice, resource)
+
+    def before(self, wobj, data, *args, **kwargs):
+        BaseOMF.before(self, wobj, data, *args, **kwargs)
+
+    def after(self, wobj, data, *args, **kwargs):
+        BaseOMF.after(self, wobj, data, *args, **kwargs)
+
+
+class OMFAspect_xmlrpc(BaseOMF):
+    __metaclass__ = MetaAspect
+    name = "omfaspect_xmlrpc"
+
+    def __init__(self):
+        BaseOMF.__init__(self)
+
+        slicemgr_url = self.config.PLC_OMF_SLICEMGR_URL
+        self.server = xmlrpclib.ServerProxy(slicemgr_url)
+
+    def create_slice(self, slice):
+        self.server.slicemgr.createSlice(slice)
+
+    def add_resource(self, slice, resource):
+        self.server.slicemgr.addResource(slice, resource)
+
+    def delete_slice(self, slice):
+        self.server.slicemgr.deleteSlice(slice)
+        
+    def delete_resource(self, slice, resource):
+        self.server.slicemgr.removeResource(slice, resource)
+
+    def before(self, wobj, data, *args, **kwargs):
+        BaseOMF.before(self, wobj, data, *args, **kwargs)
+
+    def after(self, wobj, data, *args, **kwargs):
+        BaseOMF.after(self, wobj, data, *args, **kwargs)
+
+
+
+OMFAspect = OMFAspect_xmlrpc
+
index 850dce5..20a1e3c 100644 (file)
@@ -153,6 +153,12 @@ slicetag_types = \
      'description': "Is a default Distributed Rate Limiting slice (1) or not (0 or unset)",
      'category' : 'slice/general',
      'min_role_id': 10},
+
+    # OMF controlled slice
+    {'tagname': "omf",
+     'description': "Enable OMF (cOntrol and Management Framework) for the slice",
+     'category' : 'slice/general',
+     'min_role_id': 10},
 ]
 
 # add in the platform supported rlimits to the default_attribute_types
diff --git a/omf/omf-slicemgr.py b/omf/omf-slicemgr.py
new file mode 100644 (file)
index 0000000..310e656
--- /dev/null
@@ -0,0 +1,270 @@
+# Baris Metin <tmetin@sophia.inria.fr>
+
+import os
+import sys
+import Queue
+from twisted.words.xish import domish
+from twisted.web import xmlrpc, server
+from twisted.internet import reactor, task
+from twisted.words.protocols.jabber import xmlstream, client, jid
+
+sys.path.append("/usr/share/plc_api/")
+from PLC.Config import Config
+
+
+class BaseClient(object):
+    """ Base XMPP client: handles authentication and basic presence/message requests. """
+    def __init__(self, id, secret, verbose = False, log = None):
+
+        if isinstance(id, (str, unicode)):
+            id = jid.JID(id)
+        x = client.XMPPClientFactory(id, secret)
+        x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
+        x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
+        x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
+        x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)
+        self.id = id
+        self.factory = x
+        self.verbose = verbose
+        self.log = log or sys.stdout
+
+    def __rawDataIN(self, buf):
+        if self.verbose: self.msg("RECV: %s" % buf)
+
+    def __rawDataOUT(self, buf):
+        if self.verbose: self.msg("SEND: %s" % buf)
+
+    def msg(self, msg):
+        self.log.write("%s\n" % msg)
+        self.log.flush()
+
+    def error(self, msg):
+        self.msg("ERROR: %s" % msg)
+
+    def warn(self, msg):
+        self.msg("WARN: %s" % msg)
+
+    def info(self, msg):
+        self.msg("INFO: %s" % msg)
+
+    def event_connected(self, xs):
+        # log all traffic
+        xs.rawDataInFn = self.__rawDataIN
+        xs.rawDataOutFn = self.__rawDataOUT
+        self.xmlstream = xs
+        
+    def event_disconnected(self, xs):
+        pass
+
+    def event_init_failed(self, xs):
+        self.error("Init Failed")
+
+    def event_authenticated(self, xs):
+        presence = domish.Element(("jabber:client", "presence"))
+        presence.addElement("show", content="dnd")
+        presence.addElement("status", content="man at work")
+        xs.send(presence)
+
+        # add protocol handlers
+        xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
+        xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
+        xs.addObserver("/precence", self.presence)
+        xs.addObserver("/message[@type='chat']", self.message_chat)
+
+    def presence_subscribe(self, m):
+        self.info("%s request to add us, granting." % m['from'])
+        p = domish.Element(("jabber:client", "presence"))
+        p['from'], p['to'] = m['to'], m['from']
+        p['type'] = "subscribed"
+        self.xmlstream.send(p)
+
+    def presence_unsubscribe(self, m):
+        # try to re-subscribe
+        self.info("%s removed us, trying to re-authenticate." % m['from'])
+        p = domish.Element(("jabber:client", "presence"))
+        p['from'], p['to'] = m['to'], m['from']
+        p['type'] = "subscribe"
+        self.xmlstream.send(p)
+
+    def presence(self, m):
+        p = domish.Element(("jabber:client", "presence"))
+        p['from'], p['to'] = m['to'], m['from']
+        presence.addElement("show", content="dnd")
+        presence.addElement("status", content="man at work")
+        self.xmlstream.send(p)
+
+    def message_chat(self, m):
+        n = domish.Element((None, "message"))
+        n['to'], n['from'] = m['from'], m['to']
+        n.addElement("body", content = "don't have time to chat. working!")
+        self.xmlstream.send(n)
+
+    
+class PubSubClient(BaseClient):
+    """ PubSub (XEP 0060) implementation """
+
+    def __init__(self, id, secret, verbose = False, log = None):
+        BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
+        self.hooks = {}
+    
+    def add_result_hook(self, hook_to, hook):
+        self.hooks[hook_to] = hook
+
+    def delete_result_hook(self, hook_to):
+        if self.hooks.has_key(hook_to):
+            del self.hooks[hook_to]
+
+    def event_authenticated(self, xs):
+        BaseClient.event_authenticated(self, xs)
+        self.requests = {}
+        xs.addObserver("/iq/pubsub/create", self.result_create_node)
+        xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
+        xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
+
+    def __iq(self, t="get"):
+        iq = domish.Element((None, "iq"))
+        iq['from'] = self.id.full()
+        iq['to'] = "pubsub.vplc27.inria.fr"
+        iq['type'] = t
+        iq.addUniqueId()
+        return iq
+
+    def discover(self, node = None):
+        iq = self.__iq("get")
+        query = iq.addElement("query")
+        query['xmlns'] = "http://jabber.org/protocol/disco#items"
+        if node:
+            query['node'] = node
+        self.requests[iq['id']] = node
+        self.xmlstream.send(iq)
+
+    def result_discover(self, iq):
+        if self.verbose: self.info("Items for node: %s" % self.requests[iq['id']])
+        
+        hook = self.hooks.get('discover', None)
+        for i in iq.query.elements():
+            if self.verbose: self.msg(i.toXml())
+            if hook: hook(i)
+        self.requests.pop(iq['id'])
+
+    def create_node(self, node = None):
+        iq = self.__iq("set")
+        pubsub = iq.addElement("pubsub")
+        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
+        create = pubsub.addElement("create")
+        if node:
+            create['node'] = node
+        configure = pubsub.addElement("configure")
+        self.requests[iq['id']] = node
+        self.xmlstream.send(iq)
+
+    def result_create_node(self, iq):
+#         if hasattr(iq, "error"):
+#             node = self.requests[iq['id']]
+#             if hasattr(iq.error, "conflict"):
+#                 # node is already there, nothing important.
+#                 self.warn("NodeID exists: %s" % node)
+#             else:
+#                 err_type = ""
+#                 err_name = ""
+#                 if iq.error:
+#                     if iq.error.has_key('type'):
+#                         err_type = iq.error['type']
+#                     if iq.error.firstChildElement and hasattr(iq.error.firstChildElement, "name"):
+#                         err_name = iq.error.firstChildElement.name
+#                 self.error("Can not create node: %s (error type: %s, %s)" %  (node, err_type, err_name))
+        self.requests.pop(iq['id'])
+
+
+    def delete_node(self, node):
+        iq = self.__iq("set")
+        pubsub = iq.addElement("pubsub")
+        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub#owner"
+        delete = pubsub.addElement("delete")
+        delete['node'] = node
+        self.requests[iq['id']] = node
+        self.xmlstream.send(iq)
+
+    def result_delete_node(self, iq):
+        self.requests.pop(iq['id'])
+
+
+
+
+class Slicemgr(PubSubClient, xmlrpc.XMLRPC):
+    
+    DOMAIN = "/OMF"
+    RESOURCES = 'resources'
+
+    def __init__(self, id, secret, verbose = False, log = None):
+        PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
+        self.command_queue = Queue.Queue()
+
+        # for xmlrpc interface
+        self.allowNone = True
+
+    def xmlrpc_createSlice(self, slice):
+        self.create_slice(slice)
+
+    def xmlrpc_addResource(self, slice, resource):
+        self.add_resource(slice, resource)
+
+    def xmlrpc_deleteSlice(self, slice):
+        self.delete_slice(slice)
+
+    def xmlrpc_removeResource(self, slice, resource):
+        self.delete_resource(slice, resource)
+
+
+    def flush_commands(self):
+#        self.info("Running %d commands" % self.command_queue.qsize())
+        while not self.command_queue.empty():
+            (meth, param) = self.command_queue.get()
+            meth(param)
+
+    def create_slice(self, slice):
+        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
+        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))
+
+    def add_resource(self, slice, resource):
+        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
+
+    def delete_slice(self, slice):
+        slice_prefix = "/".join([self.DOMAIN,slice])
+        resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
+        def delete_slice_resources(iq):
+            node = iq['node']
+            if node.startswith(resource_prefix):
+                self.command_que.put(self.delete_node, node)
+
+        self.add_result_hook("discover", delete_slice_resources)
+        self.discover()
+        self.delete_result_hook("discover")
+
+        self.command_queue.put(( self.delete_node, resource_prefix) )
+        self.command_queue.put(( self.delete_node, slice_prefix) )
+
+    def delete_resource(self, slice, resource):
+        self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
+        
+
+
+if __name__ == "__main__":
+
+    config = Config("/etc/planetlab/plc_config")
+
+    xmppserver = self.config.PLC_OMF_XMPP_SERVER
+    xmppuser = "@".join([self.config.PLC_OMF_XMPP_USER, xmppserver])
+    xmpppass = self.config.PLC_OMF_XMPP_PASSWORD
+    slicemgr = Slicemgr(xmppuser, xmpppass,
+                        log=open("/var/log/omf/pubsub_client.log", "a"),
+                        verbose=True)
+
+    t = task.LoopingCall(slicemgr.flush_commands)
+    t.start(5.0) # check every 5 seconds
+    reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
+    reactor.listenTCP(5053, server.Site(slicemgr))
+    reactor.run(installSignalHandlers=True)
+
+
+
diff --git a/plc.d/omf-slicemgr b/plc.d/omf-slicemgr
new file mode 100755 (executable)
index 0000000..985cd6d
--- /dev/null
@@ -0,0 +1,32 @@
+#!/bin/bash
+# $Id$
+# $URL$
+#
+# priority: 800
+#
+
+
+# Source function library and configuration
+. /etc/plc.d/functions
+. /etc/planetlab/plc_config
+local_config=/etc/planetlab/configs/site.xml
+
+# Be verbose
+set -x
+
+case "$1" in
+    start)
+       if [ "$PLC_OMF_ENABLED" != "1" ] ; then
+           exit 0
+       fi
+
+       MESSAGE=$"Starting the OMF Slice Manager"
+       dialog "$MESSAGE"
+
+        daemon /usr/bin/omf-slicemgr.py
+
+       result "$MESSAGE"
+       ;;
+esac
+
+exit $ERRORS