merged
[sfa.git] / sfa / managers / slice_manager_pl.py
index 08529f3..0f7774f 100644 (file)
@@ -1,4 +1,4 @@
-# 
+#
 import sys
 import time,datetime
 from StringIO import StringIO
@@ -15,13 +15,9 @@ from sfa.util.rspec import *
 from sfa.util.specdict import *
 from sfa.util.faults import *
 from sfa.util.record import SfaRecord
-from sfa.rspecs.pg_rspec import PGRSpec
-from sfa.rspecs.sfa_rspec import SfaRSpec
 from sfa.rspecs.rspec_converter import RSpecConverter
-from sfa.rspecs.rspec_parser import parse_rspec    
-from sfa.rspecs.rspec_version import RSpecVersion
-from sfa.rspecs.sfa_rspec import sfa_rspec_version
-from sfa.rspecs.pg_rspec import pg_rspec_ad_version, pg_rspec_request_version   
+from sfa.rspecs.version_manager import VersionManager
+from sfa.rspecs.rspec import RSpec 
 from sfa.util.policy import Policy
 from sfa.util.prefixTree import prefixTree
 from sfa.util.sfaticket import *
@@ -58,22 +54,29 @@ def get_serverproxy_url (server):
         return server.url
     except:
         logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
-        return server._ServerProxy__host + server._ServerProxy__handler 
+        return server._ServerProxy__host + server._ServerProxy__handler
 
 def GetVersion(api):
     # peers explicitly in aggregates.xml
-    peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() 
+    peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
                    if peername != api.hrn])
-    xrn=Xrn (api.hrn)
-    request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)]
-    ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)]
+    version_manager = VersionManager()
+    ad_rspec_versions = []
+    request_rspec_versions = []
+    for rspec_version in version_manager.versions:
+        if rspec_version in ['*', 'ad']:
+            request_rspec_versions.append(rspec_version.to_dict())
+        if rspec_version in ['*', 'request']:
+            request_rspec_version.append(rspec_version.to_dict())
+    default_rspec_version = version_manager.get_version("sfa 1").to_dict()
+    xrn=Xrn(api.hrn)
     version_more = {'interface':'slicemgr',
                     'hrn' : xrn.get_hrn(),
                     'urn' : xrn.get_urn(),
                     'peers': peers,
                     'request_rspec_versions': request_rspec_versions,
                     'ad_rspec_versions': ad_rspec_versions,
-                    'default_ad_rspec': dict(sfa_rspec_version)
+                    'default_ad_rspec': default_rspec_version
                     }
     sm_version=version_core(version_more)
     # local aggregate if present needs to have localhost resolved
@@ -82,30 +85,57 @@ def GetVersion(api):
         sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
     return sm_version
 
+def drop_slicemgr_stats(rspec):
+    try:
+        stats_elements = rspec.xml.xpath('//statistics')
+        for node in stats_elements:
+            node.getparent().remove(node)
+    except Exception, e:
+        api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
+
+def add_slicemgr_stat(rspec, callname, aggname, elapsed, status):
+    try:
+        stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
+        if stats_tags:
+            stats_tag = stats_tags[0]
+        else:
+            stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname)
+
+        etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
+    except Exception, e:
+        api.logger.warn("add_slicemgr_stat failed on  %s: %s" %(aggname, str(e)))
 
 def ListResources(api, creds, options, call_id):
-    def _ListResources(server, credential, my_opts, call_id):
+    version_manager = VersionManager()
+    def _ListResources(aggregate, server, credential, opts, call_id):
+
+        my_opts = copy(opts)
         args = [credential, my_opts]
-        if _call_id_supported(api, server):
-            args.append(call_id)
+        tStart = time.time()
         try:
-            return server.ListResources(*args)
+            if _call_id_supported(api, server):
+                args.append(call_id)
+            version = api.get_cached_server_version(server)
+            # force ProtoGENI aggregates to give us a v2 RSpec
+            if 'sfa' not in version.keys():
+                my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict()
+            rspec = server.ListResources(*args)
+            return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
         except Exception, e:
-            api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e)))
+            api.logger.log_exc("ListResources failed at %s: %s" %(server.url))
+            return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
 
     if Callids().already_handled(call_id): return ""
 
     # get slice's hrn from options
     xrn = options.get('geni_slice_urn', '')
     (hrn, type) = urn_to_hrn(xrn)
-    my_opts = copy(options)
-    my_opts['geni_compressed'] = False
-    if 'rspec_version' in my_opts:
-        del my_opts['rspec_version']
+    if 'geni_compressed' in options:
+        del(options['geni_compressed'])
 
     # get the rspec's return format from options
-    rspec_version = RSpecVersion(options.get('rspec_version'))
-    version_string = "rspec_%s" % (rspec_version.get_version_name())
+    rspec_version = version_manager.get_version(options.get('rspec_version'))
+    version_string = "rspec_%s" % (rspec_version.to_string())
 
     # look in cache first
     if caching and api.cache and not xrn:
@@ -131,20 +161,22 @@ def ListResources(api, creds, options, call_id):
 
         # get the rspec from the aggregate
         server = api.aggregates[aggregate]
-        #threads.run(server.ListResources, credentials, my_opts, call_id)
-        threads.run(_ListResources, server, credentials, my_opts, call_id)
+        threads.run(_ListResources, aggregate, server, credentials, options, call_id)
 
     results = threads.get_results()
-    rspec_version = RSpecVersion(my_opts.get('rspec_version'))
-    if rspec_version['type'] == pg_rspec_ad_version['type']:
-        rspec = PGRSpec()
-    else:
-        rspec = SfaRSpec()
+    rspec_version = version_manager.get_version(options.get('rspec_version'))
+    if xrn:    
+        result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
+    else: 
+        result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad')
+    rspec = RSpec(version=result_version)
     for result in results:
-        try:
-            rspec.merge(result)
-        except:
-            api.logger.info("SM.ListResources: Failed to merge aggregate rspec")
+        add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
+        if result["status"]=="success":
+            try:
+                rspec.version.merge(result["rspec"])
+            except:
+                api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
 
     # cache the result
     if caching and api.cache and not xrn:
@@ -155,31 +187,41 @@ def ListResources(api, creds, options, call_id):
 
 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
 
-    def _CreateSliver(server, xrn, credential, rspec, users, call_id):
+    version_manager = VersionManager()
+    def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
+        tStart = time.time()
         try:
-            # Need to call GetVersion at an aggregate to determine the supported 
-            # rspec type/format beofre calling CreateSliver at an Aggregate. 
-            server_version = api.get_cached_server_version(server)    
+            # Need to call GetVersion at an aggregate to determine the supported
+            # rspec type/format beofre calling CreateSliver at an Aggregate.
+            server_version = api.get_cached_server_version(server)
+            requested_users = users
             if 'sfa' not in server_version and 'geni_api' in server_version:
                 # sfa aggregtes support both sfa and pg rspecs, no need to convert
                 # if aggregate supports sfa rspecs. otherwise convert to pg rspec
-                rspec = RSpecConverter.to_pg_rspec(rspec)
-            args = [xrn, credential, rspec, users]
+                rspec = RSpecConverter.to_pg_rspec(rspec, 'request')
+                requested_users = sfa_to_pg_users(users)
+            args = [xrn, credential, rspec, requested_users]
             if _call_id_supported(api, server):
                 args.append(call_id)
-            return server.CreateSliver(*args)
+            rspec = server.CreateSliver(*args)
+            return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
         except: 
             logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
+            return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
 
     if Callids().already_handled(call_id): return ""
     # Validate the RSpec against PlanetLab's schema --disabled for now
     # The schema used here needs to aggregate the PL and VINI schemas
     # schema = "/var/www/html/schemas/pl.rng"
-    rspec = parse_rspec(rspec_str)
+    rspec = RSpec(rspec_str)
     schema = None
     if schema:
         rspec.validate(schema)
 
+    # if there is a <statistics> section, the aggregates don't care about it,
+    # so delete it.
+    drop_slicemgr_stats(rspec)
+
     # attempt to use delegated credential first
     credential = api.getDelegatedCredential(creds)
     if not credential:
@@ -197,13 +239,19 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
             continue
         server = api.aggregates[aggregate]
         # Just send entire RSpec to each aggregate
-        threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id)
+        threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id)
             
     results = threads.get_results()
-    rspec = SfaRSpec()
+    manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
+    result_rspec = RSpec(version=manifest_version)
     for result in results:
-        rspec.merge(result)     
-    return rspec.toxml()
+        add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
+        if result["status"]=="success":
+            try:
+                result_rspec.version.merge(result["rspec"])
+            except:
+                api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
+    return result_rspec.toxml()
 
 def RenewSliver(api, xrn, creds, expiration_time, call_id):
     def _RenewSliver(server, xrn, creds, expiration_time, call_id):
@@ -310,7 +358,7 @@ caching=True
 #caching=False
 def ListSlices(api, creds, call_id):
     def _ListSlices(server, creds, call_id):
-        server_version = api.get_cached_server_version(api, server)
+        server_version = api.get_cached_server_version(server)
         args =  [creds]
         if _call_id_supported(api, server):
             args.append(call_id)
@@ -391,7 +439,7 @@ def get_ticket(api, xrn, creds, rspec, users):
                     continue
                 # send the request to this address 
                 url = target_aggs[0]['url']
-                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
+                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file, timeout=30)
                 # aggregate found, no need to keep looping
                 break   
         if server is None:
@@ -502,4 +550,4 @@ def main():
 
 if __name__ == "__main__":
     main()
-    
+