From 48036d0e8812f841359c56a62a117d5931c10da7 Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Mon, 26 Apr 2010 21:40:57 +0000 Subject: [PATCH] no longer use sfa.util.storage.SimpleStorage for caching list of instantiated slices. Using sfa.util.cache instead --- sfa/managers/aggregate_manager_pl.py | 21 ++++++--- sfa/managers/slice_manager_pl.py | 28 ++++++++--- sfa/plc/api.py | 7 +-- sfa/plc/slices.py | 70 +--------------------------- sfa/util/api.py | 10 ++-- sfa/util/cache.py | 62 ++++++++++++++++++++++++ sfa/util/server.py | 11 +++-- 7 files changed, 117 insertions(+), 92 deletions(-) create mode 100644 sfa/util/cache.py diff --git a/sfa/managers/aggregate_manager_pl.py b/sfa/managers/aggregate_manager_pl.py index 071a26e7..76bfaae5 100644 --- a/sfa/managers/aggregate_manager_pl.py +++ b/sfa/managers/aggregate_manager_pl.py @@ -168,12 +168,21 @@ def reset_slice(api, xrn): return 1 def get_slices(api): - # XX just import the legacy module and excute that until - # we transition the code to this module - from sfa.plc.slices import Slices - slices = Slices(api) - slices.refresh() - return [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slices['hrn']] + # look in cache first + if api.cache: + slices = api.cache.get('slices') + if slices: + return slices + + # get data from db + slices = api.plshell.GetSlices(api.plauth, {'peer_id': None}, ['name']) + slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices] + slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns] + + # cache the result + api.cache.add('slices', slice_urns) + return slice_urns + def get_rspec(api, xrn=None, origin_hrn=None): diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index e7880f5a..5676416a 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -185,13 +185,27 @@ def reset_slice(api, xrn): return 1 def get_slices(api): - # XX just import the legacy module and excute that until - # we transition the code to this module - from sfa.plc.slices import Slices - slices = Slices(api) - slices.refresh() - return [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slices['hrn']] - + # look in cache first + if api.cache: + slices = api.cache.get('slices') + if slices: + return slices + + # fetch from aggregates + slices = [] + credential = api.getCredential() + for aggregate in api.aggregates: + try: + tmp_slices = api.aggregates[aggregate].get_slices(credential) + slices.extend(tmp_slices) + except: + print >> log, "%s" % (traceback.format_exc()) + print >> log, "Error calling slices at aggregate %(aggregate)s" % locals() + + # cache the result + api.cache.add('slices', slices) + return slices + def get_rspec(api, xrn=None, origin_hrn=None): hrn, type = urn_to_hrn(xrn) rspec = None diff --git a/sfa/plc/api.py b/sfa/plc/api.py index b4d295b1..f95611a9 100644 --- a/sfa/plc/api.py +++ b/sfa/plc/api.py @@ -36,11 +36,12 @@ class SfaAPI(BaseAPI): import sfa.methods methods = sfa.methods.all - def __init__(self, config = "/etc/sfa/sfa_config.py", encoding = "utf-8", methods='sfa.methods', \ - peer_cert = None, interface = None, key_file = None, cert_file = None): + def __init__(self, config = "/etc/sfa/sfa_config.py", encoding = "utf-8", + methods='sfa.methods', peer_cert = None, interface = None, + key_file = None, cert_file = None, cache = None): BaseAPI.__init__(self, config=config, encoding=encoding, methods=methods, \ peer_cert=peer_cert, interface=interface, key_file=key_file, \ - cert_file=cert_file) + cert_file=cert_file, cache=cache) self.encoding = encoding diff --git a/sfa/plc/slices.py b/sfa/plc/slices.py index 9730e972..abe5db17 100644 --- a/sfa/plc/slices.py +++ b/sfa/plc/slices.py @@ -11,7 +11,6 @@ from sfa.util.namespace import * from sfa.util.rspec import * from sfa.util.specdict import * from sfa.util.faults import * -from sfa.util.storage import * from sfa.util.record import SfaRecord from sfa.util.policy import Policy from sfa.util.prefixTree import prefixTree @@ -19,21 +18,14 @@ from sfa.util.debug import log MAXINT = 2L**31-1 -class Slices(SimpleStorage): +class Slices: rspec_to_slice_tag = {'max_rate':'net_max_rate'} def __init__(self, api, ttl = .5, origin_hrn=None): self.api = api - self.ttl = ttl - self.threshold = None - path = self.api.config.SFA_DATA_DIR - filename = ".".join([self.api.interface, self.api.hrn, "slices"]) filepath = path + os.sep + filename - self.slices_file = filepath - SimpleStorage.__init__(self, self.slices_file) self.policy = Policy(self.api) - self.load() self.origin_hrn = origin_hrn def get_slivers(self, xrn, node=None): @@ -176,66 +168,6 @@ class Slices(SimpleStorage): return sfa_peer - def refresh(self): - """ - Update the cached list of slices - """ - # Reload components list - now = datetime.datetime.now() - if not self.has_key('threshold') or not self.has_key('timestamp') or \ - now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))): - if self.api.interface in ['aggregate']: - self.refresh_slices_aggregate() - elif self.api.interface in ['slicemgr']: - self.refresh_slices_smgr() - - def refresh_slices_aggregate(self): - slices = self.api.plshell.GetSlices(self.api.plauth, {'peer_id': None}, ['name']) - slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices] - - # update timestamp and threshold - timestamp = datetime.datetime.now() - hr_timestamp = timestamp.strftime(self.api.time_format) - delta = datetime.timedelta(hours=self.ttl) - threshold = timestamp + delta - hr_threshold = threshold.strftime(self.api.time_format) - - slice_details = {'hrn': slice_hrns, - 'timestamp': hr_timestamp, - 'threshold': hr_threshold - } - self.update(slice_details) - self.write() - - - def refresh_slices_smgr(self): - slice_hrns = [] - credential = self.api.getCredential() - for aggregate in self.api.aggregates: - success = False - try: - slices = self.api.aggregates[aggregate].get_slices(credential) - slice_hrns.extend(slices) - success = True - except: - print >> log, "%s" % (traceback.format_exc()) - print >> log, "Error calling slices at aggregate %(aggregate)s" % locals() - - # update timestamp and threshold - timestamp = datetime.datetime.now() - hr_timestamp = timestamp.strftime(self.api.time_format) - delta = datetime.timedelta(hours=self.ttl) - threshold = timestamp + delta - hr_threshold = threshold.strftime(self.api.time_format) - - slice_details = {'hrn': slice_hrns, - 'timestamp': hr_timestamp, - 'threshold': hr_threshold - } - self.update(slice_details) - self.write() - - def verify_site(self, registry, credential, slice_hrn, peer, sfa_peer): authority = get_authority(slice_hrn) authority_urn = hrn_to_urn(authority, 'authority') diff --git a/sfa/util/api.py b/sfa/util/api.py index 099f079d..0c2b78a0 100644 --- a/sfa/util/api.py +++ b/sfa/util/api.py @@ -97,14 +97,15 @@ def import_deep(name): class BaseAPI: - def __init__(self, config = "/etc/sfa/sfa_config.py", encoding = "utf-8", methods='sfa.methods', - - peer_cert = None, interface = None, key_file = None, cert_file = None): + cache = None + + def __init__(self, config = "/etc/sfa/sfa_config.py", encoding = "utf-8", + methods='sfa.methods', peer_cert = None, interface = None, + key_file = None, cert_file = None, cache = cache): self.encoding = encoding # flat list of method names - self.methods_module = methods_module = __import__(methods, fromlist=[methods]) self.methods = methods_module.all @@ -121,6 +122,7 @@ class BaseAPI: self.key = Keypair(filename=self.key_file) self.cert_file = cert_file self.cert = Certificate(filename=self.cert_file) + self.cache = cache self.credential = None self.source = None self.time_format = "%Y-%m-%d %H:%M:%S" diff --git a/sfa/util/cache.py b/sfa/util/cache.py new file mode 100644 index 00000000..45961feb --- /dev/null +++ b/sfa/util/cache.py @@ -0,0 +1,62 @@ +# +# This module implements general purpose caching system +# +from __future__ import with_statement +import time +import threading +from datetime import datetime + +# maximum lifetime of cached data (in seconds) +MAX_CACHE_TTL = 60 * 60 + +class CacheData: + + data = None + created = None + expires = None + lock = None + + def __init__(self, data, ttl = MAX_CACHE_TTL): + self.lock = threading.RLock() + self.data = data + self.renew(ttl) + + def is_expired(self): + return time.time() > self.expires + + def get_created_date(self): + return str(datetime.fromtimestamp(self.created)) + + def get_expires_date(self): + return str(datetime.fromtimestamp(self.expires)) + + def renew(self, ttl = MAX_CACHE_TTL): + self.created = time.time() + self.expires = self.created + ttl + + def set_data(self, data, renew=True, ttl = MAX_CACHE_TTL): + with self.lock: + self.data = data + if renew: + self.renew(ttl) + + def get_data(self): + return self.data + +class Cache: + + cache = {} + lock = threading.RLock() + + def add(self, key, value, ttl = MAX_CACHE_TTL): + with self.lock: + if self.cache.has_key(key): + self.cache[key].set_data(value, ttl=ttl) + else: + self.cache[key] = CacheData(value, ttl) + + def get(self, key): + data = self.cache.get(key) + if not data or data.is_expired(): + return None + return data.get_data() diff --git a/sfa/util/server.py b/sfa/util/server.py index 38bbfca9..2ee43785 100644 --- a/sfa/util/server.py +++ b/sfa/util/server.py @@ -22,7 +22,8 @@ from Queue import Queue from sfa.trust.certificate import Keypair, Certificate from sfa.trust.credential import * from sfa.util.faults import * -from sfa.plc.api import SfaAPI +from sfa.plc.api import SfaAPI +from sfa.util.cache import Cache from sfa.util.debug import log ## @@ -90,7 +91,8 @@ class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): def do_POST(self): """Handles the HTTPS POST request. - It was copied out from SimpleXMLRPCServer.py and modified to shutdown the socket cleanly. + It was copied out from SimpleXMLRPCServer.py and modified to shutdown + the socket cleanly. """ try: peer_cert = Certificate() @@ -98,7 +100,8 @@ class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): self.api = SfaAPI(peer_cert = peer_cert, interface = self.server.interface, key_file = self.server.key_file, - cert_file = self.server.cert_file) + cert_file = self.server.cert_file, + cache = self.cache) # get arguments request = self.rfile.read(int(self.headers["content-length"])) remote_addr = (remote_ip, remote_port) = self.connection.getpeername() @@ -136,6 +139,8 @@ class SecureXMLRPCServer(BaseHTTPServer.HTTPServer,SimpleXMLRPCServer.SimpleXMLR self.interface = None self.key_file = key_file self.cert_file = cert_file + # add cache to the request handler + HandlerClass.cache = Cache() #for compatibility with python 2.4 (centos53) if sys.version_info < (2, 5): SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self) -- 2.43.0