rspec version must be a python dict'
[sfa.git] / sfa / managers / slice_manager_pl.py
index 2cbc823..7b57e0f 100644 (file)
@@ -1,4 +1,4 @@
-# 
+#
 import sys
 import time,datetime
 from StringIO import StringIO
@@ -44,7 +44,7 @@ def _call_id_supported(api, server):
         code_tag_parts = code_tag.split("-")
 
         version_parts = code_tag_parts[0].split(".")
-        major, minor = version_parts[0], version_parts[1]
+        major, minor = version_parts[0:2]
         rev = code_tag_parts[1]
         if int(major) > 1:
             if int(minor) > 0 or int(rev) > 20:
@@ -58,11 +58,11 @@ def get_serverproxy_url (server):
         return server.url
     except:
         logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
-        return server._ServerProxy__host + server._ServerProxy__handler 
+        return server._ServerProxy__host + server._ServerProxy__handler
 
 def GetVersion(api):
     # peers explicitly in aggregates.xml
-    peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() 
+    peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
                    if peername != api.hrn])
     xrn=Xrn (api.hrn)
     request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)]
@@ -82,26 +82,52 @@ def GetVersion(api):
         sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
     return sm_version
 
+def drop_slicemgr_stats(rspec):
+    try:
+        stats_elements = rspec.xml.xpath('//statistics')
+        for node in stats_elements:
+            node.getparent().remove(node)
+    except Exception, e:
+        api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
+
+def add_slicemgr_stat(rspec, callname, aggname, elapsed, status):
+    try:
+        stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
+        if stats_tags:
+            stats_tag = stats_tags[0]
+        else:
+            stats_tag = etree.SubElement(rspec.xml, "statistics", call=callname)
+
+        etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
+    except Exception, e:
+        api.logger.warn("add_slicemgr_stat failed on  %s: %s" %(aggname, str(e)))
 
 def ListResources(api, creds, options, call_id):
-    def _ListResources(server, credential, my_opts, call_id):
+    def _ListResources(aggregate, server, credential, opts, call_id):
+
+        my_opts = copy(opts)
         args = [credential, my_opts]
-        if _call_id_supported(api, server):
-            args.append(call_id)
+        tStart = time.time()
         try:
-            return server.ListResources(*args)
+            if _call_id_supported(api, server):
+                args.append(call_id)
+            version = api.get_cached_server_version(server)
+            # force ProtoGENI aggregates to give us a v2 RSpec
+            if 'sfa' not in version.keys():
+                my_opts['rspec_version'] = dict(pg_rspec_ad_version)
+            rspec = server.ListResources(*args)
+            return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
         except Exception, e:
             api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e)))
+            return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
 
     if Callids().already_handled(call_id): return ""
 
     # get slice's hrn from options
     xrn = options.get('geni_slice_urn', '')
     (hrn, type) = urn_to_hrn(xrn)
-    my_opts = copy(options)
-    my_opts['geni_compressed'] = False
-    if 'rspec_version' in my_opts:
-        del my_opts['rspec_version']
+    if 'geni_compressed' in options:
+        del(options['geni_compressed'])
 
     # get the rspec's return format from options
     rspec_version = RSpecVersion(options.get('rspec_version'))
@@ -128,22 +154,25 @@ def ListResources(api, creds, options, call_id):
         # unless the caller is the aggregate's SM
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
+
         # get the rspec from the aggregate
         server = api.aggregates[aggregate]
-        #threads.run(server.ListResources, credentials, my_opts, call_id)
-        threads.run(_ListResources, server, credentials, my_opts, call_id)
+        threads.run(_ListResources, aggregate, server, credentials, options, call_id)
 
     results = threads.get_results()
-    rspec_version = RSpecVersion(my_opts.get('rspec_version'))
+    rspec_version = RSpecVersion(options.get('rspec_version'))
     if rspec_version['type'] == pg_rspec_ad_version['type']:
         rspec = PGRSpec()
     else:
         rspec = SfaRSpec()
+
     for result in results:
-        try:
-            rspec.merge(result)
-        except:
-            api.logger.info("SM.ListResources: Failed to merge aggregate rspec")
+        add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
+        if result["status"]=="success":
+            try:
+                rspec.merge(result["rspec"])
+            except:
+                api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
 
     # cache the result
     if caching and api.cache and not xrn:
@@ -154,24 +183,24 @@ def ListResources(api, creds, options, call_id):
 
 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
 
-    def _CreateSliver(server, xrn, credential, rspec, users, call_id):
+    def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
+        tStart = time.time()
         try:
-            # Need to call GetVersion at an aggregate to determine the supported 
-            # rspec type/format beofre calling CreateSliver at an Aggregate. 
-            server_version = api.get_cached_server_version(server)    
-            if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version:
+            # Need to call GetVersion at an aggregate to determine the supported
+            # rspec type/format beofre calling CreateSliver at an Aggregate.
+            server_version = api.get_cached_server_version(server)
+            if 'sfa' not in server_version and 'geni_api' in server_version:
                 # sfa aggregtes support both sfa and pg rspecs, no need to convert
                 # if aggregate supports sfa rspecs. otherwise convert to pg rspec
                 rspec = RSpecConverter.to_pg_rspec(rspec)
             args = [xrn, credential, rspec, users]
             if _call_id_supported(api, server):
                 args.append(call_id)
-            try:
-                return server.CreateSliver(*args)
-            except Exception, e:
-                api.logger.warn("CreateSliver failed at %s: %s" %(server.url, str(e)))
+            rspec = server.CreateSliver(*args)
+            return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
         except: 
-            logger.log_exc('Something wrong in _CreateSliver')
+            logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
+            return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
 
     if Callids().already_handled(call_id): return ""
     # Validate the RSpec against PlanetLab's schema --disabled for now
@@ -182,6 +211,10 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
     if schema:
         rspec.validate(schema)
 
+    # if there is a <statistics> section, the aggregates don't care about it,
+    # so delete it.
+    drop_slicemgr_stats(rspec)
+
     # attempt to use delegated credential first
     credential = api.getDelegatedCredential(creds)
     if not credential:
@@ -199,17 +232,22 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
             continue
         server = api.aggregates[aggregate]
         # Just send entire RSpec to each aggregate
-        threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id)
+        threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id)
             
     results = threads.get_results()
     rspec = SfaRSpec()
     for result in results:
-        rspec.merge(result)     
+        add_slicemgr_stat(rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
+        if result["status"]=="success":
+            try:
+                rspec.merge(result["rspec"])
+            except:
+                api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
     return rspec.toxml()
 
 def RenewSliver(api, xrn, creds, expiration_time, call_id):
     def _RenewSliver(server, xrn, creds, expiration_time, call_id):
-        server_version = _get_server_version(api, server)
+        server_version = api.get_cached_server_version(server)
         args =  [xrn, creds, expiration_time, call_id]
         if _call_id_supported(api, server):
             args.append(call_id)
@@ -239,7 +277,7 @@ def RenewSliver(api, xrn, creds, expiration_time, call_id):
 
 def DeleteSliver(api, xrn, creds, call_id):
     def _DeleteSliver(server, xrn, creds, call_id):
-        server_version = _get_server_version(api, server)
+        server_version = api.get_cached_server_version(server)
         args =  [xrn, creds]
         if _call_id_supported(api, server):
             args.append(call_id)
@@ -270,7 +308,7 @@ def DeleteSliver(api, xrn, creds, call_id):
 # first draft at a merging SliverStatus
 def SliverStatus(api, slice_xrn, creds, call_id):
     def _SliverStatus(server, xrn, creds, call_id):
-        server_version = _get_server_version(api, server)
+        server_version = api.get_cached_server_version(server)
         args =  [xrn, creds]
         if _call_id_supported(api, server):
             args.append(call_id)
@@ -312,7 +350,7 @@ caching=True
 #caching=False
 def ListSlices(api, creds, call_id):
     def _ListSlices(server, creds, call_id):
-        server_version = _get_server_version(api, server)
+        server_version = api.get_cached_server_version(server)
         args =  [creds]
         if _call_id_supported(api, server):
             args.append(call_id)
@@ -393,7 +431,7 @@ def get_ticket(api, xrn, creds, rspec, users):
                     continue
                 # send the request to this address 
                 url = target_aggs[0]['url']
-                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
+                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file, timeout=30)
                 # aggregate found, no need to keep looping
                 break   
         if server is None:
@@ -504,4 +542,4 @@ def main():
 
 if __name__ == "__main__":
     main()
-    
+