add statistics to slicemanager listresources/createsliver, better error checking...
authorsmbaker <smbaker@fc8clean.lan>
Tue, 9 Aug 2011 05:50:42 +0000 (22:50 -0700)
committersmbaker <smbaker@fc8clean.lan>
Tue, 9 Aug 2011 05:50:42 +0000 (22:50 -0700)
sfa/managers/slice_manager_pl.py

index 7ad48f9..bda355e 100644 (file)
@@ -1,4 +1,4 @@
-# 
+#
 import sys
 import time,datetime
 from StringIO import StringIO
@@ -58,11 +58,11 @@ def get_serverproxy_url (server):
         return server.url
     except:
         logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
-        return server._ServerProxy__host + server._ServerProxy__handler 
+        return server._ServerProxy__host + server._ServerProxy__handler
 
 def GetVersion(api):
     # peers explicitly in aggregates.xml
-    peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() 
+    peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
                    if peername != api.hrn])
     xrn=Xrn (api.hrn)
     request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)]
@@ -82,22 +82,44 @@ def GetVersion(api):
         sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
     return sm_version
 
+def drop_slicemgr_stats(rspec):
+    try:
+        stats_elements = rspec.xml.xpath('//statistics')
+        for node in stats_elements:
+            node.getparent().remove(node)
+    except Exception, e:
+        api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
+
+def add_slicemgr_stat(rspec, callname, aggname, elapsed, status):
+    try:
+        stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
+        if stats_tags:
+            stats_tag = stats_tags[0]
+        else:
+            stats_tag = etree.SubElement(rspec.xml, "statistics", call="ListResources")
+
+        etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
+    except Exception, e:
+        api.logger.warn("add_slicemgr_stat failed on  %s: %s" %(aggname, str(e)))
 
 def ListResources(api, creds, options, call_id):
-    def _ListResources(server, credential, opts, call_id):
-        
+    def _ListResources(aggregate, server, credential, opts, call_id):
+
         my_opts = copy(opts)
         args = [credential, my_opts]
-        if _call_id_supported(api, server):
-            args.append(call_id)
-        version = api.get_cached_server_version(server)
-        # force ProtoGENI aggregates to give us a v2 RSpec
-        if 'sfa' not in version.keys():
-            my_opts['rspec_version'] = pg_rspec_ad_version 
+        tStart = time.time()
         try:
-            return server.ListResources(*args)
+            if _call_id_supported(api, server):
+                args.append(call_id)
+            version = api.get_cached_server_version(server)
+            # force ProtoGENI aggregates to give us a v2 RSpec
+            if 'sfa' not in version.keys():
+                my_opts['rspec_version'] = pg_rspec_ad_version
+            rspec = server.ListResources(*args)
+            return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
         except Exception, e:
             api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e)))
+            return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
 
     if Callids().already_handled(call_id): return ""
 
@@ -106,7 +128,7 @@ def ListResources(api, creds, options, call_id):
     (hrn, type) = urn_to_hrn(xrn)
     if 'geni_compressed' in options:
         del(options['geni_compressed'])
-    
+
     # get the rspec's return format from options
     rspec_version = RSpecVersion(options.get('rspec_version'))
     version_string = "rspec_%s" % (rspec_version.get_version_name())
@@ -135,7 +157,7 @@ def ListResources(api, creds, options, call_id):
 
         # get the rspec from the aggregate
         server = api.aggregates[aggregate]
-        threads.run(_ListResources, server, credentials, options, call_id)
+        threads.run(_ListResources, aggregate, server, credentials, options, call_id)
 
     results = threads.get_results()
     rspec_version = RSpecVersion(options.get('rspec_version'))
@@ -143,11 +165,14 @@ def ListResources(api, creds, options, call_id):
         rspec = PGRSpec()
     else:
         rspec = SfaRSpec()
+
     for result in results:
-        try:
-            rspec.merge(result)
-        except:
-            api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
+        add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
+        if result["status"]=="success":
+            try:
+                rspec.merge(result["rspec"])
+            except:
+                api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
 
     # cache the result
     if caching and api.cache and not xrn:
@@ -158,11 +183,12 @@ def ListResources(api, creds, options, call_id):
 
 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
 
-    def _CreateSliver(server, xrn, credential, rspec, users, call_id):
+    def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
+        tStart = time.time()
         try:
-            # Need to call GetVersion at an aggregate to determine the supported 
-            # rspec type/format beofre calling CreateSliver at an Aggregate. 
-            server_version = api.get_cached_server_version(server)    
+            # Need to call GetVersion at an aggregate to determine the supported
+            # rspec type/format beofre calling CreateSliver at an Aggregate.
+            server_version = api.get_cached_server_version(server)
             if 'sfa' not in server_version and 'geni_api' in server_version:
                 # sfa aggregtes support both sfa and pg rspecs, no need to convert
                 # if aggregate supports sfa rspecs. otherwise convert to pg rspec
@@ -170,9 +196,11 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
             args = [xrn, credential, rspec, users]
             if _call_id_supported(api, server):
                 args.append(call_id)
-            return server.CreateSliver(*args)
+            rspec = server.CreateSliver(*args)
+            return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
         except: 
             logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
+            return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
 
     if Callids().already_handled(call_id): return ""
     # Validate the RSpec against PlanetLab's schema --disabled for now
@@ -183,6 +211,10 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
     if schema:
         rspec.validate(schema)
 
+    # if there is a <statistics> section, the aggregates don't care about it,
+    # so delete it.
+    drop_slicemgr_stats(rspec)
+
     # attempt to use delegated credential first
     credential = api.getDelegatedCredential(creds)
     if not credential:
@@ -200,12 +232,17 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
             continue
         server = api.aggregates[aggregate]
         # Just send entire RSpec to each aggregate
-        threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id)
+        threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id)
             
     results = threads.get_results()
     rspec = SfaRSpec()
     for result in results:
-        rspec.merge(result)     
+        add_slicemgr_stat(rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
+        if result["status"]=="success":
+            try:
+                rspec.merge(result["rspec"])
+            except:
+                api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
     return rspec.toxml()
 
 def RenewSliver(api, xrn, creds, expiration_time, call_id):
@@ -505,4 +542,4 @@ def main():
 
 if __name__ == "__main__":
     main()
-    
+