+++ /dev/null
-# Baris Metin <tmetin@sophia.inria.fr>
-
-import os
-import xmlrpclib
-
-from PLC.Slices import Slices
-from PLC.SliceTags import SliceTags, SliceTag
-from PLC.TagTypes import TagTypes
-from PLC.NodeTags import NodeTags
-from PLC.Config import Config
-from pyaspects.meta import MetaAspect
-
-
-def ignore_exception(ExceptionType=None):
- '''A decorator to ignore the given exception type. Use as
- @ignore_exception() to ignore all exceptions.'''
- def deco_ignore(f):
- def f_ignore(*args, **kwargs):
- if not ExceptionType:
- try:
- return f(*args, **kwargs)
- except:
- return None
- else:
- try:
- return f(*args, **kwargs)
- except ExceptionType, e:
- return None
- return f_ignore
- return deco_ignore
-
-
-class BaseOMF(object):
-
- def __init__(self):
- self.config = Config("/etc/planetlab/plc_config")
- self.slice = None
-
- # this was only for debugging, no need to log all method calls here -baris
- # self.log = open("/var/log/omf/plc_slice_calls.log", "a")
- self.log = None
-
-
- @ignore_exception()
- def logit(self, call, args, kwargs, data, slice):
- if not self.log: return
-
- self.log.write("%s : args: %s kwargs: %s\n" % (call, args, kwargs))
- self.log.write("data: %s\n" % data)
- self.log.write("%s\n\n" % slice)
- self.log.flush()
-
- @ignore_exception()
- def get_slice(self, api, id_or_name):
- slice_filter = {}
- try: # if integer
- slice_filter['slice_id'] = int(str(id_or_name))
- except ValueError:
- # we have a string
- slice_filter['name'] = id_or_name
- try:
- slice = Slices(api, slice_filter = slice_filter)[0]
- return slice
- except IndexError:
- return None
-# don't bother to check for slice tags for the moment. we'll only
-# create XMPP pubsub groups for all slices
-#
-# slice_tags = SliceTags(api, slice_tag_filter = { 'slice_id': slice['slice_id'] })
-# omf_tag = [tag for tag in slice_tags if tag['name'] == 'omf']
-# if omf_tag and omf_tag['value'] not in ('false','0','no'):
-# # OK, slice has the "omf" tag set.
-# return slice
-# return None
-
- @ignore_exception()
- def get_node_hrn(self, api, node_id_or_hostname):
- tag_filter = {'tagname': 'hrn'}
- try:
- tag_filter['node_id'] = int(str(node_id_or_hostname))
- except ValueError:
- # we have a hostname
- tag_filter['hostname'] = node_id_or_hostname
-
- try:
- tag = NodeTags(api, node_tag_filter = tag_filter)[0]
- return tag['value']
- except IndexError:
- return None
-
- @ignore_exception()
- def get_slice_tags(self, api, slice_id):
- return SliceTags(api, slice_tag_filter = {'slice_id': slice_id})
-
- @ignore_exception()
- def get_tag_type(self, api, tagname):
- try:
- tag = TagTypes(api, {'tagname':tagname})[0]
- return tag
- except IndexError:
- return None
-
- @ignore_exception()
- def create_slice(self, slice):
- pass
-
- @ignore_exception()
- def add_resource(self, slice, resource):
- pass
-
- @ignore_exception()
- def delete_slice(self, slice):
- pass
-
- @ignore_exception()
- def delete_resource(self, slice, resource):
- pass
-
- # aspect method
- def before(self, wobj, data, *args, **kwargs):
- api_method_name = wobj.name
- slice_name_or_id = None
-
- if api_method_name == "AddSlice":
- slice_name_or_id = args[1]['name']
- elif api_method_name == "AddSliceToNodes" or api_method_name == "DeleteSliceFromNodes":
- slice_name_or_id = args[1]
- elif api_method_name == "AddSliceTag":
- slice_name_or_id = args[1]
- elif api_method_name == "DeleteSlice":
- slice_name_or_id = args[1]
- else: # ignore the rest
- #self.logit(wobj.name, args, kwargs, data, "SLICE")
- self.slice = None
- return
-
- self.slice = self.get_slice(wobj.api, slice_name_or_id)
-
- self.logit(wobj.name, args, kwargs, data, slice)
-
- # aspect method
- def after(self, wobj, data, *args, **kwargs):
- api_method_name = wobj.name
-
- if not self.slice:
- if api_method_name == "AddSlice":
- slice_name = args[1]['name']
- self.slice = self.get_slice(wobj.api, slice_name)
- else:
- return
-
- ret_val = None
- if data.has_key("method_return_value"):
- ret_val = data['method_return_value']
-
- if api_method_name == "AddSlice" and ret_val > 0:
- self.create_slice(self.slice['name'])
-
- elif api_method_name == "AddSliceToNodes" and ret_val == 1:
- node_ids = args[2]
- for node_id in node_ids:
- node_hrn = self.get_node_hrn(wobj.api, node_id)
- self.add_resource(self.slice['name'], node_hrn)
-
- elif api_method_name == "DeleteSlice" and ret_val == 1:
- self.delete_slice(self.slice['name'])
-
- elif api_method_name == "DeleteSliceFromNodes" and ret_val == 1:
- node_ids = args[2]
- for node_id in node_ids:
- node_hrn = self.get_node_hrn(wobj.api, node_id)
- self.delete_resource(self.slice['name'], node_hrn)
-
- elif api_method_name == "AddSliceTag":
- # OMF slices need to have dotsshmount vsys tag set to be
- # able to access users' public keys.
- tag_type_id_or_name = args[2]
- omf_tag = self.get_tag_type(wobj.api, "omf_control")
- vsys_tag = self.get_tag_type(wobj.api, "vsys")
- if omf_tag and vsys_tag \
- and tag_type_id_or_name in (omf_tag['tagname'], omf_tag['tag_type_id']):
- slice_tag = SliceTag(wobj.api)
- slice_tag['slice_id'] = self.slice['slice_id']
- slice_tag['tag_type_id'] = vsys_tag['tag_type_id']
- slice_tag['value'] = u'dotsshmount'
- slice_tag.sync()
-
- self.logit(wobj.name, args, kwargs, data, self.slice)
-
-
-
-class OMFAspect_xmlrpc(BaseOMF):
- __metaclass__ = MetaAspect
- name = "omfaspect_xmlrpc"
-
- def __init__(self):
- BaseOMF.__init__(self)
-
- slicemgr_url = self.config.PLC_OMF_SLICEMGR_URL
- self.server = xmlrpclib.ServerProxy(slicemgr_url, allow_none = 1)
-
- @ignore_exception()
- def create_slice(self, slice):
- self.server.createSlice(slice)
-
- @ignore_exception()
- def add_resource(self, slice, resource):
- self.server.addResource(slice, resource)
-
- @ignore_exception()
- def delete_slice(self, slice):
- self.server.deleteSlice(slice)
-
- @ignore_exception()
- def delete_resource(self, slice, resource):
- self.server.removeResource(slice, resource)
-
- def before(self, wobj, data, *args, **kwargs):
- BaseOMF.before(self, wobj, data, *args, **kwargs)
-
- def after(self, wobj, data, *args, **kwargs):
- BaseOMF.after(self, wobj, data, *args, **kwargs)
-
-
-
-OMFAspect = OMFAspect_xmlrpc
-
+++ /dev/null
-#!/usr/bin/python
-# Baris Metin <tmetin@sophia.inria.fr>
-
-import os
-import sys
-import time
-import Queue
-from twisted.words.xish import domish
-from twisted.web import xmlrpc, server
-from twisted.internet import reactor, task
-from twisted.words.protocols.jabber import xmlstream, client, jid
-
-sys.path.append("/usr/share/plc_api/")
-from PLC.Config import Config
-
-
-class BaseClient(object):
- """ Base XMPP client: handles authentication and basic presence/message requests. """
- def __init__(self, id, secret, verbose = False, log = None):
-
- if isinstance(id, (str, unicode)):
- id = jid.JID(id)
- x = client.XMPPClientFactory(id, secret)
- x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
- x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
- x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
- x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)
- self.id = id
- self.factory = x
- self.verbose = verbose
- self.log = log or sys.stdout
-
- def __rawDataIN(self, buf):
- if self.verbose: self.msg("RECV: %s" % buf)
-
- def __rawDataOUT(self, buf):
- if self.verbose: self.msg("SEND: %s" % buf)
-
- def msg(self, msg):
- self.log.write("%s\n" % msg)
- self.log.flush()
-
- def error(self, msg):
- self.msg("ERROR: %s" % msg)
-
- def warn(self, msg):
- self.msg("WARN: %s" % msg)
-
- def info(self, msg):
- self.msg("INFO: %s" % msg)
-
- def event_connected(self, xs):
- # log all traffic
- xs.rawDataInFn = self.__rawDataIN
- xs.rawDataOutFn = self.__rawDataOUT
- self.xmlstream = xs
-
- def event_disconnected(self, xs):
- pass
-
- def event_init_failed(self, xs):
- self.error("Init Failed")
-
- def event_authenticated(self, xs):
- presence = domish.Element(("jabber:client", "presence"))
- presence.addElement("show", content="dnd")
- presence.addElement("status", content="man at work")
- xs.send(presence)
-
- # add protocol handlers
- xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
- xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
- xs.addObserver("/presence", self.presence)
- xs.addObserver("/message[@type='chat']", self.message_chat)
-
- def presence_subscribe(self, m):
- self.info("%s request to add us, granting." % m['from'])
- p = domish.Element(("jabber:client", "presence"))
- p['from'], p['to'] = m['to'], m['from']
- p['type'] = "subscribed"
- self.xmlstream.send(p)
-
- def presence_unsubscribe(self, m):
- # try to re-subscribe
- self.info("%s removed us, trying to re-authenticate." % m['from'])
- p = domish.Element(("jabber:client", "presence"))
- p['from'], p['to'] = m['to'], m['from']
- p['type'] = "subscribe"
- self.xmlstream.send(p)
-
- def presence(self, m):
- p = domish.Element(("jabber:client", "presence"))
- p['from'], p['to'] = m['to'], m['from']
- # initially read presence.addElement, my wild guess.. -- Thierry
- p.addElement("show", content="dnd")
- p.addElement("status", content="man at work")
- self.xmlstream.send(p)
-
- def message_chat(self, m):
- n = domish.Element((None, "message"))
- n['to'] = m['from']
- n['from'] = self.id.full()
- n.addElement("body", content = "don't have time to chat. working!")
- self.xmlstream.send(n)
-
-
-class PubSubClient(BaseClient):
- """ PubSub (XEP 0060) implementation """
-
- def __init__(self, id, secret, verbose = False, log = None):
- BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
- self.hooks = {}
-
- def add_result_hook(self, hook_to, hook):
- self.hooks[hook_to] = hook
-
- def delete_result_hook(self, hook_to):
- if self.hooks.has_key(hook_to):
- del self.hooks[hook_to]
-
- def event_authenticated(self, xs):
- BaseClient.event_authenticated(self, xs)
- self.requests = {}
- xs.addObserver("/iq/pubsub/create", self.result_create_node)
- xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
- xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
- xs.addObserver("/iq/pubsub/subscription[@subscription='subscribed']", self.result_subscribe_to_node)
- xs.addObserver("/iq/pubsub/configure/x", self.result_configure_node)
- xs.addObserver("/iq/pubsub/configure/error", self.result_configure_node)
-
- def __iq(self, t="get"):
- iq = domish.Element((None, "iq"))
- iq['from'] = self.id.full()
- iq['to'] = "pubsub.%s" % self.id.host
- iq['type'] = t
- iq.addUniqueId()
- return iq
-
- def __add_pubsub(self, iq):
- pubsub = iq.addElement("pubsub")
- pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
- return pubsub
-
-
- def discover(self, node = None):
- iq = self.__iq("get")
- query = iq.addElement("query")
- query['xmlns'] = "http://jabber.org/protocol/disco#items"
- if node:
- query['node'] = node
- self.requests[iq['id']] = node
- self.xmlstream.send(iq)
-
- def result_discover(self, iq):
- hook = self.hooks.get('discover', None)
- if hook:
- hook(iq)
- self.delete_result_hook('discover')
-
- self.requests.pop(iq['id'])
-
-
- def subscribe_to_node(self, node):
- iq = self.__iq("set")
- pubsub = self.__add_pubsub(iq)
- subscribe = pubsub.addElement("subscribe")
- subscribe['node'] = node
- subscribe['jid'] = self.id.full()
- self.requests[iq['id']] = node
- self.xmlstream.send(iq)
-
- def result_subscribe_to_node(self, iq):
- self.requests.pop(iq['id'])
-
-
- def publish_to_node(self, node, payload):
- iq = self.__iq("set")
- pubsub = self.__add_pubsub(iq)
- publish = pubsub.addElement("publish")
- publish['node'] = node
- items = publish.addElement("item", content=payload)
- self.requests[iq['id']] = node
- self.xmlstream.send(iq)
-
- def result_publish_to_node(self, iq):
- self.requests.pop(iq['id'])
-
-
- # TODO: ejabberd doesn't have the node configuration feature implmented yet!
- def configure_node(self, node, fields=None):
- iq = self.__iq("set")
- pubsub = self.__add_pubsub(iq)
- configure = pubsub.addElement("configure")
- configure['node'] = node
-
- # TODO: add fields
-
- self.requests[iq['id']] = node
- self.xmlstream.send(iq)
-
- def result_configure_node(self, iq):
- hook = self.hooks.get('configure', None)
- if hook:
- hook(iq)
- self.delete_result_hook('configure')
-
- self.requests.pop(iq['id'])
-
-
- def create_node(self, node = None):
- iq = self.__iq("set")
- pubsub = self.__add_pubsub(iq)
- create = pubsub.addElement("create")
- if node:
- create['node'] = node
- configure = pubsub.addElement("configure")
- self.requests[iq['id']] = node
- self.xmlstream.send(iq)
-
- def result_create_node(self, iq):
- node = self.requests[iq['id']]
- if iq.error:
- if iq.error.conflict:
- # node is already there, nothing important.
- self.warn("NodeID exists: %s" % node)
- else:
- err_type = ""
- if iq.error['type']:
- err_type = iq.error['type']
- self.error("Can not create node: %s (error type: %s)" % (node, err_type))
- else:
- # no errors
- # try subscribing to the node for debugging purposes
- self.subscribe_to_node(node)
-
- self.requests.pop(iq['id'])
-
-
- def delete_node(self, node):
- iq = self.__iq("set")
- pubsub = self.__add_pubsub(iq)
- delete = pubsub.addElement("delete")
- delete['node'] = node
- self.requests[iq['id']] = node
- self.xmlstream.send(iq)
-
- def result_delete_node(self, iq):
- self.requests.pop(iq['id'])
-
- def message_chat(self, m):
- body = ""
- for e in m.elements():
- if e.name == "body":
- body = "%s" % e
- break
-
-# try:
-# node = m.event.items['node']
-# n = domish.Element((None, "message"))
-# n.addElement("body", content = "published to: %s\n%s" % (node, m.event.items.toXml()))
-# # for each listener in promiscuous mode send the published event
-# self.xmlstream.send(n)
-# return
-# except:
-# # not a pubsub message continue on
-# pass
-
- if body == "list groups":
- def list_groups(iq):
- reply = ""
- for i in iq.query.elements():
- reply += "%s\n" % i['node']
- n = domish.Element((None, "message"))
- n['to'] = m['from']
- n['from'] = self.id.full()
- n.addElement("body", content = reply)
- self.xmlstream.send(n)
-
- self.add_result_hook("discover", list_groups)
- self.discover()
-
- elif body.startswith("configuration"):
- # "configuration NODE"
- node = ""
- try:
- node = body.split()[1].strip()
- except IndexError:
- pass
-
- def get_configuration(iq):
- reply = iq.toXml()
- n = domish.Element((None, "message"))
- n['to'] = m['from']
- n['from'] = self.id.full()
- n.addElement("body", content = reply)
- self.xmlstream.send(n)
-
- self.add_result_hook("configure", get_configuration)
- self.configure_node(node)
-
- else:
- BaseClient.message_chat(self, m)
-
-
-class Slicemgr(xmlrpc.XMLRPC, PubSubClient):
-
- DOMAIN = "/OMF"
- RESOURCES = 'resources'
-
- def __init__(self, id, secret, verbose = False, log = None):
- xmlrpc.XMLRPC.__init__(self, allowNone=True)
- PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
- self.command_queue = Queue.Queue()
-
- xmlrpc.addIntrospection(self)
-
- def xmlrpc_createSlice(self, slice):
- self.create_slice(slice)
-
- def xmlrpc_addResource(self, slice, resource):
- self.add_resource(slice, resource)
-
- def xmlrpc_deleteSlice(self, slice):
- self.delete_slice(slice)
-
- def xmlrpc_removeResource(self, slice, resource):
- self.delete_resource(slice, resource)
-
-
- def flush_commands(self):
-# self.info("Running %d commands" % self.command_queue.qsize())
- while not self.command_queue.empty():
- (meth, param) = self.command_queue.get()
- meth(param)
-
- def create_slice(self, slice):
- self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
- self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))
-
- def add_resource(self, slice, resource):
- resname = "/".join([self.DOMAIN,slice,self.RESOURCES,resource])
- self.command_queue.put(( self.create_node, resname ))
-
- def delete_slice(self, slice):
- slice_prefix = "/".join([self.DOMAIN,slice])
- resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
- def delete_slice_resources(iq):
- for i in iq.query.elements():
- node = i['node']
- if node.startswith(resource_prefix):
- self.command_queue.put((self.delete_node, node))
-
- self.add_result_hook("discover", delete_slice_resources)
- self.discover()
-
- self.command_queue.put(( self.delete_node, resource_prefix) )
- self.command_queue.put(( self.delete_node, slice_prefix) )
-
- def delete_resource(self, slice, resource):
- self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
-
-
-
-if __name__ == "__main__":
-
- config = Config("/etc/planetlab/plc_config")
-
- xmppserver = config.PLC_OMF_XMPP_SERVER
- xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
- xmpppass = config.PLC_OMF_XMPP_PASSWORD
- monthstring=time.strftime("%Y-%m")
- slicemgr = Slicemgr(xmppuser, xmpppass,
- log=open("/var/log/omf/pubsub-client-%s.log"%monthstring, "a"),
- # used to be verbose=True but that amounts to huge totally helpless logs, so..
- verbose=False)
-
- t = task.LoopingCall(slicemgr.flush_commands)
- t.start(5.0) # check every 5 seconds
-
- reactor.callLater(1, slicemgr.create_node, "/OMF")
- reactor.callLater(1, slicemgr.create_node, "/OMF/SYSTEM")
-
- reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
- reactor.listenTCP(5053, server.Site(slicemgr), interface="localhost")
- reactor.run(installSignalHandlers=True)
-
-
-
+++ /dev/null
-#!/usr/bin/env /usr/bin/plcsh
-
-import sys
-import xmlrpclib
-from optparse import OptionParser
-
-sys.path.append("/usr/bin/")
-from omf_slicemgr import *
-from PLC.Config import Config
-
-config = Config("/etc/planetlab/plc_config")
-pubsub=None
-verbose=False
-
-def init_global_pubsub(verbose):
- xmppserver = config.PLC_OMF_XMPP_SERVER
- xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
- xmpppass = config.PLC_OMF_XMPP_PASSWORD
- global pubsub
- pubsub = PubSubClient(xmppuser, xmpppass, verbose=verbose)
-
-def init_xmlrpc ():
- return xmlrpclib.ServerProxy(config.PLC_OMF_SLICEMGR_URL)
-
-
-def delete_all_nodes(iq):
- global pubsub
- print "Deleting PubSub groups..."
- for i in iq.query.elements():
- node = i['node']
- if verbose: print 'deleting node',node
- reactor.callLater(1, pubsub.delete_node, node)
-
-def is_local_node(node_id, slice_name):
- try:
- return GetNodes({'node_id': node_id}, ['peer_id'])[0]['peer_id'] == None
- except IndexError:
- print "WARNING: can not find the node with node_id %s" % node_id
- print "WARNING: node_id %s was referenced in slice %s" % (node_id, slice_name)
- return False
-
-def main ():
- usage="Usage: %prog -- [options]"
- parser=OptionParser (usage=usage)
- parser.add_option ("-v","--verbose",action='store_true',dest='verbose',default=False,
- help="be verbose")
- parser.add_option ("-s","--slice_pattern", action='store', dest='slice_pattern', default=None,
- help="specify just one slice (or a slice name pattern), for debug mostly")
- (options,args) = parser.parse_args()
- global verbose
- verbose=options.verbose
- if args:
- parser.print_help()
- sys.exit(1)
-
- init_global_pubsub (options.verbose)
- xmlrpc = init_xmlrpc ()
-
- pubsub.add_result_hook("discover", delete_all_nodes)
- reactor.callLater(1, pubsub.discover)
- reactor.callLater(2, pubsub.create_node, "/OMF")
- reactor.callLater(2, pubsub.create_node, "/SYSTEM")
-
- reactor.callLater(4, reactor.stop)
- reactor.connectTCP(pubsub.id.host, 5222, pubsub.factory)
- reactor.run()
-
- print "Re-creating PubSub groups..."
- if options.slice_pattern:
- slices=GetSlices({'name':options.slice_pattern})
- if not slices:
- print 'Could not find any slice with',options.slice_pattern
- sys.exit(1)
- else:
- slices = GetSlices()
- # optimizing the API calls
- nodes = GetNodes ({},['node_id','hrn','peer_id'])
- local_node_hash = dict ( [ (n['node_id'],n['hrn']) for n in nodes if n['peer_id'] is None ] )
- foreign_node_hash = dict ( [ (n['node_id'],n['hrn']) for n in nodes if n['peer_id'] is not None ] )
- total=len(slices)
- slice_counter=1
- node_counter=0
- for slice in slices:
- print 40*'x' + " slice %s (%d/%d)"%(slice['name'],slice_counter,total)
- slice_counter +=1
- xmlrpc.createSlice(slice['name'])
- for node_id in slice['node_ids']:
- # silently ignore foreign nodes
- if node_id in foreign_node_hash: continue
- elif node_id in local_node_hash:
- hrn=local_node_hash[node_id]
- if hrn:
- print 'add resource',slice['name'],hrn
- xmlrpc.addResource(slice['name'],hrn)
- node_counter +=1
- else:
- print "WARNING: missing hrn tag for node_id: %s" % node_id
- else:
- print "Cannot find node with node_id %d (in slice %s)"%(node_id,slice['name'])
- print "Re-created a total of %d pubsub nodes"%node_counter
-
-if __name__ == "__main__":
- main()