From: Thierry Parmentelat Date: Wed, 6 Apr 2011 15:38:58 +0000 (+0200) Subject: a framework for handling call ids X-Git-Tag: sfa-1.0-21-ckp1~74^2~27 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=45ec5d374d680b30464a5c47082eb578c2acac28;p=sfa.git a framework for handling call ids for now only on ListResources/get_rspec (some AMs might not hae it yet) --- diff --git a/sfa/client/sfi.py b/sfa/client/sfi.py index 7f072d72..241d8e10 100755 --- a/sfa/client/sfi.py +++ b/sfa/client/sfi.py @@ -118,6 +118,8 @@ def load_record_from_file(filename): return record +import uuid +def unique_call_id(): return uuid.uuid4().urn class Sfi: @@ -818,7 +820,7 @@ class Sfi: if opts.delegate: delegated_cred = self.delegate_cred(cred, get_authority(self.authority)) creds.append(delegated_cred) - result = server.ListResources(creds, call_options) + result = server.ListResources(creds, call_options,unique_call_id()) format = opts.format if opts.file is None: display_rspec(result, format) diff --git a/sfa/managers/aggregate_manager_pl.py b/sfa/managers/aggregate_manager_pl.py index 03d37cd8..376f12c8 100644 --- a/sfa/managers/aggregate_manager_pl.py +++ b/sfa/managers/aggregate_manager_pl.py @@ -22,6 +22,7 @@ from sfa.plc.api import SfaAPI from sfa.plc.slices import * from sfa.util.version import version_core from sfa.util.sfatime import utcparse +from sfa.util.callids import Callids def GetVersion(api): xrn=Xrn(api.hrn) @@ -290,15 +291,19 @@ def get_slices(api, creds): return slice_urns -def get_rspec(api, creds, options): +# xxx Thierry : caching at the aggregate level sounds wrong... +caching=True +def get_rspec(api, creds, options,call_id): + if not Callids().should_handle_call_id(call_id): return "" # get slice's hrn from options xrn = options.get('geni_slice_urn', '') - hrn, type = urn_to_hrn(xrn) + (hrn, type) = urn_to_hrn(xrn) # look in cache first - if api.cache and not xrn: + if caching and api.cache and not xrn: rspec = api.cache.get('nodes') if rspec: + api.logger.info("aggregate.get_rspec: returning cached value for hrn %s"%hrn) return rspec network = Network(api) @@ -309,7 +314,7 @@ def get_rspec(api, creds, options): rspec = network.toxml() # cache the result - if api.cache and not xrn: + if caching and api.cache and not xrn: api.cache.add('nodes', rspec) return rspec diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index a814106a..bc777a63 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -23,6 +23,7 @@ from sfa.util.threadmanager import ThreadManager import sfa.util.xmlrpcprotocol as xmlrpcprotocol import sfa.plc.peers as peers from sfa.util.version import version_core +from sfa.util.callids import Callids # XX FIX ME: should merge result from multiple aggregates instead of # calling aggregate implementation @@ -320,12 +321,19 @@ def get_slices(api, creds): api.cache.add('slices', slices) return slices - -def get_rspec(api, creds, options): - + + +# Thierry : caching at the slicemgr level makes sense to some extent +caching=True +def get_rspec(api, creds, options, call_id): + + if not Callids().should_handle_call_id(call_id): + api.logger.info("%d received get_rspec with known call_id %s"%(api.interface,call_id)) + return "" + # get slice's hrn from options xrn = options.get('geni_slice_urn', '') - hrn, type = urn_to_hrn(xrn) + (hrn, type) = urn_to_hrn(xrn) # get hrn of the original caller origin_hrn = options.get('origin_hrn', None) @@ -336,13 +344,11 @@ def get_rspec(api, creds, options): origin_hrn = Credential(string=creds).get_gid_caller().get_hrn() # look in cache first - if api.cache and not xrn: + if caching and api.cache and not xrn: rspec = api.cache.get('nodes') if rspec: return rspec - hrn, type = urn_to_hrn(xrn) - # get the callers hrn valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() @@ -361,14 +367,14 @@ def get_rspec(api, creds, options): server = api.aggregates[aggregate] my_opts = copy(options) my_opts['geni_compressed'] = False - threads.run(server.ListResources, credential, my_opts) + threads.run(server.ListResources, credential, my_opts, call_id) #threads.run(server.get_resources, cred, xrn, origin_hrn) results = threads.get_results() merged_rspec = merge_rspecs(results) # cache the result - if api.cache and not xrn: + if caching and api.cache and not xrn: api.cache.add('nodes', merged_rspec) return merged_rspec diff --git a/sfa/methods/ListResources.py b/sfa/methods/ListResources.py index 6afeb490..67793033 100644 --- a/sfa/methods/ListResources.py +++ b/sfa/methods/ListResources.py @@ -28,7 +28,7 @@ class ListResources(Method): # get slice's hrn from options xrn = options.get('geni_slice_urn', '') - hrn, _ = urn_to_hrn(xrn) + (hrn, _) = urn_to_hrn(xrn) # Find the valid credentials valid_creds = self.api.auth.checkCredentials(creds, 'listnodes', hrn) @@ -46,6 +46,7 @@ class ListResources(Method): chain_name = 'OUTGOING' elif self.api.interface in ['slicemgr']: chain_name = 'FORWARD-OUTGOING' + self.api.logger.debug("ListResources: sfatables on chain %s"%chain_name) filtered_rspec = run_sfatables(chain_name, hrn, origin_hrn, rspec) if options.has_key('geni_compressed') and options['geni_compressed'] == True: diff --git a/sfa/util/callids.py b/sfa/util/callids.py new file mode 100644 index 00000000..7ffaf456 --- /dev/null +++ b/sfa/util/callids.py @@ -0,0 +1,69 @@ +#!/usr/bin/python + +import threading +import time + +from sfa.util.sfalogging import sfa_logger + +""" +Callids: a simple mechanism to remember the call ids served so fas +memory-only for now - thread-safe +implemented as a (singleton) hash 'callid'->timestamp +""" + +class _call_ids_impl (dict): + + _instance = None + # 5 minutes sounds amply enough + purge_timeout=5*60 + # when trying to get a lock + retries=8 + # in ms + wait_ms=200 + + def __init__(self): + self._lock=threading.Lock() + + # the only primitive + # return True if the callid is unknown, False otherwise + def should_handle_call_id (self,call_id): + # if not provided in the call... + if not call_id: return True + has_lock=False + for attempt in range(_call_ids_impl.retries): + sfa_logger().debug("Waiting for lock (%d)"%attempt) + if self._lock.acquire(False): + has_lock=True + sfa_logger().debug("got lock (%d)"%attempt) + break + time.sleep(float(_call_ids_impl.wait_ms)/1000) + # in the unlikely event where we can't get the lock + if not has_lock: + sfa_logger().warning("_call_ids_impl.should_handle_call_id: could not acquire lock") + return True + # we're good to go + if self.has_key(call_id): + self._purge() + self._lock.release() + return False + self[call_id]=time.time() + self._purge() + self._lock.release() + sfa_logger().debug("released lock") + return True + + def _purge(self): + now=time.time() + o_keys=[] + for (k,v) in self.iteritems(): + if (now-v) >= _call_ids_impl.purge_timeout: o_keys.append(k) + for k in o_keys: + sfa_logger().debug("Purging call_id %r (%s)"%(k,time.strftime("%H:%M:%S",time.localtime(self[k])))) + del self[k] + sfa_logger().debug("AFTER PURGE") + for (k,v) in self.iteritems(): sfa_logger().debug("%s -> %s"%(k,time.strftime("%H:%M:%S",time.localtime(v)))) + +def Callids (): + if not _call_ids_impl._instance: + _call_ids_impl._instance = _call_ids_impl() + return _call_ids_impl._instance diff --git a/sfa/util/rspecHelper.py b/sfa/util/rspecHelper.py index f6c860b4..1ccc9848 100755 --- a/sfa/util/rspecHelper.py +++ b/sfa/util/rspecHelper.py @@ -36,6 +36,8 @@ def merge_rspecs(rspecs): # the resulting tree rspec = None for input_rspec in rspecs: + # ignore empty strings as returned with used call_ids + if not input_rspec: continue try: tree = etree.parse(StringIO(input_rspec)) except etree.XMLSyntaxError: