a framework for handling call ids
authorThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 6 Apr 2011 15:38:58 +0000 (17:38 +0200)
committerThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 6 Apr 2011 15:38:58 +0000 (17:38 +0200)
for now only on ListResources/get_rspec (some AMs might not hae it yet)

sfa/client/sfi.py
sfa/managers/aggregate_manager_pl.py
sfa/managers/slice_manager_pl.py
sfa/methods/ListResources.py
sfa/util/callids.py [new file with mode: 0644]
sfa/util/rspecHelper.py

index 7f072d7..241d8e1 100755 (executable)
@@ -118,6 +118,8 @@ def load_record_from_file(filename):
     return record
 
 
+import uuid
+def unique_call_id(): return uuid.uuid4().urn
 
 class Sfi:
     
@@ -818,7 +820,7 @@ class Sfi:
         if opts.delegate:
             delegated_cred = self.delegate_cred(cred, get_authority(self.authority))
             creds.append(delegated_cred) 
-        result = server.ListResources(creds, call_options)
+        result = server.ListResources(creds, call_options,unique_call_id())
         format = opts.format
         if opts.file is None:
             display_rspec(result, format)
index 03d37cd..376f12c 100644 (file)
@@ -22,6 +22,7 @@ from sfa.plc.api import SfaAPI
 from sfa.plc.slices import *
 from sfa.util.version import version_core
 from sfa.util.sfatime import utcparse
+from sfa.util.callids import Callids
 
 def GetVersion(api):
     xrn=Xrn(api.hrn)
@@ -290,15 +291,19 @@ def get_slices(api, creds):
 
     return slice_urns
     
-def get_rspec(api, creds, options):
+# xxx Thierry : caching at the aggregate level sounds wrong...
+caching=True
+def get_rspec(api, creds, options,call_id):
+    if not Callids().should_handle_call_id(call_id): return ""
     # get slice's hrn from options
     xrn = options.get('geni_slice_urn', '')
-    hrn, type = urn_to_hrn(xrn)
+    (hrn, type) = urn_to_hrn(xrn)
 
     # look in cache first
-    if api.cache and not xrn:
+    if caching and api.cache and not xrn:
         rspec = api.cache.get('nodes')
         if rspec:
+            api.logger.info("aggregate.get_rspec: returning cached value for hrn %s"%hrn)
             return rspec 
 
     network = Network(api)
@@ -309,7 +314,7 @@ def get_rspec(api, creds, options):
     rspec = network.toxml()
 
     # cache the result
-    if api.cache and not xrn:
+    if caching and api.cache and not xrn:
         api.cache.add('nodes', rspec)
 
     return rspec
index a814106..bc777a6 100644 (file)
@@ -23,6 +23,7 @@ from sfa.util.threadmanager import ThreadManager
 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
 import sfa.plc.peers as peers
 from sfa.util.version import version_core
+from sfa.util.callids import Callids
 
 # XX FIX ME:  should merge result from multiple aggregates instead of 
 # calling aggregate implementation
@@ -320,12 +321,19 @@ def get_slices(api, creds):
         api.cache.add('slices', slices)
 
     return slices
-def get_rspec(api, creds, options):
-    
+
+
+# Thierry : caching at the slicemgr level makes sense to some extent
+caching=True
+def get_rspec(api, creds, options, call_id):
+
+    if not Callids().should_handle_call_id(call_id): 
+        api.logger.info("%d received get_rspec with known call_id %s"%(api.interface,call_id))
+        return ""
+
     # get slice's hrn from options
     xrn = options.get('geni_slice_urn', '')
-    hrn, type = urn_to_hrn(xrn)
+    (hrn, type) = urn_to_hrn(xrn)
 
     # get hrn of the original caller
     origin_hrn = options.get('origin_hrn', None)
@@ -336,13 +344,11 @@ def get_rspec(api, creds, options):
             origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()
     
     # look in cache first 
-    if api.cache and not xrn:
+    if caching and api.cache and not xrn:
         rspec =  api.cache.get('nodes')
         if rspec:
             return rspec
 
-    hrn, type = urn_to_hrn(xrn)
-
     # get the callers hrn
     valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
@@ -361,14 +367,14 @@ def get_rspec(api, creds, options):
         server = api.aggregates[aggregate]
         my_opts = copy(options)
         my_opts['geni_compressed'] = False
-        threads.run(server.ListResources, credential, my_opts)
+        threads.run(server.ListResources, credential, my_opts, call_id)
         #threads.run(server.get_resources, cred, xrn, origin_hrn)
                     
     results = threads.get_results()
     merged_rspec = merge_rspecs(results)
 
     # cache the result
-    if api.cache and not xrn:
+    if caching and api.cache and not xrn:
         api.cache.add('nodes', merged_rspec)
  
     return merged_rspec
index 6afeb49..6779303 100644 (file)
@@ -28,7 +28,7 @@ class ListResources(Method):
         
         # get slice's hrn from options    
         xrn = options.get('geni_slice_urn', '')
-        hrn, _ = urn_to_hrn(xrn)
+        (hrn, _) = urn_to_hrn(xrn)
 
         # Find the valid credentials
         valid_creds = self.api.auth.checkCredentials(creds, 'listnodes', hrn)
@@ -46,6 +46,7 @@ class ListResources(Method):
             chain_name = 'OUTGOING'
         elif self.api.interface in ['slicemgr']: 
             chain_name = 'FORWARD-OUTGOING'
+        self.api.logger.debug("ListResources: sfatables on chain %s"%chain_name)
         filtered_rspec = run_sfatables(chain_name, hrn, origin_hrn, rspec) 
  
         if options.has_key('geni_compressed') and options['geni_compressed'] == True:
diff --git a/sfa/util/callids.py b/sfa/util/callids.py
new file mode 100644 (file)
index 0000000..7ffaf45
--- /dev/null
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+
+import threading
+import time
+
+from sfa.util.sfalogging import sfa_logger
+
+"""
+Callids: a simple mechanism to remember the call ids served so fas
+memory-only for now - thread-safe
+implemented as a (singleton) hash 'callid'->timestamp
+"""
+
+class _call_ids_impl (dict):
+
+    _instance = None
+    # 5 minutes sounds amply enough
+    purge_timeout=5*60
+    # when trying to get a lock
+    retries=8
+    # in ms
+    wait_ms=200
+
+    def __init__(self): 
+        self._lock=threading.Lock()
+
+    # the only primitive
+    # return True if the callid is unknown, False otherwise
+    def should_handle_call_id (self,call_id):
+        # if not provided in the call...
+        if not call_id: return True
+        has_lock=False
+        for attempt in range(_call_ids_impl.retries):
+            sfa_logger().debug("Waiting for lock (%d)"%attempt)
+            if self._lock.acquire(False): 
+                has_lock=True
+                sfa_logger().debug("got lock (%d)"%attempt)
+                break
+            time.sleep(float(_call_ids_impl.wait_ms)/1000)
+        # in the unlikely event where we can't get the lock
+        if not has_lock:
+            sfa_logger().warning("_call_ids_impl.should_handle_call_id: could not acquire lock")
+            return True
+        # we're good to go
+        if self.has_key(call_id):
+            self._purge()
+            self._lock.release()
+            return False
+        self[call_id]=time.time()
+        self._purge()
+        self._lock.release()
+        sfa_logger().debug("released lock")
+        return True
+        
+    def _purge(self):
+        now=time.time()
+        o_keys=[]
+        for (k,v) in self.iteritems():
+            if (now-v) >= _call_ids_impl.purge_timeout: o_keys.append(k)
+        for k in o_keys: 
+            sfa_logger().debug("Purging call_id %r (%s)"%(k,time.strftime("%H:%M:%S",time.localtime(self[k]))))
+            del self[k]
+        sfa_logger().debug("AFTER PURGE")
+        for (k,v) in self.iteritems(): sfa_logger().debug("%s -> %s"%(k,time.strftime("%H:%M:%S",time.localtime(v))))
+        
+def Callids ():
+    if not _call_ids_impl._instance:
+        _call_ids_impl._instance = _call_ids_impl()
+    return _call_ids_impl._instance
index f6c860b..1ccc984 100755 (executable)
@@ -36,6 +36,8 @@ def merge_rspecs(rspecs):
     # the resulting tree
     rspec = None
     for input_rspec in rspecs:
+        # ignore empty strings as returned with used call_ids
+        if not input_rspec: continue
         try:
             tree = etree.parse(StringIO(input_rspec))
         except etree.XMLSyntaxError: