return record
+import uuid
+# Return a fresh, globally-unique call id (uuid4 rendered as a URN string,
+# 'urn:uuid:...') used to tag each outgoing RPC so servers can drop duplicates.
+def unique_call_id(): return uuid.uuid4().urn
class Sfi:
if opts.delegate:
delegated_cred = self.delegate_cred(cred, get_authority(self.authority))
creds.append(delegated_cred)
- result = server.ListResources(creds, call_options)
+ result = server.ListResources(creds, call_options,unique_call_id())
format = opts.format
if opts.file is None:
display_rspec(result, format)
from sfa.plc.slices import *
from sfa.util.version import version_core
from sfa.util.sfatime import utcparse
+from sfa.util.callids import Callids
def GetVersion(api):
xrn=Xrn(api.hrn)
return slice_urns
-def get_rspec(api, creds, options):
+# xxx Thierry : caching at the aggregate level sounds wrong...
+caching=True
+# call_id: opaque caller-supplied token; an already-served call_id turns this
+# call into a no-op that returns "" (duplicate-request suppression, see
+# sfa.util.callids)
+def get_rspec(api, creds, options,call_id):
+ if not Callids().should_handle_call_id(call_id): return ""
# get slice's hrn from options
xrn = options.get('geni_slice_urn', '')
- hrn, type = urn_to_hrn(xrn)
+ (hrn, type) = urn_to_hrn(xrn)
# look in cache first
+ # the 'nodes' cache entry is only consulted for slice-less (no xrn) queries,
+ # and only while the module-level 'caching' flag is on
- if api.cache and not xrn:
+ if caching and api.cache and not xrn:
rspec = api.cache.get('nodes')
if rspec:
+ api.logger.info("aggregate.get_rspec: returning cached value for hrn %s"%hrn)
return rspec
network = Network(api)
rspec = network.toxml()
# cache the result
- if api.cache and not xrn:
+ if caching and api.cache and not xrn:
api.cache.add('nodes', rspec)
return rspec
import sfa.util.xmlrpcprotocol as xmlrpcprotocol
import sfa.plc.peers as peers
from sfa.util.version import version_core
+from sfa.util.callids import Callids
# XX FIX ME: should merge result from multiple aggregates instead of
# calling aggregate implementation
api.cache.add('slices', slices)
return slices
-
-def get_rspec(api, creds, options):
-
+
+
+# Thierry : caching at the slicemgr level makes sense to some extent
+caching=True
+# call_id: opaque caller-supplied token; an already-served call_id turns this
+# call into a no-op that returns "" (duplicate-request suppression, see
+# sfa.util.callids)
+def get_rspec(api, creds, options, call_id):
+
+ if not Callids().should_handle_call_id(call_id):
+ # api.interface is a string in this file (it is compared against
+ # 'slicemgr' below), so the original '%d' conversion would raise
+ # TypeError on the very first duplicate; use '%s' instead.
+ api.logger.info("%s received get_rspec with known call_id %s"%(api.interface,call_id))
+ return ""
+
# get slice's hrn from options
xrn = options.get('geni_slice_urn', '')
- hrn, type = urn_to_hrn(xrn)
+ (hrn, type) = urn_to_hrn(xrn)
# get hrn of the original caller
origin_hrn = options.get('origin_hrn', None)
origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()
# look in cache first
- if api.cache and not xrn:
+ if caching and api.cache and not xrn:
rspec = api.cache.get('nodes')
if rspec:
return rspec
- hrn, type = urn_to_hrn(xrn)
-
# get the callers hrn
valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
server = api.aggregates[aggregate]
my_opts = copy(options)
my_opts['geni_compressed'] = False
+ # forward the same call_id so downstream aggregates can deduplicate too
- threads.run(server.ListResources, credential, my_opts)
+ threads.run(server.ListResources, credential, my_opts, call_id)
#threads.run(server.get_resources, cred, xrn, origin_hrn)
results = threads.get_results()
merged_rspec = merge_rspecs(results)
# cache the result
- if api.cache and not xrn:
+ if caching and api.cache and not xrn:
api.cache.add('nodes', merged_rspec)
return merged_rspec
# get slice's hrn from options
xrn = options.get('geni_slice_urn', '')
- hrn, _ = urn_to_hrn(xrn)
+ (hrn, _) = urn_to_hrn(xrn)
# Find the valid credentials
valid_creds = self.api.auth.checkCredentials(creds, 'listnodes', hrn)
chain_name = 'OUTGOING'
elif self.api.interface in ['slicemgr']:
chain_name = 'FORWARD-OUTGOING'
+ self.api.logger.debug("ListResources: sfatables on chain %s"%chain_name)
filtered_rspec = run_sfatables(chain_name, hrn, origin_hrn, rspec)
if options.has_key('geni_compressed') and options['geni_compressed'] == True:
--- /dev/null
+#!/usr/bin/python
+
+import threading
+import time
+
+from sfa.util.sfalogging import sfa_logger
+
+"""
+Callids: a simple mechanism to remember the call ids served so far
+memory-only for now - thread-safe
+implemented as a (singleton) hash 'callid'->timestamp
+"""
+
+# Implementation: a dict subclass mapping call_id -> timestamp of first
+# sighting. Not meant to be instantiated directly; use the Callids()
+# accessor below to get the process-wide singleton.
+class _call_ids_impl (dict):
+
+ _instance = None
+ # 5 minutes sounds amply enough
+ purge_timeout=5*60
+ # when trying to get a lock
+ retries=8
+ # in ms
+ wait_ms=200
+
+ def __init__(self):
+ self._lock=threading.Lock()
+
+ # the only primitive
+ # return True if the callid is unknown, False otherwise
+ def should_handle_call_id (self,call_id):
+ # if not provided in the call...
+ if not call_id: return True
+ has_lock=False
+ # non-blocking acquire with a bounded number of retries, so a caller
+ # can never hang here indefinitely
+ for attempt in range(_call_ids_impl.retries):
+ sfa_logger().debug("Waiting for lock (%d)"%attempt)
+ if self._lock.acquire(False):
+ has_lock=True
+ sfa_logger().debug("got lock (%d)"%attempt)
+ break
+ time.sleep(float(_call_ids_impl.wait_ms)/1000)
+ # in the unlikely event where we can't get the lock
+ # fail open: treat the call as new rather than dropping it
+ if not has_lock:
+ sfa_logger().warning("_call_ids_impl.should_handle_call_id: could not acquire lock")
+ return True
+ # we're good to go
+ # known id: refuse the call (after purging stale entries)
+ if self.has_key(call_id):
+ self._purge()
+ self._lock.release()
+ return False
+ # unknown id: record first-seen time and accept the call
+ self[call_id]=time.time()
+ self._purge()
+ self._lock.release()
+ sfa_logger().debug("released lock")
+ return True
+
+ # drop entries older than purge_timeout; caller must hold self._lock
+ def _purge(self):
+ now=time.time()
+ o_keys=[]
+ # collect first, delete second: never mutate the dict while iterating it
+ for (k,v) in self.iteritems():
+ if (now-v) >= _call_ids_impl.purge_timeout: o_keys.append(k)
+ for k in o_keys:
+ sfa_logger().debug("Purging call_id %r (%s)"%(k,time.strftime("%H:%M:%S",time.localtime(self[k]))))
+ del self[k]
+ sfa_logger().debug("AFTER PURGE")
+ for (k,v) in self.iteritems(): sfa_logger().debug("%s -> %s"%(k,time.strftime("%H:%M:%S",time.localtime(v))))
+
+# Accessor for the process-wide singleton.
+# NOTE(review): the check-then-create below is not itself lock-protected, so
+# two threads racing on first use could each build an instance - confirm this
+# is acceptable for the callers.
+def Callids ():
+ if not _call_ids_impl._instance:
+ _call_ids_impl._instance = _call_ids_impl()
+ return _call_ids_impl._instance
# the resulting tree
rspec = None
for input_rspec in rspecs:
+ # ignore empty strings as returned with used call_ids
+ if not input_rspec: continue
try:
tree = etree.parse(StringIO(input_rspec))
except etree.XMLSyntaxError: