--- /dev/null
+# Baris Metin <tmetin@sophia.inria.fr>
+
+import os
+import xmlrpclib
+
+from PLC.Slices import Slices
+from PLC.SliceTags import SliceTags
+from PLC.Nodes import Nodes
+from PLC.Config import Config
+from pyaspects.meta import MetaAspect
+
+
+class BaseOMF(object):
+
+ def __init__(self):
+ self.config = Config("/etc/planetlab/plc_config")
+ self.log = open("/var/log/omf/plc_slice_calls.log", "a")
+
+ def logit(self, call, args, kwargs, data, slice):
+ self.log.write("%s : args: %s kwargs: %s\n" % (call, args, kwargs))
+ self.log.write("data: %s\n" % data)
+ self.log.write("%s\n\n" % slice)
+ self.log.flush()
+
+
+ def get_slice(self, api, id_or_name):
+ slice_filter = {}
+ if isinstance(id_or_name, str):
+ slice_filter['name'] = id_or_name
+ else:
+ slice_filter['slice_id']= id_or_name
+ slice = Slices(api, slice_filter = slice_filter)[0]
+# don't bother to check for slice tags for the moment. we'll only
+# create XMPP pubsub groups for all slices
+#
+# slice_tags = SliceTags(api, slice_tag_filter = { 'slice_id': slice['slice_id'] })
+# omf_tag = [tag for tag in slice_tags if tag['name'] == 'omf']
+# if omf_tag and omf_tag['value'] not in ('false','0','no'):
+# # OK, slice has the "omf" tag set.
+# return slice
+# return None
+ return slice
+
+ def get_node_hostname(self, api, node_id):
+ node_filter = {'node_id': node_id }
+ try:
+ node = Nodes(api, node_filter = node_filter)[0]
+ return node['hostname']
+ except IndexError:
+ return None
+
+ def get_slice_tags(self, api, slice_id):
+ return SliceTags(api, slice_tag_filter = {'slice_id': slice_id})
+
+ def create_slice(self, slice):
+ pass
+
+ def add_resource(self, slice, resource):
+ pass
+
+ def delete_slice(self, slice):
+ pass
+
+ def delete_resource(self, slice, resource):
+ pass
+
+ # aspect method
+ def before(self, wobj, data, *args, **kwargs):
+ api_method_name = wobj.name
+ slice_name_or_id = None
+ node_ids = None
+
+ # DeleteSlice shall be handled before the actual method call;
+ # after the call we won't be able to acess the slice.
+ if api_method_name == "DeleteSlice":
+ slice_name_or_id = args[1]
+ else: # ignore the rest
+ return
+
+ slice = self.get_slice(wobj.api, slice_name_or_id)
+ if not slice:
+ return
+
+ if api_method_name == "DeleteSlice":
+ self.delete_slice(slice['name'])
+
+ self.logit(wobj.name, args, kwargs, data, slice)
+
+ # aspect method
+ def after(self, wobj, data, *args, **kwargs):
+ api_method_name = wobj.name
+ slice_name_or_id = None
+ node_ids = None
+ if api_method_name == "AddSlice":
+ slice_name_or_id = args[1]['name']
+ elif api_method_name == "AddSliceToNodes" or api_method_name == "DeleteSliceFromNodes":
+ slice_name_or_id = args[1]
+ node_ids = args[2]
+ else: # ignore the rest
+ #self.logit(wobj.name, args, kwargs, data, "SLICE")
+ return
+
+ slice = self.get_slice(wobj.api, slice_name_or_id)
+ if not slice:
+ return
+
+ if api_method_name == "AddSlice":
+ self.create_slice(slice['name'])
+ elif api_method_name == "AddSliceToNodes":
+ for node_id in node_ids:
+ node_hostname = self.get_node_hostname(wobj.api, node_id)
+ self.add_resource(slice['name'], node_hostname)
+ elif api_method_name == "DeleteSliceFromNodes":
+ for node_id in node_ids:
+ node_hostname = self.get_node_hostname(wobj.api, node_id)
+ self.delete_resource(slice['name'], node_hostname)
+
+ self.logit(wobj.name, args, kwargs, data, slice)
+
+
+class OMFAspect_xmpp(BaseOMF):
+ __metaclass__ = MetaAspect
+ name = "omfaspect_xmpp"
+
+ def __init__(self):
+ BaseOMF.__init__(self)
+ xmppserver = self.config.PLC_OMF_XMPP_SERVER
+ xmppuser = "@".join([self.config.PLC_OMF_XMPP_USER, xmppserver])
+ xmpppass = self.config.PLC_OMF_XMPP_PASSWORD
+ self.slicemgr = Slicemgr(xmppuser, xmpppass,
+ log=open("/var/log/omf/pubsub_client.log", "a"),
+ verbose=True)
+
+ def create_slice(self, slice):
+ self.slicemgr.create_slice(slice)
+
+ def add_resource(self, slice, resource):
+ self.slicemgr.add_resource(slice, resource)
+
+ def delete_slice(self, slice):
+ self.slicemgr.delete_slice(slice)
+
+ def delete_resource(self, slice, resource):
+ self.slicemgr.delete_resource(slice, resource)
+
+ def before(self, wobj, data, *args, **kwargs):
+ BaseOMF.before(self, wobj, data, *args, **kwargs)
+
+ def after(self, wobj, data, *args, **kwargs):
+ BaseOMF.after(self, wobj, data, *args, **kwargs)
+
+
+class OMFAspect_xmlrpc(BaseOMF):
+ __metaclass__ = MetaAspect
+ name = "omfaspect_xmlrpc"
+
+ def __init__(self):
+ BaseOMF.__init__(self)
+
+ slicemgr_url = self.config.PLC_OMF_SLICEMGR_URL
+ self.server = xmlrpclib.ServerProxy(slicemgr_url)
+
+ def create_slice(self, slice):
+ self.server.slicemgr.createSlice(slice)
+
+ def add_resource(self, slice, resource):
+ self.server.slicemgr.addResource(slice, resource)
+
+ def delete_slice(self, slice):
+ self.server.slicemgr.deleteSlice(slice)
+
+ def delete_resource(self, slice, resource):
+ self.server.slicemgr.removeResource(slice, resource)
+
+ def before(self, wobj, data, *args, **kwargs):
+ BaseOMF.before(self, wobj, data, *args, **kwargs)
+
+ def after(self, wobj, data, *args, **kwargs):
+ BaseOMF.after(self, wobj, data, *args, **kwargs)
+
+
+
+OMFAspect = OMFAspect_xmlrpc
+
--- /dev/null
+# Baris Metin <tmetin@sophia.inria.fr>
+
+import os
+import sys
+import Queue
+from twisted.words.xish import domish
+from twisted.web import xmlrpc, server
+from twisted.internet import reactor, task
+from twisted.words.protocols.jabber import xmlstream, client, jid
+
+sys.path.append("/usr/share/plc_api/")
+from PLC.Config import Config
+
+
+class BaseClient(object):
+ """ Base XMPP client: handles authentication and basic presence/message requests. """
+ def __init__(self, id, secret, verbose = False, log = None):
+
+ if isinstance(id, (str, unicode)):
+ id = jid.JID(id)
+ x = client.XMPPClientFactory(id, secret)
+ x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
+ x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
+ x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
+ x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)
+ self.id = id
+ self.factory = x
+ self.verbose = verbose
+ self.log = log or sys.stdout
+
+ def __rawDataIN(self, buf):
+ if self.verbose: self.msg("RECV: %s" % buf)
+
+ def __rawDataOUT(self, buf):
+ if self.verbose: self.msg("SEND: %s" % buf)
+
+ def msg(self, msg):
+ self.log.write("%s\n" % msg)
+ self.log.flush()
+
+ def error(self, msg):
+ self.msg("ERROR: %s" % msg)
+
+ def warn(self, msg):
+ self.msg("WARN: %s" % msg)
+
+ def info(self, msg):
+ self.msg("INFO: %s" % msg)
+
+ def event_connected(self, xs):
+ # log all traffic
+ xs.rawDataInFn = self.__rawDataIN
+ xs.rawDataOutFn = self.__rawDataOUT
+ self.xmlstream = xs
+
+ def event_disconnected(self, xs):
+ pass
+
+ def event_init_failed(self, xs):
+ self.error("Init Failed")
+
+ def event_authenticated(self, xs):
+ presence = domish.Element(("jabber:client", "presence"))
+ presence.addElement("show", content="dnd")
+ presence.addElement("status", content="man at work")
+ xs.send(presence)
+
+ # add protocol handlers
+ xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
+ xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
+ xs.addObserver("/precence", self.presence)
+ xs.addObserver("/message[@type='chat']", self.message_chat)
+
+ def presence_subscribe(self, m):
+ self.info("%s request to add us, granting." % m['from'])
+ p = domish.Element(("jabber:client", "presence"))
+ p['from'], p['to'] = m['to'], m['from']
+ p['type'] = "subscribed"
+ self.xmlstream.send(p)
+
+ def presence_unsubscribe(self, m):
+ # try to re-subscribe
+ self.info("%s removed us, trying to re-authenticate." % m['from'])
+ p = domish.Element(("jabber:client", "presence"))
+ p['from'], p['to'] = m['to'], m['from']
+ p['type'] = "subscribe"
+ self.xmlstream.send(p)
+
+ def presence(self, m):
+ p = domish.Element(("jabber:client", "presence"))
+ p['from'], p['to'] = m['to'], m['from']
+ presence.addElement("show", content="dnd")
+ presence.addElement("status", content="man at work")
+ self.xmlstream.send(p)
+
+ def message_chat(self, m):
+ n = domish.Element((None, "message"))
+ n['to'], n['from'] = m['from'], m['to']
+ n.addElement("body", content = "don't have time to chat. working!")
+ self.xmlstream.send(n)
+
+
+class PubSubClient(BaseClient):
+ """ PubSub (XEP 0060) implementation """
+
+ def __init__(self, id, secret, verbose = False, log = None):
+ BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
+ self.hooks = {}
+
+ def add_result_hook(self, hook_to, hook):
+ self.hooks[hook_to] = hook
+
+ def delete_result_hook(self, hook_to):
+ if self.hooks.has_key(hook_to):
+ del self.hooks[hook_to]
+
+ def event_authenticated(self, xs):
+ BaseClient.event_authenticated(self, xs)
+ self.requests = {}
+ xs.addObserver("/iq/pubsub/create", self.result_create_node)
+ xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
+ xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
+
+ def __iq(self, t="get"):
+ iq = domish.Element((None, "iq"))
+ iq['from'] = self.id.full()
+ iq['to'] = "pubsub.vplc27.inria.fr"
+ iq['type'] = t
+ iq.addUniqueId()
+ return iq
+
+ def discover(self, node = None):
+ iq = self.__iq("get")
+ query = iq.addElement("query")
+ query['xmlns'] = "http://jabber.org/protocol/disco#items"
+ if node:
+ query['node'] = node
+ self.requests[iq['id']] = node
+ self.xmlstream.send(iq)
+
+ def result_discover(self, iq):
+ if self.verbose: self.info("Items for node: %s" % self.requests[iq['id']])
+
+ hook = self.hooks.get('discover', None)
+ for i in iq.query.elements():
+ if self.verbose: self.msg(i.toXml())
+ if hook: hook(i)
+ self.requests.pop(iq['id'])
+
+ def create_node(self, node = None):
+ iq = self.__iq("set")
+ pubsub = iq.addElement("pubsub")
+ pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
+ create = pubsub.addElement("create")
+ if node:
+ create['node'] = node
+ configure = pubsub.addElement("configure")
+ self.requests[iq['id']] = node
+ self.xmlstream.send(iq)
+
+ def result_create_node(self, iq):
+# if hasattr(iq, "error"):
+# node = self.requests[iq['id']]
+# if hasattr(iq.error, "conflict"):
+# # node is already there, nothing important.
+# self.warn("NodeID exists: %s" % node)
+# else:
+# err_type = ""
+# err_name = ""
+# if iq.error:
+# if iq.error.has_key('type'):
+# err_type = iq.error['type']
+# if iq.error.firstChildElement and hasattr(iq.error.firstChildElement, "name"):
+# err_name = iq.error.firstChildElement.name
+# self.error("Can not create node: %s (error type: %s, %s)" % (node, err_type, err_name))
+ self.requests.pop(iq['id'])
+
+
+ def delete_node(self, node):
+ iq = self.__iq("set")
+ pubsub = iq.addElement("pubsub")
+ pubsub['xmlns'] = "http://jabber.org/protocol/pubsub#owner"
+ delete = pubsub.addElement("delete")
+ delete['node'] = node
+ self.requests[iq['id']] = node
+ self.xmlstream.send(iq)
+
+ def result_delete_node(self, iq):
+ self.requests.pop(iq['id'])
+
+
+
+
+class Slicemgr(PubSubClient, xmlrpc.XMLRPC):
+
+ DOMAIN = "/OMF"
+ RESOURCES = 'resources'
+
+ def __init__(self, id, secret, verbose = False, log = None):
+ PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
+ self.command_queue = Queue.Queue()
+
+ # for xmlrpc interface
+ self.allowNone = True
+
+ def xmlrpc_createSlice(self, slice):
+ self.create_slice(slice)
+
+ def xmlrpc_addResource(self, slice, resource):
+ self.add_resource(slice, resource)
+
+ def xmlrpc_deleteSlice(self, slice):
+ self.delete_slice(slice)
+
+ def xmlrpc_removeResource(self, slice, resource):
+ self.delete_resource(slice, resource)
+
+
+ def flush_commands(self):
+# self.info("Running %d commands" % self.command_queue.qsize())
+ while not self.command_queue.empty():
+ (meth, param) = self.command_queue.get()
+ meth(param)
+
+ def create_slice(self, slice):
+ self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
+ self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))
+
+ def add_resource(self, slice, resource):
+ self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
+
+ def delete_slice(self, slice):
+ slice_prefix = "/".join([self.DOMAIN,slice])
+ resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
+ def delete_slice_resources(iq):
+ node = iq['node']
+ if node.startswith(resource_prefix):
+ self.command_que.put(self.delete_node, node)
+
+ self.add_result_hook("discover", delete_slice_resources)
+ self.discover()
+ self.delete_result_hook("discover")
+
+ self.command_queue.put(( self.delete_node, resource_prefix) )
+ self.command_queue.put(( self.delete_node, slice_prefix) )
+
+ def delete_resource(self, slice, resource):
+ self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
+
+
+
+if __name__ == "__main__":
+
+ config = Config("/etc/planetlab/plc_config")
+
+ xmppserver = self.config.PLC_OMF_XMPP_SERVER
+ xmppuser = "@".join([self.config.PLC_OMF_XMPP_USER, xmppserver])
+ xmpppass = self.config.PLC_OMF_XMPP_PASSWORD
+ slicemgr = Slicemgr(xmppuser, xmpppass,
+ log=open("/var/log/omf/pubsub_client.log", "a"),
+ verbose=True)
+
+ t = task.LoopingCall(slicemgr.flush_commands)
+ t.start(5.0) # check every 5 seconds
+ reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
+ reactor.listenTCP(5053, server.Site(slicemgr))
+ reactor.run(installSignalHandlers=True)
+
+
+