From b3a391490764be17194820147b1cf47fff17b69a Mon Sep 17 00:00:00 2001 From: =?utf8?q?S=2E=C3=87a=C4=9Flar=20Onur?= Date: Thu, 6 Jan 2011 17:01:22 -0500 Subject: [PATCH 1/1] funcaspect branch for func integration --- PLCAPI.spec | 6 + aspects/__init__.py | 5 +- aspects/funcaspects.py | 589 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 599 insertions(+), 1 deletion(-) create mode 100644 aspects/funcaspects.py diff --git a/PLCAPI.spec b/PLCAPI.spec index 36d0f0d..c305b6e 100644 --- a/PLCAPI.spec +++ b/PLCAPI.spec @@ -121,6 +121,12 @@ mkdir -p $RPM_BUILD_ROOT/var/log/omf touch $RPM_BUILD_ROOT/var/log/plc_api_ratelimit.log chown apache:apache $RPM_BUILD_ROOT/var/log/plc_api_ratelimit.log +# Create func directories +mkdir -p $RPM_BUILD_ROOT/var/www/html/func +mkdir -p $RPM_BUILD_ROOT/var/log/funcaspect/ +chmod apache:apache $RPM_BUILD_ROOT/var/www/html/func +chmod apache:apache $RPM_BUILD_ROOT/var/log/funcaspect/ + %clean rm -rf $RPM_BUILD_ROOT diff --git a/aspects/__init__.py b/aspects/__init__.py index cdab6d6..a140869 100644 --- a/aspects/__init__.py +++ b/aspects/__init__.py @@ -3,15 +3,18 @@ from pyaspects.weaver import weave_class_method from PLC.Method import Method from aspects.omfaspects import OMFAspect from aspects.ratelimitaspects import RateLimitAspect +#disabled for now - caglar +#from aspects.funcaspects import FuncAspect def apply_omf_aspect(): # track all PLC methods to add OMF hooks weave_class_method(OMFAspect(), Method, "__call__") weave_class_method(RateLimitAspect(), Method, "__call__") +#disabled for now - caglar +# weave_class_method(FuncAspect(), Method, "__call__") def apply_debugger_aspect(): # just log all method calls w/ their parameters from pyaspects.debuggeraspect import DebuggerAspect weave_class_method(DebuggerAspect(out=open("/tmp/all_method_calls.log", "a")), Method, "__call__") - diff --git a/aspects/funcaspects.py b/aspects/funcaspects.py new file mode 100644 index 0000000..17309d8 --- /dev/null +++ b/aspects/funcaspects.py @@ -0,0 +1,589 @@ +#!/usr/bin/python +import os +import datetime + +from PLC.Config import Config +from PLC.Interfaces import Interface, Interfaces +from PLC.InterfaceTags import InterfaceTag, InterfaceTags +from PLC.Keys import Key, Keys + +# For PLC 4.3 +try: + from PLC.Methods.GetSliceFamily import GetSliceFamily +except ImportError: + pass + +from PLC.NodeGroups import NodeGroup, NodeGroups +from PLC.NodeTags import NodeTag, NodeTags +from PLC.Nodes import Node, Nodes +from PLC.Persons import Person, Persons +from PLC.SliceTags import SliceTag, SliceTags +from PLC.Slices import Slice, Slices +from PLC.TagTypes import TagTypes + +from pyaspects.meta import MetaAspect + +import func.overlord.client + +def absorb_exception(function_name): + def _wrapper(*args, **kwargs): + try: + return function_name(*args, **kwargs) + except: + return None + return _wrapper + + +class BaseFunc(object): + + def __init__(self): + self.log = open("/var/log/funcaspect/plc_slice_calls.log", "a") + #self.log = None + + self.interface_id = None + self.node_ids = None + self.person = None + self.person_name_or_id = None + self.slice = None + self.slice_ids = None + self.slice_name_or_id = None + self.tag = None + self.tag_name_or_id = None + self.tag_value = None + + def logit(self, call, args, kwargs, data, slice): + if not self.log: return + + self.log.write("%s : args: %s kwargs: %s\n" % (call, args, kwargs)) + self.log.write("data: %s\n" % data) + self.log.write("%s\n\n" % slice) + self.log.flush() + + @absorb_exception + def get_slice(self, api, slice_id_or_name): + slice_filter = {} + try: # if integer + slice_filter["slice_id"] = int(str(slice_id_or_name)) + except ValueError: + # we have a string + slice_filter["name"] = slice_id_or_name + slice = Slices(api, slice_filter = slice_filter)[0] + return slice + + @absorb_exception + def get_node_tag(self, api, node_id_or_name): + tag_filter = {} + try: # if integer + tag_filter["node_tag_id"] = int(str(node_id_or_name)) + except ValueError: + # we have a string + tag_filter["tagname"] = node_id_or_name + tag = NodeTags(api, node_tag_filter = tag_filter)[0] + return tag + + @absorb_exception + def get_person(self, api, person_id_or_name): + person_filter = {} + try: # if integer + person_filter["person_id"] = int(str(person_id_or_name)) + except ValueError: + # we have a string + person_filter["email"] = person_id_or_name + person = Persons(api, person_filter = person_filter)[0] + return person + + @absorb_exception + def get_interface(self, api, interface_id): + return Interfaces(api, interface_filter = {"interface_id": interface_id}) + + @absorb_exception + def get_interfaces(self, api, node_id): + return Interfaces(api, interface_filter = {"node_id": node_id}) + + @absorb_exception + def get_slice_family(self, api, slice_id_or_name): + # For PLC 4.3 + try: + return GetSliceFamily(api).call(api, slice_id_or_name) + except NameError: + return "planetlab-f8-i386" + + @absorb_exception + def get_slice_tag(self, api, tag_id_or_name): + tag_filter = {} + try: # if integer + tag_filter["slice_tag_id"] = int(str(tag_id_or_name)) + except ValueError: + # we have a string + tag_filter["tagname"] = tag_id_or_name + tag = SliceTags(api, slice_tag_filter = tag_filter)[0] + return tag + + @absorb_exception + def get_tag_type(self, api, type_id_or_name): + tag_filter = {} + try: # if integer + tag_filter["tag_type_id"] = int(str(type_id_or_name)) + except ValueError: + # we have a string + tag_filter["tagname"] = type_id_or_name + tag = TagTypes(api, tag_type_filter = tag_filter)[0] + return tag + + @absorb_exception + def get_node_hostname(self, api, node_id_or_hostname): + node_filter = {} + try: # if integer + node_filter["node_id"] = int(str(node_id_or_hostname)) + except ValueError: + # we have a string + node_filter["hostname"] = node_id_or_hostname + node = Nodes(api, node_filter = node_filter)[0] + return node["hostname"] + + @absorb_exception + def get_slice_tags(self, api, slice_id): + return SliceTags(api, slice_tag_filter = {"slice_id": slice_id}) + + @absorb_exception + def get_slice_tags_with_tag_type(self, api, tag_type_id): + return SliceTags(api, slice_tag_filter = {"tag_type_id": tag_type_id}) + + @absorb_exception + def get_person_keys(self, api, person_ids): + return Keys(api, key_filter = {"person_id": person_ids}) + + @absorb_exception + def get_node_tags(self, api, node_id): + return NodeTags(api, node_tag_filter = {"node_id": node_id}) + + @absorb_exception + def get_interface_tags(self, api, interface_id): + return InterfaceTags(api, interface_tag_filter = {"interface_id": interface_id}) + + @absorb_exception + def get_node_groups(self, api, nodegroup_id): + return NodeGroups(api, nodegroup_filter = {"nodegroup_id": nodegroup_id}) + + def add_slice_to_node(self, slice, node, family, tags, keys): + pass + + def delete_slice_from_node(self, slice, node): + pass + + def add_person_to_slice(self, slice, node, person, value): + pass + + def delete_person_from_slice(self, slice, node, person, value): + pass + + def add_slice_tag(self, slice, node, tag, value): + pass + + def add_node_tag(self, node, tag, value): + pass + + def add_interface(self, node, value): + pass + + def delete_interface(self, node, value): + pass + + def delete_node_tag(self, node, tag, value): + pass + + def delete_slice_tag(self, slice, node, tag, value): + pass + + def update_node_tag(self, node, tag, value): + pass + + def update_slice_tag(self, slice, node, tag, value): + pass + + # aspect method which assigns the required variables + def before(self, wobj, data, *args, **kwargs): + api_method_name = wobj.name + + if api_method_name == "AddSliceToNodes" or api_method_name == "DeleteSliceFromNodes": + self.slice_name_or_id = args[1] + self.node_ids = args[2] + elif api_method_name == "AddPersonToSlice" or api_method_name == "DeletePersonFromSlice": + self.person_name_or_id = args[1] + self.slice_name_or_id = args[2] + + elif api_method_name == "AddSliceTag": + self.slice_name_or_id = args[1] + self.tag_name_or_id = args[2] + self.tag_value = args[3] + elif api_method_name == "DeleteSliceTag": + self.tag_name_or_id = args[1] + # keep it in memory + self.tag = self.get_slice_tag(wobj.api, self.tag_name_or_id) + self.slice = self.get_slice(wobj.api, self.tag["slice_id"]) + + elif api_method_name == "AddNodeTag": + self.node_ids = [args[1]] + self.tag_name_or_id = args[2] + self.tag_value = args[3] + elif api_method_name == "DeleteNodeTag": + self.tag_name_or_id = args[1] + # keep it in memory + self.tag = self.get_node_tag(wobj.api, self.tag_name_or_id) + elif api_method_name == "UpdateSliceTag" or api_method_name == "UpdateNodeTag": + self.tag_name_or_id = args[1] + self.tag_value = args[2] + + elif api_method_name == "AddInterface" or api_method_name == "UpdateInterface": + self.node_ids = [args[1]] + elif api_method_name == "DeleteInterface": + self.interface_id = args[1] + + elif api_method_name == "DeleteSlice": + self.slice_name_or_id = args[1] + elif api_method_name == "DeletePerson": + self.person_name_or_id = args[1] + # keep it in memory + self.person = self.get_person(wobj.api, self.person_name_or_id) + elif api_method_name == "DeleteTagType": + self.tag_name_or_id = args[1] + # keep it in memory + tag_type = self.get_tag_type(wobj.api, self.tag_name_or_id)[0]["tag_type_id"] + self.slice_ids = [slice_ids["slice_id"] for slice_ids in self.get_slice_tags_with_tag_type(wobj.api, tag_type)] + self.slice_ids = list(set(self.slice_ids)) + # Node/NodeGroups ??? + #GetNodeTags + #GetInterfaceTags + else: # ignore the rest + return + + if self.slice_name_or_id != None: + self.slice = self.get_slice(wobj.api, self.slice_name_or_id) + +# self.logit(wobj.name, args, kwargs, data, slice) + + # aspect method + def after(self, wobj, data, *args, **kwargs): + #if data.has_key("method_return_value") and data['method_return_value'] > 0: + # # return value 1 means that API call was successful, we can go on. + # pass + #else: + # return + + api_method_name = wobj.name + + # assign globals to locals + interface_id = self.interface_id + node_ids = self.node_ids + person_name_or_id = self.person_name_or_id + slice = self.slice + tag = self.tag + person = self.person + tag_name_or_id = self.tag_name_or_id + tag_value = self.tag_value + + if api_method_name == "AddSliceToNodes": + slice_tags = self.get_slice_tags(wobj.api, slice["slice_id"]) + slice_keys = self.get_person_keys(wobj.api, slice["person_ids"]) + slice_family = self.get_slice_family(wobj.api, slice["slice_id"]) + for node_id in node_ids: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.add_slice_to_node(slice["name"], node_hostname, slice_family, slice_tags, slice_keys) + elif api_method_name == "DeleteSliceFromNodes": + for node_id in node_ids: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.delete_slice_from_node(slice["name"], node_hostname) + + elif api_method_name == "AddPersonToSlice": + person = self.get_person(wobj.api, person_name_or_id) + keys = self.get_person_keys(wobj.api, slice["person_ids"]) + keys += self.get_person_keys(wobj.api, person["person_id"]) + for node_id in slice["node_ids"]: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.add_person_to_slice(slice["name"], node_hostname, person["email"], keys) + elif api_method_name == "DeletePersonFromSlice": + person = self.get_person(wobj.api, person_name_or_id) + keys = self.get_person_keys(wobj.api, slice["person_ids"]) + keys.remove(self.get_person_keys(wobj.api, slice["person_ids"])[0]) + for node_id in slice["node_ids"]: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.delete_person_from_slice(slice["name"], node_hostname, person["email"], keys) + + elif api_method_name == "AddSliceTag": + tag = self.get_tag_type(wobj.api, tag_name_or_id) + + if len(args) == 4: + node_ids = slice["node_ids"] + elif len(args) == 5: + node_ids = [args[4]] + else: + node_groups = self.get_node_groups(wobj.api, args[5]) + node_ids = set.intersection(set(node_groups[0]["node_ids"]), set(slice["node_ids"])) + + for node_id in node_ids: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.add_slice_tag(slice["name"], node_hostname, tag["tagname"], tag_value) + elif api_method_name == "DeleteSliceTag": + slice = self.get_slice(wobj.api, tag["slice_id"]) + for node_id in slice["node_ids"]: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.delete_slice_tag(slice["name"], node_hostname, tag["tagname"], tag["value"]) + elif api_method_name == "UpdateSliceTag": + tag = self.get_slice_tag(wobj.api, tag_name_or_id) + slice = self.get_slice(wobj.api, tag["slice_id"]) + for node_id in slice["node_ids"]: + node_hostname = self.get_node_hostname(wobj.api, tag["node_id"]) + self.update_slice_tag(slice, node_hostname, tag["tagname"], tag_value) + + elif api_method_name == "AddNodeTag": + tag = self.get_tag_type(wobj.api, tag_name_or_id) + for node_id in node_ids: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.add_node_tag(node_hostname, tag["tagname"], tag_value) + elif api_method_name == "DeleteNodeTag": + node_hostname = self.get_node_hostname(wobj.api, tag["node_id"]) + self.delete_node_tag(node_hostname, tag["tagname"], tag["value"]) + elif api_method_name == "UpdateNodeTag": + tag = self.get_node_tag(wobj.api, tag_name_or_id) + for node_id in [tag["node_id"]]: + node_hostname = self.get_node_hostname(wobj.api, tag["node_id"]) + self.update_node_tag(node_hostname, tag["tagname"], tag_value) + + elif api_method_name == "AddInterface" or api_method_name == "UpdateInterface": + for node_id in node_ids: + node_hostname = self.get_node_hostname(wobj.api, node_id) + node_interfaces = self.get_interfaces(wobj.api, node_id) + for node_interface in node_interfaces: + #FIXME: Requires pyplnet changes + interface_tags = self.get_interface_tags(wobj.api, node_interface["interface_tag_ids"]) + node_interface["interface_tags"] = interface_tags + self.add_interface(node_hostname, node_interfaces) +# elif api_method_name == "DeleteInterface": +# node_interface = self.get_interface(wobj.api, interface_id) +# for node_id in node_interface["node_id"]: +# node_hostname = self.get_node_hostname(wobj.api, node_id) +# node_interfaces = self.get_interfaces(wobj.api, node_id) +# for node_interface in node_interfaces: +# #FIXME: Requires pyplnet changes +# interface_tags = self.get_interface_tags(wobj.api, node_interface["interface_tag_ids"]) +# node_interface["interface_tags"] = interface_tags +# self.delete_interface(node_hostname, node_interfaces) + + elif api_method_name == "DeleteSlice": + for node_id in slice["node_ids"]: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.delete_slice_from_node(slice["name"], node_hostname) + elif api_method_name == "DeletePerson": + #FIXME: Role + slice_ids = person["slice_ids"] + for slice_id in slice_ids: + slice = self.get_slice(wobj.api, slice_id) + keys = self.get_person_keys(wobj.api, slice["person_ids"]) + for node_id in slice["node_ids"]: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.delete_person_from_slice(slice["name"], node_hostname, person["email"], keys) + elif api_method_name == "DeleteTagType": + #FIXME: NodeGroups etc. + for slice in self.slice_ids: + slice = self.get_slice(wobj.api, tag["slice_id"]) + for node_id in slice["node_ids"]: + node_hostname = self.get_node_hostname(wobj.api, node_id) + self.delete_slice_tag(slice["name"], node_hostname, tag["tagname"], tag["value"]) + +# self.logit(wobj.name, args, kwargs, data, slice) + +class FuncAspect_class(BaseFunc): + __metaclass__ = MetaAspect + name = "funcaspect_class" + + #node_list = func.overlord.client.Client("*").list_minions() + #FIXME: Only control following nodes + node_list = ["planetlab-01.cs.princeton.edu", "planetlab-02.cs.princeton.edu", "planetlab-04.cs.princeton.edu", "planetlab-05.cs.princeton.edu"] + + client = None + + def __init__(self): + BaseFunc.__init__(self) + + def write_to_log(self, line): + if not self.log: return + + date = datetime.datetime.now().strftime("%d/%m/%y %H:%M") + self.log.write("%s - %s" % (date, line)) + self.log.flush() + + def ping(self, node): + try: + self.client = func.overlord.client.Client(node, timeout = 10) + except: + self.client = None + return False + + if self.client.test.ping()[node]: + self.client = func.overlord.client.Client(node, timeout = 10, async = True) + return True + + self.client = None + return False + + def add_slice_to_node(self, slice, node, family, tags, keys): + self.write_to_log("Func: AddSlice: %s toNode: %s Family: %s\n" % (slice, node, family)) + + if node in self.node_list: + self.increment_revision_for_node(node) + if self.ping(node): + self.client.nm.AddSliceToNode(slice, tags, keys) + else: + self.write_to_log("Func: (AddSlice)Cannot access to node: %s" % node) + +# if self.ping(node): +# self.client.nm.AddSliceToNode(slice, interfaces, family, tags, keys) +# else: +# AddToQueue + + def delete_slice_from_node(self, slice, node): + self.write_to_log("Func: DeleteSlice: %s FromNode: %s\n" % (slice, node)) + + if node in self.node_list: + self.increment_revision_for_node(node) + if self.ping(node): + self.client.nm.DeleteSliceFromNode(slice) + else: + self.write_to_log("Func: (DeleteSlice)Cannot access to node: %s" % node) + +# if self.pingnode(node): +# self.client.nm.DeleteSliceFromNode(slice) +# else: +# AddToQueue + + def add_person_to_slice(self, slice, node, person, value): + self.write_to_log("Func: AddPerson: %s ToSlice: %s onNode: %s\n" % (person, slice, node)) + + if node in self.node_list: + self.increment_revision_for_node(node) + if self.ping(node): + self.client.nm.AddPersonToSlice(slice, value) + else: + self.write_to_log("Func: (AddPersonToSlice)Cannot access to node: %s" % node) + +# if self.ping(node): +# self.client.nm.AddPersonToSlice(slice, value) +# else: +# AddToQueue + + def delete_person_from_slice(self, slice, node, person, value): + self.write_to_log("Func: DeletePerson: %s FromSlice: %s onNode: %s\n" % (person, slice, node)) + + if node in self.node_list: + self.increment_revision_for_node(node) + if self.ping(node): + self.client.nm.DeletePersonFromSlice(slice, value) + else: + self.write_to_log("Func: (DeletePersonFromSlice)Cannot access to node: %s" % node) + +# if self.ping(node): +# self.client.nm.DeletePersonFromSlice(slice, value) +# else: +# AddToQueue + + def add_slice_tag(self, slice, node, tag, value): + self.write_to_log("Func: AddSliceTag: %s Value: %s onSlice:%s onNode: %s\n" % (tag, value, slice, node)) + + if node in self.node_list: + self.increment_revision_for_node(node) + if self.ping(node): + self.client.nm.AddSliceTag(slice, tag, value) + else: + self.write_to_log("Func: (AddSliceTag)Cannot access to node: %s" % node) + +# if self.ping(node): +# self.client.nm.AddSliceTag(slice, tag, value) +# else: +# AddToQueue + + def delete_slice_tag(self, slice, node, tag, value): + self.write_to_log("Func: DeleteSliceTag: %s Value: %s onSlice:%s onNode: %s\n" % (tag, value, slice, node)) + + if node in self.node_list: + self.increment_revision_for_node(node) + if self.ping(node): + self.client.nm.DeleteSliceTag(slice, tag, value) + else: + self.write_to_log("Func: (DeleteSliceTag)Cannot access to node: %s" % node) + +# if self.ping(node): +# self.client.nm.DeleteSliceTag(slice, tag, value) +# else: +# AddToQueue + + def update_slice_tag(self, slice, node, tag, value): + self.write_to_log("Func: UpdateSliceTag: %s Value: %s toSlice: %s onNode: %s\n" % (tag, value, slice, node)) + +# client = func.overlord.client.Client(node, timeout=5, async=True) +# if self.ping(node): +# self.client.nm.UpdateSliceTag(slice, tag, value) +# else: +# AddToQueue + + def add_node_tag(self, node, tag, value): + self.write_to_log("Func: AddNodeTag: %s Value: %s toNode: %s\n" % (tag, value, node)) + +# if self.ping(node): +# self.client.nm.AddNodeTag(tag, value) +# else: +# AddToQueue + + def delete_node_tag(self, node, tag, value): + self.write_to_log("Func: DeleteNodeTag: %s Value: %s fromNode: %s\n" % (tag, value, node)) + +# if self.ping(node): +# self.client.nm.DeleteSliceTag(tag, value) +# else: +# AddToQueue + + def update_node_tag(self, node, tag, value): + self.write_to_log("Func: UpdateNodeTag: %s Value: %s toNode: %s\n" % (tag, value, node)) + +# if self.ping(node): +# self.client.nm.UpdateNodeTag(tag, value) +# else: +# AddToQueue + + def add_interface(self, node, value): + self.write_to_log("Func: AddInterface: %s toNode: %s\n" % (len(value), node)) + +# if self.ping(node): +# self.client.nm.AddInterface(value) +# else: +# AddToQueue + + def delete_interface(self, node, value): + self.write_to_log("Func: DeleteInterface: %s toNode: %s\n" % (len(value), node)) + +# if self.ping(node): +# self.client.nm.DeleteInterface(value) +# else: +# AddToQueue + + @absorb_exception + def increment_revision_for_node(self, node): + filename = "/var/www/html/func/%s" % node + value = 0 + + if os.path.exists(filename): + f = open(filename, "r") + value = int(f.read().strip()) + f.close() + + f = open(filename, "w") + value += 1 + f.write("%d" % int(value)) + f.close() + + def before(self, wobj, data, *args, **kwargs): + BaseFunc.before(self, wobj, data, *args, **kwargs) + + def after(self, wobj, data, *args, **kwargs): + BaseFunc.after(self, wobj, data, *args, **kwargs) + +FuncAspect = FuncAspect_class -- 2.43.0