Merge remote-tracking branch 'origin/geni-v3' into geni-v3
authorGit User <support@planet-lab.org>
Thu, 14 Nov 2013 07:34:43 +0000 (02:34 -0500)
committerGit User <support@planet-lab.org>
Thu, 14 Nov 2013 07:34:43 +0000 (02:34 -0500)
35 files changed:
docs/dbsession.readme [new file with mode: 0644]
sfa/client/sfaadmin.py
sfa/dummy/dummyaggregate.py
sfa/dummy/dummydriver.py
sfa/dummy/dummyslices.py
sfa/federica/fddriver.py
sfa/generic/__init__.py
sfa/generic/architecture.txt
sfa/importer/__init__.py
sfa/importer/dummyimporter.py
sfa/importer/iotlabimporter.py
sfa/importer/nitosimporter.py
sfa/importer/openstackimporter.py
sfa/importer/plimporter.py
sfa/managers/aggregate_manager.py
sfa/managers/driver.py
sfa/managers/registry_manager.py
sfa/managers/v2_to_v3_adapter.py
sfa/methods/Delete.py
sfa/methods/Describe.py
sfa/methods/GetSelfCredential.py
sfa/methods/Provision.py
sfa/methods/Renew.py
sfa/methods/Shutdown.py
sfa/methods/Status.py
sfa/nitos/nitosdriver.py
sfa/openstack/nova_driver.py
sfa/openstack/osaggregate.py
sfa/planetlab/plaggregate.py
sfa/planetlab/pldriver.py
sfa/planetlab/plslices.py
sfa/server/sfaapi.py
sfa/server/threadedserver.py
sfa/storage/alchemy.py
sfa/storage/model.py

diff --git a/docs/dbsession.readme b/docs/dbsession.readme
new file mode 100644 (file)
index 0000000..8ee608a
--- /dev/null
@@ -0,0 +1,37 @@
+As of Nov. 2013, when moving from 3.0 to 3.2
+--------------------------------------------
+
+* driver-creation
+. expect an api instead of a config (and api.config is set)
+
+* managers
+. cannot access their driver from self, but from the context (api is passed to methods)
+
+* dbsession : implementation
+. storage.Alchemy still exports a global dbsession object, but named global_session(); together with close_global_session()
+. storage.Alchemy also exports a method called session(), that is *NOT* managed - caller is expected to close_session()
+. storage.Alchemy only exports global_session (as alchemy.global_session) 
+  so that any code that would still need adaptation will break at import time
+
+* dbsession : usage
+. use api.dbsession() whenever possible
+. it's fair to have importers and sfaadmin use the global session (there won't be instances of api in their case)
+. there is one or 2 exceptions where dbsession is retrieved from an sqlalchemy object but this is deemed poor practice, please DO NOT do this as far as possible
+
+---
+OTHER NOTES:
+
+* iotlab/cortexlab:
+. while browsing this code I noticed that the code for cortexlab seems very close to the one for iotlab
+  I wonder if some inheritance would have allowed to reduce code duplication
+  so I'll forget about cortexlab for now as all/most of the folowing comments probably apply as-is to cortex
+
+* iotlab/iotlabapi
+. it's confusing that the class name here does not match the filename (class IotlabTestbedAPI in iotlabapi.py)
+. IIUC this could/should be renamed IotlabShell (in iotlabshell.py) instead, that's exactly what our notion of a shell is
+. regardless; in order to fetch dbsession() from the context api, I tried to tweak iotlabtestbedapi so that it also takes an api instead of a config in argument
+  however I am puzzled at why most(all?) the IotlabTestbadAPI methods that actually use dbsession are labelled as methodstatic ?
+  Is this a strong constraint ? 
+  It would help me a lot if this could be made a regular class, as opposed to what looks like a mere namespace, so we can retrieve dbsession() from an api object
+
+
index b7b45d4..82c915c 100755 (executable)
@@ -125,9 +125,8 @@ class RegistryCommands(Commands):
         """Check the correspondance between the GID and the PubKey"""
 
         # db records
-        from sfa.storage.alchemy import dbsession
         from sfa.storage.model import RegRecord
-        db_query = dbsession.query(RegRecord).filter_by(type=type)
+        db_query = self.api.dbsession().query(RegRecord).filter_by(type=type)
         if xrn and not all:
             hrn = Xrn(xrn).get_hrn()
             db_query = db_query.filter_by(hrn=hrn)
@@ -315,10 +314,9 @@ class CertCommands(Commands):
     @args('-o', '--outfile', dest='outfile', metavar='<outfile>', help='output file', default=None)
     def export(self, xrn, type=None, outfile=None):
         """Fetch an object's GID from the Registry"""  
-        from sfa.storage.alchemy import dbsession
         from sfa.storage.model import RegRecord
         hrn = Xrn(xrn).get_hrn()
-        request=dbsession.query(RegRecord).filter_by(hrn=hrn)
+        request=self.api.dbsession().query(RegRecord).filter_by(hrn=hrn)
         if type: request = request.filter_by(type=type)
         record=request.first()
         if record:
index dca47b5..c6381a6 100644 (file)
@@ -19,7 +19,6 @@ from sfa.rspecs.version_manager import VersionManager
 
 from sfa.dummy.dummyxrn import DummyXrn, hostname_to_urn, hrn_to_dummy_slicename, slicename_to_hrn
 
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import SliverAllocation
 import time
 
@@ -243,7 +242,7 @@ class DummyAggregate:
         geni_urn = urns[0]
         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
-        sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
+        sliver_allocations = self.driver.api.dbsession().query(SliverAllocation).filter(constraint)
         sliver_allocation_dict = {}
         for sliver_allocation in sliver_allocations:
             geni_urn = sliver_allocation.slice_urn
index 0d0514e..b33c85d 100644 (file)
@@ -11,7 +11,6 @@ from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf
 from sfa.util.cache import Cache
 
 # one would think the driver should not need to mess with the SFA db, but..
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import RegRecord, SliverAllocation
 from sfa.trust.credential import Credential
 
@@ -44,9 +43,9 @@ class DummyDriver (Driver):
     # the cache instance is a class member so it survives across incoming requests
     cache = None
 
-    def __init__ (self, config):
-        Driver.__init__ (self, config)
-        self.config = config
+    def __init__ (self, api):
+        Driver.__init__ (self, api)
+        config = api.config
         self.hrn = config.SFA_INTERFACE_HRN
         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
         self.shell = DummyShell (config)
@@ -336,7 +335,7 @@ class DummyDriver (Driver):
         
         # get the registry records
         user_list, users = [], {}
-        user_list = dbsession.query (RegRecord).filter(RegRecord.pointer.in_(user_ids))
+        user_list = self.api.dbsession().query (RegRecord).filter(RegRecord.pointer.in_(user_ids))
         # create a hrns keyed on the sfa record's pointer.
         # Its possible for multiple records to have the same pointer so
         # the dict's value will be a list of hrns.
@@ -464,7 +463,8 @@ class DummyDriver (Driver):
         #users = slices.verify_users(None, slice, geni_users, options=options)
         # update sliver allocation states and set them to geni_provisioned
         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
-        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned')
+        dbsession=self.api.dbsession()
+        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession)
         version_manager = VersionManager()
         rspec_version = version_manager.get_version(options['geni_rspec_version'])
         return self.describe(urns, rspec_version, options=options)
@@ -490,7 +490,8 @@ class DummyDriver (Driver):
             try:
                 self.shell.DeleteSliceFromNodes({'slice_id': slice_id, 'node_ids': node_ids})
                 # delete sliver allocation states
-                SliverAllocation.delete_allocations(sliver_ids)
+                dbsession=self.api.dbsession()
+                SliverAllocation.delete_allocations(sliver_ids,dbsession)
             finally:
                 pass
 
index dddf1a6..cf5a6da 100644 (file)
@@ -8,7 +8,6 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, urn_to_hrn
 
 from sfa.rspecs.rspec import RSpec
 from sfa.storage.model import SliverAllocation
-from sfa.storage.alchemy import dbsession
 
 from sfa.dummy.dummyxrn import DummyXrn, hrn_to_dummy_slicename
 
@@ -107,7 +106,7 @@ class DummySlices:
                                       component_id=component_id,
                                       slice_urn = slice_urn,
                                       allocation_state='geni_allocated')
-            record.sync()
+            record.sync(self.driver.api.dbsession())
         return resulting_nodes
         
 
index 96c7aa4..cec702d 100644 (file)
@@ -23,8 +23,9 @@ federica_version_string="RSpecV2"
 
 class FdDriver (PlDriver):
 
-    def __init__ (self,config): 
-        PlDriver.__init__ (self, config)
+    def __init__ (self,api): 
+        PlDriver.__init__ (self, api)
+        config = api.config
         self.shell=FdShell(config)
 
     # the agreement with the federica driver is for them to expose results in a way
index 99d15bc..ece7e2b 100644 (file)
@@ -67,16 +67,12 @@ class Generic:
         # xxx can probably drop support for managers implemented as modules 
         # which makes it a bit awkward
         manager_class_or_module = self.make_manager(api.interface)
-        driver = self.make_driver (api.config, api.interface)
+        driver = self.make_driver (api)
         ### arrange stuff together
         # add a manager wrapper
         manager_wrap = ManagerWrapper(manager_class_or_module,api.interface,api.config)
         api.manager=manager_wrap
-        # insert driver in manager
-        logger.debug("Setting manager.driver, manager=%s"%manager_class_or_module)
-        # xxx this should go into the object and not the class !?!
-        manager_class_or_module.driver=driver
-        # add it in api as well for convenience
+        # add it in api as well; driver.api is set too as part of make_driver
         api.driver=driver
         return api
 
@@ -100,7 +96,9 @@ class Generic:
             logger.log_exc_critical(message)
         
     # need interface to select the right driver
-    def make_driver (self, config, interface):
+    def make_driver (self, api):
+        config=api.config
+        interface=api.interface
         flavour = self.flavour
         message="Generic.make_driver for flavour=%s and interface=%s"%(flavour,interface)
         
@@ -111,7 +109,7 @@ class Generic:
         try:
             class_obj = getattr(self,classname)()
             logger.debug("%s : %s"%(message,class_obj))
-            return class_obj(config)
+            return class_obj(api)
         except:
             logger.log_exc_critical(message)
         
index ff63549..fe793bf 100644 (file)
@@ -20,8 +20,8 @@ configurable in a flavour (e.g. sfa.generic.pl.py)
   following layout:
 
 api.manager 
-manager.driver
-api.driver (for convenience)
+api.driver
+driver.api
 
 ------
 example
index 35f8acd..d1721da 100644 (file)
@@ -10,7 +10,9 @@ from sfa.util.sfalogging import _SfaLogger
 from sfa.trust.hierarchy import Hierarchy
 #from sfa.trust.trustedroots import TrustedRoots
 from sfa.trust.gid import create_uuid
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegUser
 from sfa.trust.certificate import convert_public_key, Keypair
 
@@ -35,7 +37,7 @@ class Importer:
    
     # check before creating a RegRecord entry as we run this over and over
     def record_exists (self, type, hrn):
-       return dbsession.query(RegRecord).filter_by(hrn=hrn,type=type).count()!=0 
+       return global_dbsession.query(RegRecord).filter_by(hrn=hrn,type=type).count()!=0 
 
     def create_top_level_auth_records(self, hrn):
         """
@@ -56,8 +58,8 @@ class Importer:
             auth_record = RegAuthority(hrn=hrn, gid=auth_info.get_gid_object(),
                                        authority=get_authority(hrn))
             auth_record.just_created()
-            dbsession.add (auth_record)
-            dbsession.commit()
+            global_dbsession.add (auth_record)
+            global_dbsession.commit()
             self.logger.info("SfaImporter: imported authority (parent) %s " % auth_record)     
    
 
@@ -76,8 +78,8 @@ class Importer:
         user_record = RegUser(hrn=hrn, gid=auth_info.get_gid_object(),
                               authority=get_authority(hrn))
         user_record.just_created()
-        dbsession.add (user_record)
-        dbsession.commit()
+        global_dbsession.add (user_record)
+        global_dbsession.commit()
         self.logger.info("SfaImporter: importing user (slicemanager) %s " % user_record)
 
 
@@ -98,8 +100,8 @@ class Importer:
             interface_record = RegAuthority(type=type, hrn=hrn, gid=gid,
                                             authority=get_authority(hrn))
             interface_record.just_created()
-            dbsession.add (interface_record)
-            dbsession.commit()
+            global_dbsession.add (interface_record)
+            global_dbsession.commit()
             self.logger.info("SfaImporter: imported authority (%s) %s " % (type,interface_record))
  
     def run(self, options=None):
index 5001849..d274b27 100644 (file)
@@ -23,7 +23,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
 
 from sfa.dummy.dummyshell import DummyShell    
@@ -99,7 +101,7 @@ class DummyImporter:
         shell = DummyShell (config)
 
         ######## retrieve all existing SFA objects
-        all_records = dbsession.query(RegRecord).all()
+        all_records = global_dbsession.query(RegRecord).all()
 
         # create hash by (type,hrn) 
         # we essentially use this to know if a given record is already known to SFA 
@@ -159,8 +161,8 @@ class DummyImporter:
                                                pointer= -1,
                                                authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("DummyImporter: imported authority (site) : %s" % site_record) 
                     self.remember_record (site_record)
                 except:
@@ -190,8 +192,8 @@ class DummyImporter:
                                                pointer =node['node_id'],
                                                authority=get_authority(node_hrn))
                         node_record.just_created()
-                        dbsession.add(node_record)
-                        dbsession.commit()
+                        global_dbsession.add(node_record)
+                        global_dbsession.commit()
                         self.logger.info("DummyImporter: imported node: %s" % node_record)  
                         self.remember_record (node_record)
                     except:
@@ -249,8 +251,8 @@ class DummyImporter:
                         else:
                             self.logger.warning("No key found for user %s"%user_record)
                         user_record.just_created()
-                        dbsession.add (user_record)
-                        dbsession.commit()
+                        global_dbsession.add (user_record)
+                        global_dbsession.commit()
                         self.logger.info("DummyImporter: imported person: %s" % user_record)
                         self.remember_record ( user_record )
 
@@ -277,7 +279,7 @@ class DummyImporter:
                                 user_record.reg_keys=[ RegKey (pubkey)]
                             self.logger.info("DummyImporter: updated person: %s" % user_record)
                     user_record.email = user['email']
-                    dbsession.commit()
+                    global_dbsession.commit()
                     user_record.stale=False
                 except:
                     self.logger.log_exc("DummyImporter: failed to import user %d %s"%(user['user_id'],user['email']))
@@ -296,8 +298,8 @@ class DummyImporter:
                                                  pointer=slice['slice_id'],
                                                  authority=get_authority(slice_hrn))
                         slice_record.just_created()
-                        dbsession.add(slice_record)
-                        dbsession.commit()
+                        global_dbsession.add(slice_record)
+                        global_dbsession.commit()
                         self.logger.info("DummyImporter: imported slice: %s" % slice_record)  
                         self.remember_record ( slice_record )
                     except:
@@ -309,7 +311,7 @@ class DummyImporter:
                 # record current users affiliated with the slice
                 slice_record.reg_researchers = \
                     [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['user_ids'] ]
-                dbsession.commit()
+                global_dbsession.commit()
                 slice_record.stale=False
 
         ### remove stale records
@@ -328,5 +330,5 @@ class DummyImporter:
                 self.logger.warning("stale not found with %s"%record)
             if stale:
                 self.logger.info("DummyImporter: deleting stale record: %s" % record)
-                dbsession.delete(record)
-                dbsession.commit()
+                global_dbsession.delete(record)
+                global_dbsession.commit()
index 8687437..bfa094b 100644 (file)
@@ -11,7 +11,9 @@ from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB
 from sfa.trust.certificate import Keypair, convert_public_key
 from sfa.trust.gid import create_uuid
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
     RegUser, RegKey
 
@@ -43,7 +45,7 @@ class IotlabImporter:
         self.logger = loc_logger
         self.logger.setLevelDebug()
         #retrieve all existing SFA objects
-        self.all_records = dbsession.query(RegRecord).all()
+        self.all_records = global_dbsession.query(RegRecord).all()
 
         # initialize record.stale to True by default,
         # then mark stale=False on the ones that are in use
@@ -213,8 +215,8 @@ class IotlabImporter:
                 try:
 
                     node_record.just_created()
-                    dbsession.add(node_record)
-                    dbsession.commit()
+                    global_dbsession.add(node_record)
+                    global_dbsession.commit()
                     self.logger.info("IotlabImporter: imported node: %s"
                                      % node_record)
                     self.update_just_added_records_dict(node_record)
@@ -259,8 +261,8 @@ class IotlabImporter:
                                      pointer='-1',
                                      authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("IotlabImporter: imported authority \
                                     (site) %s" % site_record)
                     self.update_just_added_records_dict(site_record)
@@ -404,8 +406,8 @@ class IotlabImporter:
 
                         try:
                             user_record.just_created()
-                            dbsession.add (user_record)
-                            dbsession.commit()
+                            global_dbsession.add (user_record)
+                            global_dbsession.commit()
                             self.logger.info("IotlabImporter: imported person \
                                             %s" % (user_record))
                             self.update_just_added_records_dict(user_record)
@@ -440,7 +442,7 @@ class IotlabImporter:
                     user_record.email = person['email']
 
             try:
-                dbsession.commit()
+                global_dbsession.commit()
                 user_record.stale = False
             except SQLAlchemyError:
                 self.logger.log_exc("IotlabImporter: \
@@ -478,8 +480,8 @@ class IotlabImporter:
                                     authority=get_authority(slice_hrn))
             try:
                 slice_record.just_created()
-                dbsession.add(slice_record)
-                dbsession.commit()
+                global_dbsession.add(slice_record)
+                global_dbsession.commit()
 
 
                 self.update_just_added_records_dict(slice_record)
@@ -497,7 +499,7 @@ class IotlabImporter:
 
         slice_record.reg_researchers = [user_record]
         try:
-            dbsession.commit()
+            global_dbsession.commit()
             slice_record.stale = False
         except SQLAlchemyError:
             self.logger.log_exc("IotlabImporter: failed to update slice")
@@ -551,8 +553,8 @@ class IotlabImporter:
                                  % (record))
 
                 try:
-                    dbsession.delete(record)
-                    dbsession.commit()
+                    global_dbsession.delete(record)
+                    global_dbsession.commit()
                 except SQLAlchemyError:
                     self.logger.log_exc("IotlabImporter: failed to delete \
                         stale record %s" % (record))
index 78bccc4..425be77 100644 (file)
@@ -7,7 +7,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
 
 from sfa.nitos.nitosshell import NitosShell    
@@ -83,7 +85,7 @@ class NitosImporter:
         shell = NitosShell (config)
 
         ######## retrieve all existing SFA objects
-        all_records = dbsession.query(RegRecord).all()
+        all_records = global_dbsession.query(RegRecord).all()
 
         # create hash by (type,hrn) 
         # we essentially use this to know if a given record is already known to SFA 
@@ -146,8 +148,8 @@ class NitosImporter:
                                                pointer=0,
                                                authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("NitosImporter: imported authority (site) : %s" % site_record) 
                     self.remember_record (site_record)
                 except:
@@ -177,8 +179,8 @@ class NitosImporter:
                                                pointer =node['node_id'],
                                                authority=get_authority(node_hrn))
                         node_record.just_created()
-                        dbsession.add(node_record)
-                        dbsession.commit()
+                        global_dbsession.add(node_record)
+                        global_dbsession.commit()
                         self.logger.info("NitosImporter: imported node: %s" % node_record)  
                         self.remember_record (node_record)
                     except:
@@ -236,8 +238,8 @@ class NitosImporter:
                         else:
                             self.logger.warning("No key found for user %s"%user_record)
                         user_record.just_created()
-                        dbsession.add (user_record)
-                        dbsession.commit()
+                        global_dbsession.add (user_record)
+                        global_dbsession.commit()
                         self.logger.info("NitosImporter: imported user: %s" % user_record)
                         self.remember_record ( user_record )
                     else:
@@ -270,7 +272,7 @@ class NitosImporter:
                             user_record.just_updated()
                             self.logger.info("NitosImporter: updated user: %s" % user_record)
                     user_record.email = user['email']
-                    dbsession.commit()
+                    global_dbsession.commit()
                     user_record.stale=False
                 except:
                     self.logger.log_exc("NitosImporter: failed to import user %s %s"%(user['user_id'],user['email']))
@@ -289,8 +291,8 @@ class NitosImporter:
                                                  pointer=slice['slice_id'],
                                                  authority=get_authority(slice_hrn))
                         slice_record.just_created()
-                        dbsession.add(slice_record)
-                        dbsession.commit()
+                        global_dbsession.add(slice_record)
+                        global_dbsession.commit()
                         self.logger.info("NitosImporter: imported slice: %s" % slice_record)  
                         self.remember_record ( slice_record )
                     except:
@@ -302,7 +304,7 @@ class NitosImporter:
                 # record current users affiliated with the slice
                 slice_record.reg_researchers = \
                       [ self.locate_by_type_pointer ('user',int(user_id)) for user_id in slice['user_ids'] ]
-                dbsession.commit()
+                global_dbsession.commit()
                 slice_record.stale=False
 
 
@@ -322,7 +324,7 @@ class NitosImporter:
                 self.logger.warning("stale not found with %s"%record)
             if stale:
                 self.logger.info("NitosImporter: deleting stale record: %s" % record)
-                dbsession.delete(record)
-                dbsession.commit()
+                global_dbsession.delete(record)
+                global_dbsession.commit()
 
 
index 0cf729c..c8233bd 100644 (file)
@@ -4,7 +4,9 @@ from sfa.util.config import Config
 from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegUser, RegSlice, RegNode
 from sfa.openstack.osxrn import OSXrn
 from sfa.openstack.shell import Shell    
@@ -79,8 +81,8 @@ class OpenstackImporter:
                 user_record.hrn=hrn
                 user_record.gid=user_gid
                 user_record.authority=get_authority(hrn)
-                dbsession.add(user_record)
-                dbsession.commit()
+                global_dbsession.add(user_record)
+                global_dbsession.commit()
                 self.logger.info("OpenstackImporter: imported person %s" % user_record)   
 
         return users_dict, user_keys
@@ -112,8 +114,8 @@ class OpenstackImporter:
                 record.hrn=hrn
                 record.gid=gid
                 record.authority=get_authority(hrn)
-                dbsession.add(record)
-                dbsession.commit()
+                global_dbsession.add(record)
+                global_dbsession.commit()
                 self.logger.info("OpenstackImporter: imported authority: %s" % record)
 
             else:
@@ -125,8 +127,8 @@ class OpenstackImporter:
                 record.hrn=hrn
                 record.gid=gid
                 record.authority=get_authority(hrn)
-                dbsession.add(record)
-                dbsession.commit()
+                global_dbsession.add(record)
+                global_dbsession.commit()
                 self.logger.info("OpenstackImporter: imported slice: %s" % record) 
 
         return tenants_dict
@@ -139,7 +141,7 @@ class OpenstackImporter:
         existing_records = {}
         existing_hrns = []
         key_ids = []
-        for record in dbsession.query(RegRecord):
+        for record in global_dbsession.query(RegRecord):
             existing_records[ (record.hrn, record.type,) ] = record
             existing_hrns.append(record.hrn) 
             
@@ -168,8 +170,8 @@ class OpenstackImporter:
         
             record_object = existing_records[ (record_hrn, type) ]
             self.logger.info("OpenstackImporter: removing %s " % record)
-            dbsession.delete(record_object)
-            dbsession.commit()
+            global_dbsession.delete(record_object)
+            global_dbsession.commit()
                                    
         # save pub keys
         self.logger.info('OpenstackImporter: saving current pub keys')
index 7f5cf86..d281c61 100644 (file)
@@ -24,7 +24,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
 from sfa.trust.gid import create_uuid    
 from sfa.trust.certificate import convert_public_key, Keypair
 
-from sfa.storage.alchemy import dbsession
+# using global alchemy.session() here is fine 
+# as importer is on standalone one-shot process
+from sfa.storage.alchemy import global_dbsession
 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
 
 from sfa.planetlab.plshell import PlShell    
@@ -115,8 +117,8 @@ class PlImporter:
                                            pointer=site['site_id'],
                                            authority=get_authority(site_hrn))
                 auth_record.just_created()
-                dbsession.add(auth_record)
-                dbsession.commit()
+                global_dbsession.add(auth_record)
+                global_dbsession.commit()
                 self.logger.info("PlImporter: Imported authority (vini site) %s"%auth_record)
                 self.remember_record ( site_record )
 
@@ -127,7 +129,7 @@ class PlImporter:
         shell = PlShell (config)
 
         ######## retrieve all existing SFA objects
-        all_records = dbsession.query(RegRecord).all()
+        all_records = global_dbsession.query(RegRecord).all()
 
         # create hash by (type,hrn) 
         # we essentially use this to know if a given record is already known to SFA 
@@ -209,8 +211,8 @@ class PlImporter:
                                                pointer=site['site_id'],
                                                authority=get_authority(site_hrn))
                     site_record.just_created()
-                    dbsession.add(site_record)
-                    dbsession.commit()
+                    global_dbsession.add(site_record)
+                    global_dbsession.commit()
                     self.logger.info("PlImporter: imported authority (site) : %s" % site_record) 
                     self.remember_record (site_record)
                 except:
@@ -245,8 +247,8 @@ class PlImporter:
                                                pointer =node['node_id'],
                                                authority=get_authority(node_hrn))
                         node_record.just_created()
-                        dbsession.add(node_record)
-                        dbsession.commit()
+                        global_dbsession.add(node_record)
+                        global_dbsession.commit()
                         self.logger.info("PlImporter: imported node: %s" % node_record)  
                         self.remember_record (node_record)
                     except:
@@ -310,8 +312,8 @@ class PlImporter:
                         else:
                             self.logger.warning("No key found for user %s"%user_record)
                         user_record.just_created()
-                        dbsession.add (user_record)
-                        dbsession.commit()
+                        global_dbsession.add (user_record)
+                        global_dbsession.commit()
                         self.logger.info("PlImporter: imported person: %s" % user_record)
                         self.remember_record ( user_record )
                     else:
@@ -359,7 +361,7 @@ class PlImporter:
                             user_record.just_updated()
                             self.logger.info("PlImporter: updated person: %s" % user_record)
                     user_record.email = person['email']
-                    dbsession.commit()
+                    global_dbsession.commit()
                     user_record.stale=False
                     # accumulate PIs - PLCAPI has a limitation that when someone has PI role
                     # this is valid for all sites she is in..
@@ -377,7 +379,7 @@ class PlImporter:
             # could be performed twice with the same person...
             # so hopefully we do not need to eliminate duplicates explicitly here anymore
             site_record.reg_pis = list(set(site_pis))
-            dbsession.commit()
+            global_dbsession.commit()
 
             # import slices
             for slice_id in site['slice_ids']:
@@ -396,8 +398,8 @@ class PlImporter:
                                                  pointer=slice['slice_id'],
                                                  authority=get_authority(slice_hrn))
                         slice_record.just_created()
-                        dbsession.add(slice_record)
-                        dbsession.commit()
+                        global_dbsession.add(slice_record)
+                        global_dbsession.commit()
                         self.logger.info("PlImporter: imported slice: %s" % slice_record)  
                         self.remember_record ( slice_record )
                     except:
@@ -410,7 +412,7 @@ class PlImporter:
                 # record current users affiliated with the slice
                 slice_record.reg_researchers = \
                     [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['person_ids'] ]
-                dbsession.commit()
+                global_dbsession.commit()
                 slice_record.stale=False
 
         ### remove stale records
@@ -432,5 +434,5 @@ class PlImporter:
                 self.logger.warning("stale not found with %s"%record)
             if stale:
                 self.logger.info("PlImporter: deleting stale record: %s" % record)
-                dbsession.delete(record)
-                dbsession.commit()
+                global_dbsession.delete(record)
+                global_dbsession.commit()
index 792f824..342cd19 100644 (file)
@@ -53,7 +53,7 @@ class AggregateManager:
         geni_api_versions = ApiVersions().get_versions()
         geni_api_versions['3'] = 'http://%s:%s' % (api.config.sfa_aggregate_host, api.config.sfa_aggregate_port)
         version_generic = {
-            'testbed': self.driver.testbed_name(),
+            'testbed': api.driver.testbed_name(),
             'interface':'aggregate',
             'hrn':xrn.get_hrn(),
             'urn':xrn.get_urn(),
@@ -65,7 +65,7 @@ class AggregateManager:
         }
         version.update(version_generic)
         version.update(self.rspec_versions())
-        testbed_version = self.driver.aggregate_version()
+        testbed_version = api.driver.aggregate_version()
         version.update(testbed_version)
         return version
     
@@ -80,16 +80,16 @@ class AggregateManager:
 
         # look in cache first
         cached_requested = options.get('cached', True)
-        if cached_requested and self.driver.cache:
-            rspec = self.driver.cache.get(version_string)
+        if cached_requested and api.driver.cache:
+            rspec = api.driver.cache.get(version_string)
             if rspec:
-                logger.debug("%s.ListResources returning cached advertisement" % (self.driver.__module__))
+                logger.debug("%s.ListResources returning cached advertisement" % (api.driver.__module__))
                 return rspec
        
-        rspec = self.driver.list_resources (rspec_version, options) 
-        if self.driver.cache:
-            logger.debug("%s.ListResources stores advertisement in cache" % (self.driver.__module__))
-            self.driver.cache.add(version_string, rspec)    
+        rspec = api.driver.list_resources (rspec_version, options) 
+        if api.driver.cache:
+            logger.debug("%s.ListResources stores advertisement in cache" % (api.driver.__module__))
+            api.driver.cache.add(version_string, rspec)    
         return rspec
     
     def Describe(self, api, creds, urns, options):
@@ -98,13 +98,13 @@ class AggregateManager:
 
         version_manager = VersionManager()
         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
-        return self.driver.describe(urns, rspec_version, options)
+        return api.driver.describe(urns, rspec_version, options)
         
     
     def Status (self, api, urns, creds, options):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return {}
-        return self.driver.status (urns, options=options)
+        return api.driver.status (urns, options=options)
    
 
     def Allocate(self, api, xrn, creds, rspec_string, expiration, options):
@@ -114,7 +114,7 @@ class AggregateManager:
         """
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return ""
-        return self.driver.allocate(xrn, rspec_string, expiration, options)
+        return api.driver.allocate(xrn, rspec_string, expiration, options)
  
     def Provision(self, api, xrns, creds, options):
         """
@@ -134,25 +134,25 @@ class AggregateManager:
         if not rspec_version:
             raise InvalidRSpecVersion(options['geni_rspec_version'])
                        
-        return self.driver.provision(xrns, options)
+        return api.driver.provision(xrns, options)
     
     def Delete(self, api, xrns, creds, options):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return True
-        return self.driver.delete(xrns, options)
+        return api.driver.delete(xrns, options)
 
     def Renew(self, api, xrns, creds, expiration_time, options):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return True
-        return self.driver.renew(xrns, expiration_time, options)
+        return api.driver.renew(xrns, expiration_time, options)
 
     def PerformOperationalAction(self, api, xrns, creds, action, options={}):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return True
-        return self.driver.perform_operational_action(xrns, action, options) 
+        return api.driver.perform_operational_action(xrns, action, options) 
 
     def Shutdown(self, api, xrn, creds, options={}):
         call_id = options.get('call_id')
         if Callids().already_handled(call_id): return True
-        return self.driver.shutdown(xrn, options) 
+        return api.driver.shutdown(xrn, options) 
     
index fa25a83..0e8b71d 100644 (file)
@@ -5,9 +5,10 @@
 
 class Driver:
     
-    def __init__ (self, config): 
+    def __init__ (self, api): 
+        self.api = api
         # this is the hrn attached to the running server
-        self.hrn = config.SFA_INTERFACE_HRN
+        self.hrn = api.config.SFA_INTERFACE_HRN
 
     ########################################
     ########## registry oriented
index c24c1f5..bbd958e 100644 (file)
@@ -19,14 +19,14 @@ from sfa.trust.gid import create_uuid
 
 from sfa.storage.model import make_record, RegRecord, RegAuthority, RegUser, RegSlice, RegKey, \
     augment_with_sfa_builtins
-from sfa.storage.alchemy import dbsession
 ### the types that we need to exclude from sqlobjects before being able to dump
 # them on the xmlrpc wire
 from sqlalchemy.orm.collections import InstrumentedList
 
 class RegistryManager:
 
-    def __init__ (self, config): pass
+    def __init__ (self, config): 
+        logger.info("Creating RegistryManager[%s]"%id(self))
 
     # The GENI GetVersion call
     def GetVersion(self, api, options):
@@ -41,6 +41,7 @@ class RegistryManager:
                              'peers':peers})
     
     def GetCredential(self, api, xrn, type, caller_xrn=None):
+        dbsession = api.dbsession()
         # convert xrn to hrn     
         if type:
             hrn = urn_to_hrn(xrn)[0]
@@ -50,7 +51,7 @@ class RegistryManager:
         # Slivers don't have credentials but users should be able to 
         # specify a sliver xrn and receive the slice's credential
         if type == 'sliver' or '-' in Xrn(hrn).leaf:
-            slice_xrn = self.driver.sliver_to_slice_xrn(hrn)
+            slice_xrn = api.driver.sliver_to_slice_xrn(hrn)
             hrn = slice_xrn.hrn 
   
         # Is this a root or sub authority
@@ -110,6 +111,7 @@ class RegistryManager:
     # the default for full, which means 'dig into the testbed as well', should be false
     def Resolve(self, api, xrns, type=None, details=False):
     
+        dbsession = api.dbsession()
         if not isinstance(xrns, types.ListType):
             # try to infer type if not set and we get a single input
             if not type:
@@ -172,7 +174,7 @@ class RegistryManager:
         if details:
             # in details mode we get as much info as we can, which involves contacting the 
             # testbed for getting implementation details about the record
-            self.driver.augment_records_with_testbed_info(local_dicts)
+            api.driver.augment_records_with_testbed_info(local_dicts)
             # also we fill the 'url' field for known authorities
             # used to be in the driver code, sounds like a poorman thing though
             def solve_neighbour_url (record):
@@ -196,6 +198,7 @@ class RegistryManager:
         return records
     
     def List (self, api, xrn, origin_hrn=None, options={}):
+        dbsession=api.dbsession()
         # load all know registry names into a prefix tree and attempt to find
         # the longest matching prefix
         hrn, type = urn_to_hrn(xrn)
@@ -263,18 +266,19 @@ class RegistryManager:
     # subject_record describes the subject of the relationships
     # ref_record contains the target values for the various relationships we need to manage
     # (to begin with, this is just the slice x person (researcher) and authority x person (pi) relationships)
-    def update_driver_relations (self, subject_obj, ref_obj):
+    def update_driver_relations (self, api, subject_obj, ref_obj):
         type=subject_obj.type
         #for (k,v) in subject_obj.__dict__.items(): print k,'=',v
         if type=='slice' and hasattr(ref_obj,'researcher'):
-            self.update_driver_relation(subject_obj, ref_obj.researcher, 'user', 'researcher')
+            self.update_driver_relation(api, subject_obj, ref_obj.researcher, 'user', 'researcher')
         elif type=='authority' and hasattr(ref_obj,'pi'):
-            self.update_driver_relation(subject_obj,ref_obj.pi, 'user', 'pi')
+            self.update_driver_relation(api, subject_obj,ref_obj.pi, 'user', 'pi')
         
     # field_key is the name of one field in the record, typically 'researcher' for a 'slice' record
     # hrns is the list of hrns that should be linked to the subject from now on
     # target_type would be e.g. 'user' in the 'slice' x 'researcher' example
-    def update_driver_relation (self, record_obj, hrns, target_type, relation_name):
+    def update_driver_relation (self, api, record_obj, hrns, target_type, relation_name):
+        dbsession=api.dbsession()
         # locate the linked objects in our db
         subject_type=record_obj.type
         subject_id=record_obj.pointer
@@ -282,10 +286,11 @@ class RegistryManager:
         link_id_tuples = dbsession.query(RegRecord.pointer).filter_by(type=target_type).filter(RegRecord.hrn.in_(hrns)).all()
         # sqlalchemy returns named tuples for columns
         link_ids = [ tuple.pointer for tuple in link_id_tuples ]
-        self.driver.update_relation (subject_type, target_type, relation_name, subject_id, link_ids)
+        api.driver.update_relation (subject_type, target_type, relation_name, subject_id, link_ids)
 
     def Register(self, api, record_dict):
     
+        dbsession=api.dbsession()
         hrn, type = record_dict['hrn'], record_dict['type']
         urn = hrn_to_urn(hrn,type)
         # validate the type
@@ -331,11 +336,11 @@ class RegistryManager:
 
             # locate objects for relationships
             pi_hrns = getattr(record,'pi',None)
-            if pi_hrns is not None: record.update_pis (pi_hrns)
+            if pi_hrns is not None: record.update_pis (pi_hrns, dbsession)
 
         elif isinstance (record, RegSlice):
             researcher_hrns = getattr(record,'researcher',None)
-            if researcher_hrns is not None: record.update_researchers (researcher_hrns)
+            if researcher_hrns is not None: record.update_researchers (researcher_hrns, dbsession)
         
         elif isinstance (record, RegUser):
             # create RegKey objects for incoming keys
@@ -344,18 +349,19 @@ class RegistryManager:
                 record.reg_keys = [ RegKey (key) for key in record.keys ]
             
         # update testbed-specific data if needed
-        pointer = self.driver.register (record.__dict__, hrn, pub_key)
+        pointer = api.driver.register (record.__dict__, hrn, pub_key)
 
         record.pointer=pointer
         dbsession.add(record)
         dbsession.commit()
     
         # update membership for researchers, pis, owners, operators
-        self.update_driver_relations (record, record)
+        self.update_driver_relations (api, record, record)
         
         return record.get_gid_object().save_to_string(save_parents=True)
     
     def Update(self, api, record_dict):
+        dbsession=api.dbsession()
         assert ('type' in record_dict)
         new_record=make_record(dict=record_dict)
         (type,hrn) = (new_record.type, new_record.hrn)
@@ -394,11 +400,11 @@ class RegistryManager:
         # update native relations
         if isinstance (record, RegSlice):
             researcher_hrns = getattr(new_record,'researcher',None)
-            if researcher_hrns is not None: record.update_researchers (researcher_hrns)
+            if researcher_hrns is not None: record.update_researchers (researcher_hrns, dbsession)
 
         elif isinstance (record, RegAuthority):
             pi_hrns = getattr(new_record,'pi',None)
-            if pi_hrns is not None: record.update_pis (pi_hrns)
+            if pi_hrns is not None: record.update_pis (pi_hrns, dbsession)
         
         # update the PLC information that was specified with the record
         # xxx oddly enough, without this useless statement, 
@@ -408,7 +414,7 @@ class RegistryManager:
         print "DO NOT REMOVE ME before driver.update, record=%s"%record
         new_key_pointer = -1
         try:
-           (pointer, new_key_pointer) = self.driver.update (record.__dict__, new_record.__dict__, hrn, new_key)
+           (pointer, new_key_pointer) = api.driver.update (record.__dict__, new_record.__dict__, hrn, new_key)
         except:
            pass
         if new_key and new_key_pointer:
@@ -417,12 +423,13 @@ class RegistryManager:
 
         dbsession.commit()
         # update membership for researchers, pis, owners, operators
-        self.update_driver_relations (record, new_record)
+        self.update_driver_relations (api, record, new_record)
         
         return 1 
     
     # expecting an Xrn instance
     def Remove(self, api, xrn, origin_hrn=None):
+        dbsession=api.dbsession()
         hrn=xrn.get_hrn()
         type=xrn.get_type()
         request=dbsession.query(RegRecord).filter_by(hrn=hrn)
@@ -454,7 +461,7 @@ class RegistryManager:
 
         # call testbed callback first
         # IIUC this is done on the local testbed TOO because of the refreshpeer link
-        if not self.driver.remove(record.__dict__):
+        if not api.driver.remove(record.__dict__):
             logger.warning("driver.remove failed")
 
         # delete from sfa db
@@ -465,13 +472,14 @@ class RegistryManager:
 
     # This is a PLC-specific thing, won't work with other platforms
     def get_key_from_incoming_ip (self, api):
+        dbsession=api.dbsession()
         # verify that the callers's ip address exist in the db and is an interface
         # for a node in the db
         (ip, port) = api.remote_addr
-        interfaces = self.driver.shell.GetInterfaces({'ip': ip}, ['node_id'])
+        interfaces = api.driver.shell.GetInterfaces({'ip': ip}, ['node_id'])
         if not interfaces:
             raise NonExistingRecord("no such ip %(ip)s" % locals())
-        nodes = self.driver.shell.GetNodes([interfaces[0]['node_id']], ['node_id', 'hostname'])
+        nodes = api.driver.shell.GetNodes([interfaces[0]['node_id']], ['node_id', 'hostname'])
         if not nodes:
             raise NonExistingRecord("no such node using ip %(ip)s" % locals())
         node = nodes[0]
index 15a8cd8..069ef69 100644 (file)
@@ -7,6 +7,7 @@ from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn, get_leaf, get_authority
 from sfa.util.cache import Cache
 from sfa.rspecs.rspec import RSpec
 from sfa.storage.model import SliverAllocation
+# xxx 1-dbsession-per-request
 from sfa.storage.alchemy import dbsession
 
 class V2ToV3Adapter:
index e8c5128..94684b9 100644 (file)
@@ -25,7 +25,7 @@ class Delete(Method):
     
     def call(self, xrns, creds, options):
         valid_creds = self.api.auth.checkCredentials(creds, 'deletesliver', xrns,
-                      check_sliver_callback = self.api.manager.driver.check_sliver_credentials)
+                      check_sliver_callback = self.api.driver.check_sliver_credentials)
 
         #log the call
         origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn()
index b66780a..8485f79 100644 (file)
@@ -38,7 +38,7 @@ class Describe(Method):
                 raise SfaInvalidArgument('Must specify an rspec version option. geni_rspec_version cannot be null')
  
         valid_creds = self.api.auth.checkCredentials(creds, 'listnodes', urns, \
-                      check_sliver_callback = self.api.manager.driver.check_sliver_credentials)
+                      check_sliver_callback = self.api.driver.check_sliver_credentials)
 
         # get hrn of the original caller 
         origin_hrn = options.get('origin_hrn', None)
index aa53def..f3a9612 100644 (file)
@@ -1,4 +1,3 @@
-
 from sfa.util.faults import RecordNotFound, ConnectionKeyGIDMismatch
 from sfa.util.xrn import urn_to_hrn
 from sfa.util.method import Method
index 74ee350..a94c1bc 100644 (file)
@@ -33,7 +33,7 @@ class Provision(Method):
 
         # Find the valid credentials
         valid_creds = self.api.auth.checkCredentials(creds, 'createsliver', xrns,
-                      check_sliver_callback = self.api.manager.driver.check_sliver_credentials) 
+                      check_sliver_callback = self.api.driver.check_sliver_credentials) 
         origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn()
         self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrns, self.name))
         result = self.api.manager.Provision(self.api, xrns, creds, options)
index 288e970..8f31786 100644 (file)
@@ -34,7 +34,7 @@ class Renew(Method):
 
         # Find the valid credentials
         valid_creds = self.api.auth.checkCredentials(creds, 'renewsliver', urns,
-                      check_sliver_callback = self.api.manager.driver.check_sliver_credentials)
+                      check_sliver_callback = self.api.driver.check_sliver_credentials)
 
         # Validate that the time does not go beyond the credential's expiration time
         requested_time = utcparse(expiration_time)
index 8641bd0..3eee878 100644 (file)
@@ -20,7 +20,7 @@ class Shutdown(Method):
     def call(self, xrn, creds):
 
         valid_creds = self.api.auth.checkCredentials(creds, 'stopslice', xrn,
-                      check_sliver_callback = self.api.manager.driver.check_sliver_credentials)
+                      check_sliver_callback = self.api.driver.check_sliver_credentials)
         #log the call
         origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn()
         self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrn, self.name))
index 044e252..164093e 100644 (file)
@@ -20,7 +20,7 @@ class Status(Method):
 
     def call(self, xrns, creds, options):
         valid_creds = self.api.auth.checkCredentials(creds, 'sliverstatus', xrns,
-                      check_sliver_callback = self.api.manager.driver.check_sliver_credentials)
+                      check_sliver_callback = self.api.driver.check_sliver_credentials)
 
         self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, xrns, self.name))
         return self.api.manager.Status(self.api, xrns, creds, options)
index da35ca3..40db2a7 100644 (file)
@@ -11,7 +11,6 @@ from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_hrn
 from sfa.util.cache import Cache
 
 # one would think the driver should not need to mess with the SFA db, but..
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import RegRecord
 
 # used to be used in get_ticket
@@ -46,8 +45,9 @@ class NitosDriver (Driver):
     # the cache instance is a class member so it survives across incoming requests
     cache = None
 
-    def __init__ (self, config):
-        Driver.__init__ (self, config)
+    def __init__ (self, api):
+        Driver.__init__ (self, api)
+        config = api.config
         self.shell = NitosShell (config)
         self.cache=None
         self.testbedInfo = self.shell.getTestbedInfo()
@@ -367,7 +367,7 @@ class NitosDriver (Driver):
         
         # get the registry records
         user_list, users = [], {}
-        user_list = dbsession.query(RegRecord).filter(RegRecord.pointer.in_(user_ids)).all()
+        user_list = self.api.dbsession().query(RegRecord).filter(RegRecord.pointer.in_(user_ids)).all()
         # create a hrns keyed on the sfa record's pointer.
         # Its possible for multiple records to have the same pointer so
         # the dict's value will be a list of hrns.
index e0afd07..e36946e 100644 (file)
@@ -15,7 +15,6 @@ from sfa.trust.credential import Credential
 #from sfa.trust.sfaticket import SfaTicket
 from sfa.rspecs.version_manager import VersionManager
 from sfa.rspecs.rspec import RSpec
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import RegRecord, SliverAllocation
 
 # the driver interface, mostly provides default behaviours
@@ -41,8 +40,9 @@ class NovaDriver(Driver):
     # the cache instance is a class member so it survives across incoming requests
     cache = None
 
-    def __init__ (self, config):
-        Driver.__init__(self, config)
+    def __init__ (self, api):
+        Driver.__init__(self, api)
+        config = api.config
         self.shell = Shell(config=config)
         self.cache=None
         if config.SFA_AGGREGATE_CACHING:
@@ -396,7 +396,8 @@ class NovaDriver(Driver):
         
         # update all sliver allocation states setting then to geni_allocated    
         sliver_ids = [sliver.id for sliver in slivers]
-        SliverAllocation.set_allocations(sliver_ids, 'geni_allocated')
+        dbsession=self.api.dbsession()
+        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession)
    
         return aggregate.describe(urns=[urn], version=rspec.version)
 
@@ -408,7 +409,8 @@ class NovaDriver(Driver):
         for instance in instances:
             sliver_hrn = "%s.%s" % (self.driver.hrn, instance.id)
             sliver_ids.append(Xrn(sliver_hrn, type='sliver').urn)
-        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned') 
+        dbsession=self.api.dbsession()
+        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession) 
         version_manager = VersionManager()
         rspec_version = version_manager.get_version(options['geni_rspec_version'])
         return self.describe(urns, rspec_version, options=options) 
@@ -427,7 +429,8 @@ class NovaDriver(Driver):
             aggregate.delete_instance(instance)
             
         # delete sliver allocation states
-        SliverAllocation.delete_allocations(sliver_ids)
+        dbsession=self.api.dbsession()
+        SliverAllocation.delete_allocations(sliver_ids, dbsession)
 
         # return geni_slivers
         geni_slivers = []
index 16eec41..d6d7367 100644 (file)
@@ -75,7 +75,7 @@ class OSAggregate:
         # lookup the sliver allocations
         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
-        sliver_allocations = dbsession.query(SliverAllocation).filter(constraint)
+        sliver_allocations = self.driver.api.dbsession().query(SliverAllocation).filter(constraint)
         sliver_allocation_dict = {}
         for sliver_allocation in sliver_allocations:
             sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation
index 2d26f0f..399b3a2 100644 (file)
@@ -21,7 +21,6 @@ from sfa.rspecs.version_manager import VersionManager
 from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, top_auth, hash_loginbase
 from sfa.planetlab.vlink import get_tc_rate
 from sfa.planetlab.topology import Topology
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import SliverAllocation
 
 
index 27c6b4b..0978a57 100644 (file)
@@ -10,7 +10,6 @@ from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf
 from sfa.util.cache import Cache
 
 # one would think the driver should not need to mess with the SFA db, but..
-from sfa.storage.alchemy import dbsession
 from sfa.storage.model import RegRecord, SliverAllocation
 from sfa.trust.credential import Credential
 
@@ -45,8 +44,9 @@ class PlDriver (Driver):
     # the cache instance is a class member so it survives across incoming requests
     cache = None
 
-    def __init__ (self, config):
-        Driver.__init__ (self, config)
+    def __init__ (self, api):
+        Driver.__init__ (self, api)
+        config=api.config
         self.shell = PlShell (config)
         self.cache=None
         if config.SFA_AGGREGATE_CACHING:
@@ -508,7 +508,7 @@ class PlDriver (Driver):
         
         # get the registry records
         person_list, persons = [], {}
-        person_list = dbsession.query (RegRecord).filter(RegRecord.pointer.in_(person_ids))
+        person_list = self.api.dbsession().query (RegRecord).filter(RegRecord.pointer.in_(person_ids))
         # create a hrns keyed on the sfa record's pointer.
         # Its possible for multiple records to have the same pointer so
         # the dict's value will be a list of hrns.
@@ -687,7 +687,8 @@ class PlDriver (Driver):
         slices.handle_peer(None, None, persons, peer)
         # update sliver allocation states and set them to geni_provisioned
         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
-        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned')
+        dbsession=self.api.dbsession()
+        SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession)
         version_manager = VersionManager()
         rspec_version = version_manager.get_version(options['geni_rspec_version']) 
         return self.describe(urns, rspec_version, options=options)
@@ -726,7 +727,8 @@ class PlDriver (Driver):
                     self.shell.DeleteLeases(leases_ids)
      
                 # delete sliver allocation states
-                SliverAllocation.delete_allocations(sliver_ids)
+                dbsession=self.api.dbsession()
+                SliverAllocation.delete_allocations(sliver_ids,dbsession)
             finally:
                 if peer:
                     self.shell.BindObjectToPeer('slice', slice_id, peer, slice['peer_slice_id'])
index 703dac3..b3c6813 100644 (file)
@@ -10,7 +10,6 @@ from sfa.planetlab.vlink import VLink
 from sfa.planetlab.topology import Topology
 from sfa.planetlab.plxrn import PlXrn, hrn_to_pl_slicename, xrn_to_hostname, top_auth, hash_loginbase
 from sfa.storage.model import SliverAllocation
-from sfa.storage.alchemy import dbsession
 
 MAXINT =  2L**31-1
 
@@ -272,7 +271,7 @@ class PlSlices:
                                       component_id=component_id,
                                       slice_urn = slice_urn, 
                                       allocation_state='geni_allocated')      
-            record.sync()
+            record.sync(self.driver.api.dbsession())
         return resulting_nodes
 
     def free_egre_key(self):
index a0fc7f4..9911e46 100644 (file)
@@ -14,10 +14,10 @@ from sfa.util.version import version_core
 from sfa.server.xmlrpcapi import XmlrpcApi
 from sfa.client.return_value import ReturnValue
 
+from sfa.storage.alchemy import alchemy
 
 ####################
 class SfaApi (XmlrpcApi): 
-    
     """
     An SfaApi instance is a basic xmlrcp service
     augmented with the local cryptographic material and hrn
@@ -31,8 +31,8 @@ class SfaApi (XmlrpcApi):
 
     It gets augmented by the generic layer with 
     (*) an instance of manager (actually a manager module for now)
-    (*) which in turn holds an instance of a testbed driver
-    For convenience api.manager.driver == api.driver
+        beware that this is shared among all instances of api
+    (*) an instance of a testbed driver
     """
 
     def __init__ (self, encoding="utf-8", methods='sfa.methods', 
@@ -69,6 +69,7 @@ class SfaApi (XmlrpcApi):
         
         # filled later on by generic/Generic
         self.manager=None
+        self._dbsession=None
 
     def server_proxy(self, interface, cred, timeout=30):
         """
@@ -89,7 +90,16 @@ class SfaApi (XmlrpcApi):
         server = interface.server_proxy(key_file, cert_file, timeout)
         return server
                
-        
+    def dbsession(self):
+        if self._dbsession is None:
+            self._dbsession=alchemy.session()
+        return self._dbsession
+
+    def close_dbsession(self):
+        if self._dbsession is None: return
+        alchemy.close_session(self._dbsession)
+        self._dbsession=None
+
     def getCredential(self, minimumExpiration=0):
         """
         Return a valid credential for this interface.
@@ -159,7 +169,8 @@ class SfaApi (XmlrpcApi):
         if not auth_hrn or hrn == self.config.SFA_INTERFACE_HRN:
             auth_hrn = hrn
         auth_info = self.auth.get_auth_info(auth_hrn)
-        from sfa.storage.alchemy import dbsession
+        # xxx although unlikely we might want to check for a potential leak
+        dbsession=self.dbsession()
         from sfa.storage.model import RegRecord
         record = dbsession.query(RegRecord).filter_by(type='authority+sa', hrn=hrn).first()
         if not record:
index 7bc434c..dbdde3f 100644 (file)
@@ -127,16 +127,18 @@ class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
             #self.send_response(500)
             #self.end_headers()
        
-        # got a valid response
-        self.send_response(200)
-        self.send_header("Content-type", "text/xml")
-        self.send_header("Content-length", str(len(response)))
-        self.end_headers()
-        self.wfile.write(response)
-
-        # shut down the connection
-        self.wfile.flush()
-        self.connection.shutdown() # Modified here!
+        # avoid session/connection leaks : do this no matter what 
+        finally:
+            self.send_response(200)
+            self.send_header("Content-type", "text/xml")
+            self.send_header("Content-length", str(len(response)))
+            self.end_headers()
+            self.wfile.write(response)
+            self.wfile.flush()
+            # close db connection
+            self.api.close_dbsession()
+            # shut down the connection
+            self.connection.shutdown() # Modified here!
 
 ##
 # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server
index e9f96dd..84c987f 100644 (file)
@@ -49,21 +49,34 @@ class Alchemy:
     def check (self):
         self.engine.execute ("select 1").scalar()
 
-    def session (self):
+    def global_session (self):
         if self._session is None:
             Session=sessionmaker ()
             self._session=Session(bind=self.engine)
+            logger.info('alchemy.global_session created session %s'%self._session)
         return self._session
 
-    def close_session (self):
+    def close_global_session (self):
         if self._session is None: return
+        logger.info('alchemy.close_global_session %s'%self._session)
         self._session.close()
         self._session=None
 
+    # create a dbsession to be managed separately
+    def session (self):
+        Session=sessionmaker()
+        session=Session (bind=self.engine)
+        logger.info('alchemy.session created session %s'%session)
+        return session
+
+    def close_session (self, session):
+        logger.info('alchemy.close_session closed session %s'%session)
+        session.close()
+
 ####################
 from sfa.util.config import Config
 
 alchemy=Alchemy (Config())
 engine=alchemy.engine
-dbsession=alchemy.session()
+global_dbsession=alchemy.global_session()
 
index b095042..46e2dee 100644 (file)
@@ -191,9 +191,7 @@ class RegAuthority (RegRecord):
     def __repr__ (self):
         return RegRecord.__repr__(self).replace("Record","Authority")
 
-    def update_pis (self, pi_hrns):
-        # don't ruin the import of that file in a client world
-        from sfa.storage.alchemy import dbsession
+    def update_pis (self, pi_hrns, dbsession):
         # strip that in case we have <researcher> words </researcher>
         pi_hrns = [ x.strip() for x in pi_hrns ]
         request = dbsession.query (RegUser).filter(RegUser.hrn.in_(pi_hrns))
@@ -221,9 +219,7 @@ class RegSlice (RegRecord):
     def __repr__ (self):
         return RegRecord.__repr__(self).replace("Record","Slice")
 
-    def update_researchers (self, researcher_hrns):
-        # don't ruin the import of that file in a client world
-        from sfa.storage.alchemy import dbsession
+    def update_researchers (self, researcher_hrns, dbsession):
         # strip that in case we have <researcher> words </researcher>
         researcher_hrns = [ x.strip() for x in researcher_hrns ]
         request = dbsession.query (RegUser).filter(RegUser.hrn.in_(researcher_hrns))
@@ -232,9 +228,12 @@ class RegSlice (RegRecord):
         self.reg_researchers = researchers
 
     # when dealing with credentials, we need to retrieve the PIs attached to a slice
+    # WARNING: with the move to passing dbsessions around, we face a glitch here because this
+    # helper function is called from the trust/ area that
     def get_pis (self):
-        # don't ruin the import of that file in a client world
-        from sfa.storage.alchemy import dbsession
+        from sqlalchemy.orm import sessionmaker
+        Session=sessionmaker()
+        dbsession=Session.object_session(self)
         from sfa.util.xrn import get_authority
         authority_hrn = get_authority(self.hrn)
         auth_record = dbsession.query(RegAuthority).filter_by(hrn=authority_hrn).first()
@@ -344,8 +343,7 @@ class SliverAllocation(Base,AlchemyObj):
         return state
 
     @staticmethod    
-    def set_allocations(sliver_ids, state):
-        from sfa.storage.alchemy import dbsession
+    def set_allocations(sliver_ids, state, dbsession):
         if not isinstance(sliver_ids, list):
             sliver_ids = [sliver_ids]
         sliver_state_updated = {}
@@ -366,8 +364,7 @@ class SliverAllocation(Base,AlchemyObj):
         dbsession.commit()
 
     @staticmethod
-    def delete_allocations(sliver_ids):
-        from sfa.storage.alchemy import dbsession
+    def delete_allocations(sliver_ids, dbsession):
         if not isinstance(sliver_ids, list):
             sliver_ids = [sliver_ids]
         constraint = SliverAllocation.sliver_id.in_(sliver_ids)
@@ -376,9 +373,7 @@ class SliverAllocation(Base,AlchemyObj):
             dbsession.delete(sliver_allocation)
         dbsession.commit()
     
-    def sync(self):
-        from sfa.storage.alchemy import dbsession
-        
+    def sync(self, dbsession):
         constraints = [SliverAllocation.sliver_id==self.sliver_id]
         results = dbsession.query(SliverAllocation).filter(and_(*constraints))
         records = []