From: Git User
Date: Thu, 14 Nov 2013 07:34:43 +0000 (-0500)
Subject: Merge remote-tracking branch 'origin/geni-v3' into geni-v3
X-Git-Tag: sfa-3.1-1~42
X-Git-Url: http://git.onelab.eu/?p=sfa.git;a=commitdiff_plain;h=7e1d7d63b6e65af7451c533fda704b14e327e48f;hp=e7f22ef06ecae9985de86d41785effa443ba4816

Merge remote-tracking branch 'origin/geni-v3' into geni-v3
---

diff --git a/docs/dbsession.readme b/docs/dbsession.readme
new file mode 100644
index 00000000..8ee608a1
--- /dev/null
+++ b/docs/dbsession.readme
@@ -0,0 +1,37 @@
+As of Nov. 2013, when moving from 3.0 to 3.2
+--------------------------------------------
+
+* driver-creation
+. drivers now expect an api instead of a config (and api.config is set)
+
+* managers
+. can no longer access their driver from self, but from the context (api is passed to methods)
+
+* dbsession : implementation
+. storage.Alchemy still exposes a managed global dbsession object, but it is now obtained through global_session(), together with close_global_session()
+. storage.Alchemy also exposes a method called session() that is *NOT* managed - the caller is expected to call close_session()
+. storage.alchemy no longer exports dbsession, only global_dbsession,
+  so that any code that still needs adaptation will break at import time
+
+* dbsession : usage
+. use api.dbsession() whenever possible
+. it is fine for importers and sfaadmin to use the global session (there is no api instance in their case)
+. there are one or two exceptions where dbsession is retrieved from an sqlalchemy object, but this is deemed poor practice; please avoid doing this whenever possible
+
+---
+OTHER NOTES:
+
+* iotlab/cortexlab:
+. while browsing this code I noticed that the code for cortexlab seems very close to the one for iotlab
+  I wonder if some inheritance would have helped reduce code duplication
+  so I'll forget about cortexlab for now, as all/most of the following comments probably apply as-is to cortexlab
+
+* iotlab/iotlabapi
+. it's confusing that the class name here does not match the filename (class IotlabTestbedAPI in iotlabapi.py)
+. IIUC this could/should be renamed IotlabShell (in iotlabshell.py) instead, as that's exactly what our notion of a shell is
+. regardless: in order to fetch dbsession() from the context api, I tried to tweak IotlabTestbedAPI so that it also takes an api instead of a config as argument
+  however I am puzzled as to why most (all?) of the IotlabTestbedAPI methods that actually use dbsession are declared as staticmethods. Is this a strong constraint ?
+ It would help me a lot if this could be made a regular class, as opposed to what looks like a mere namespace, so we can retrieve dbsession() from an api object + + diff --git a/sfa/client/sfaadmin.py b/sfa/client/sfaadmin.py index b7b45d46..82c915c2 100755 --- a/sfa/client/sfaadmin.py +++ b/sfa/client/sfaadmin.py @@ -125,9 +125,8 @@ class RegistryCommands(Commands): """Check the correspondance between the GID and the PubKey""" # db records - from sfa.storage.alchemy import dbsession from sfa.storage.model import RegRecord - db_query = dbsession.query(RegRecord).filter_by(type=type) + db_query = self.api.dbsession().query(RegRecord).filter_by(type=type) if xrn and not all: hrn = Xrn(xrn).get_hrn() db_query = db_query.filter_by(hrn=hrn) @@ -315,10 +314,9 @@ class CertCommands(Commands): @args('-o', '--outfile', dest='outfile', metavar='', help='output file', default=None) def export(self, xrn, type=None, outfile=None): """Fetch an object's GID from the Registry""" - from sfa.storage.alchemy import dbsession from sfa.storage.model import RegRecord hrn = Xrn(xrn).get_hrn() - request=dbsession.query(RegRecord).filter_by(hrn=hrn) + request=self.api.dbsession().query(RegRecord).filter_by(hrn=hrn) if type: request = request.filter_by(type=type) record=request.first() if record: diff --git a/sfa/dummy/dummyaggregate.py b/sfa/dummy/dummyaggregate.py index dca47b59..c6381a61 100644 --- a/sfa/dummy/dummyaggregate.py +++ b/sfa/dummy/dummyaggregate.py @@ -19,7 +19,6 @@ from sfa.rspecs.version_manager import VersionManager from sfa.dummy.dummyxrn import DummyXrn, hostname_to_urn, hrn_to_dummy_slicename, slicename_to_hrn -from sfa.storage.alchemy import dbsession from sfa.storage.model import SliverAllocation import time @@ -243,7 +242,7 @@ class DummyAggregate: geni_urn = urns[0] sliver_ids = [sliver['sliver_id'] for sliver in slivers] constraint = SliverAllocation.sliver_id.in_(sliver_ids) - sliver_allocations = dbsession.query(SliverAllocation).filter(constraint) + sliver_allocations = self.driver.api.dbsession().query(SliverAllocation).filter(constraint) sliver_allocation_dict = {} for sliver_allocation in sliver_allocations: geni_urn = sliver_allocation.slice_urn diff --git a/sfa/dummy/dummydriver.py b/sfa/dummy/dummydriver.py index 0d0514e8..b33c85d3 100644 --- a/sfa/dummy/dummydriver.py +++ b/sfa/dummy/dummydriver.py @@ -11,7 +11,6 @@ from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf from sfa.util.cache import Cache # one would think the driver should not need to mess with the SFA db, but.. -from sfa.storage.alchemy import dbsession from sfa.storage.model import RegRecord, SliverAllocation from sfa.trust.credential import Credential @@ -44,9 +43,9 @@ class DummyDriver (Driver): # the cache instance is a class member so it survives across incoming requests cache = None - def __init__ (self, config): - Driver.__init__ (self, config) - self.config = config + def __init__ (self, api): + Driver.__init__ (self, api) + config = api.config self.hrn = config.SFA_INTERFACE_HRN self.root_auth = config.SFA_REGISTRY_ROOT_AUTH self.shell = DummyShell (config) @@ -336,7 +335,7 @@ class DummyDriver (Driver): # get the registry records user_list, users = [], {} - user_list = dbsession.query (RegRecord).filter(RegRecord.pointer.in_(user_ids)) + user_list = self.api.dbsession().query (RegRecord).filter(RegRecord.pointer.in_(user_ids)) # create a hrns keyed on the sfa record's pointer. # Its possible for multiple records to have the same pointer so # the dict's value will be a list of hrns. 
@@ -464,7 +463,8 @@ class DummyDriver (Driver): #users = slices.verify_users(None, slice, geni_users, options=options) # update sliver allocation states and set them to geni_provisioned sliver_ids = [sliver['sliver_id'] for sliver in slivers] - SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned') + dbsession=self.api.dbsession() + SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession) version_manager = VersionManager() rspec_version = version_manager.get_version(options['geni_rspec_version']) return self.describe(urns, rspec_version, options=options) @@ -490,7 +490,8 @@ class DummyDriver (Driver): try: self.shell.DeleteSliceFromNodes({'slice_id': slice_id, 'node_ids': node_ids}) # delete sliver allocation states - SliverAllocation.delete_allocations(sliver_ids) + dbsession=self.api.dbsession() + SliverAllocation.delete_allocations(sliver_ids,dbsession) finally: pass diff --git a/sfa/dummy/dummyslices.py b/sfa/dummy/dummyslices.py index dddf1a66..cf5a6da9 100644 --- a/sfa/dummy/dummyslices.py +++ b/sfa/dummy/dummyslices.py @@ -8,7 +8,6 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, urn_to_hrn from sfa.rspecs.rspec import RSpec from sfa.storage.model import SliverAllocation -from sfa.storage.alchemy import dbsession from sfa.dummy.dummyxrn import DummyXrn, hrn_to_dummy_slicename @@ -107,7 +106,7 @@ class DummySlices: component_id=component_id, slice_urn = slice_urn, allocation_state='geni_allocated') - record.sync() + record.sync(self.driver.api.dbsession()) return resulting_nodes diff --git a/sfa/federica/fddriver.py b/sfa/federica/fddriver.py index 96c7aa4b..cec702d6 100644 --- a/sfa/federica/fddriver.py +++ b/sfa/federica/fddriver.py @@ -23,8 +23,9 @@ federica_version_string="RSpecV2" class FdDriver (PlDriver): - def __init__ (self,config): - PlDriver.__init__ (self, config) + def __init__ (self,api): + PlDriver.__init__ (self, api) + config = api.config self.shell=FdShell(config) # the agreement with the federica driver is for them to expose results in a way diff --git a/sfa/generic/__init__.py b/sfa/generic/__init__.py index 99d15bc1..ece7e2b8 100644 --- a/sfa/generic/__init__.py +++ b/sfa/generic/__init__.py @@ -67,16 +67,12 @@ class Generic: # xxx can probably drop support for managers implemented as modules # which makes it a bit awkward manager_class_or_module = self.make_manager(api.interface) - driver = self.make_driver (api.config, api.interface) + driver = self.make_driver (api) ### arrange stuff together # add a manager wrapper manager_wrap = ManagerWrapper(manager_class_or_module,api.interface,api.config) api.manager=manager_wrap - # insert driver in manager - logger.debug("Setting manager.driver, manager=%s"%manager_class_or_module) - # xxx this should go into the object and not the class !?! 
- manager_class_or_module.driver=driver - # add it in api as well for convenience + # add it in api as well; driver.api is set too as part of make_driver api.driver=driver return api @@ -100,7 +96,9 @@ class Generic: logger.log_exc_critical(message) # need interface to select the right driver - def make_driver (self, config, interface): + def make_driver (self, api): + config=api.config + interface=api.interface flavour = self.flavour message="Generic.make_driver for flavour=%s and interface=%s"%(flavour,interface) @@ -111,7 +109,7 @@ class Generic: try: class_obj = getattr(self,classname)() logger.debug("%s : %s"%(message,class_obj)) - return class_obj(config) + return class_obj(api) except: logger.log_exc_critical(message) diff --git a/sfa/generic/architecture.txt b/sfa/generic/architecture.txt index ff63549b..fe793bff 100644 --- a/sfa/generic/architecture.txt +++ b/sfa/generic/architecture.txt @@ -20,8 +20,8 @@ configurable in a flavour (e.g. sfa.generic.pl.py) following layout: api.manager -manager.driver -api.driver (for convenience) +api.driver +driver.api ------ example diff --git a/sfa/importer/__init__.py b/sfa/importer/__init__.py index 35f8acdc..d1721da9 100644 --- a/sfa/importer/__init__.py +++ b/sfa/importer/__init__.py @@ -10,7 +10,9 @@ from sfa.util.sfalogging import _SfaLogger from sfa.trust.hierarchy import Hierarchy #from sfa.trust.trustedroots import TrustedRoots from sfa.trust.gid import create_uuid -from sfa.storage.alchemy import dbsession +# using global alchemy.session() here is fine +# as importer is on standalone one-shot process +from sfa.storage.alchemy import global_dbsession from sfa.storage.model import RegRecord, RegAuthority, RegUser from sfa.trust.certificate import convert_public_key, Keypair @@ -35,7 +37,7 @@ class Importer: # check before creating a RegRecord entry as we run this over and over def record_exists (self, type, hrn): - return dbsession.query(RegRecord).filter_by(hrn=hrn,type=type).count()!=0 + return global_dbsession.query(RegRecord).filter_by(hrn=hrn,type=type).count()!=0 def create_top_level_auth_records(self, hrn): """ @@ -56,8 +58,8 @@ class Importer: auth_record = RegAuthority(hrn=hrn, gid=auth_info.get_gid_object(), authority=get_authority(hrn)) auth_record.just_created() - dbsession.add (auth_record) - dbsession.commit() + global_dbsession.add (auth_record) + global_dbsession.commit() self.logger.info("SfaImporter: imported authority (parent) %s " % auth_record) @@ -76,8 +78,8 @@ class Importer: user_record = RegUser(hrn=hrn, gid=auth_info.get_gid_object(), authority=get_authority(hrn)) user_record.just_created() - dbsession.add (user_record) - dbsession.commit() + global_dbsession.add (user_record) + global_dbsession.commit() self.logger.info("SfaImporter: importing user (slicemanager) %s " % user_record) @@ -98,8 +100,8 @@ class Importer: interface_record = RegAuthority(type=type, hrn=hrn, gid=gid, authority=get_authority(hrn)) interface_record.just_created() - dbsession.add (interface_record) - dbsession.commit() + global_dbsession.add (interface_record) + global_dbsession.commit() self.logger.info("SfaImporter: imported authority (%s) %s " % (type,interface_record)) def run(self, options=None): diff --git a/sfa/importer/dummyimporter.py b/sfa/importer/dummyimporter.py index 5001849b..d274b279 100644 --- a/sfa/importer/dummyimporter.py +++ b/sfa/importer/dummyimporter.py @@ -23,7 +23,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn from sfa.trust.gid import create_uuid from sfa.trust.certificate import 
convert_public_key, Keypair -from sfa.storage.alchemy import dbsession +# using global alchemy.session() here is fine +# as importer is on standalone one-shot process +from sfa.storage.alchemy import global_dbsession from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey from sfa.dummy.dummyshell import DummyShell @@ -99,7 +101,7 @@ class DummyImporter: shell = DummyShell (config) ######## retrieve all existing SFA objects - all_records = dbsession.query(RegRecord).all() + all_records = global_dbsession.query(RegRecord).all() # create hash by (type,hrn) # we essentially use this to know if a given record is already known to SFA @@ -159,8 +161,8 @@ class DummyImporter: pointer= -1, authority=get_authority(site_hrn)) site_record.just_created() - dbsession.add(site_record) - dbsession.commit() + global_dbsession.add(site_record) + global_dbsession.commit() self.logger.info("DummyImporter: imported authority (site) : %s" % site_record) self.remember_record (site_record) except: @@ -190,8 +192,8 @@ class DummyImporter: pointer =node['node_id'], authority=get_authority(node_hrn)) node_record.just_created() - dbsession.add(node_record) - dbsession.commit() + global_dbsession.add(node_record) + global_dbsession.commit() self.logger.info("DummyImporter: imported node: %s" % node_record) self.remember_record (node_record) except: @@ -249,8 +251,8 @@ class DummyImporter: else: self.logger.warning("No key found for user %s"%user_record) user_record.just_created() - dbsession.add (user_record) - dbsession.commit() + global_dbsession.add (user_record) + global_dbsession.commit() self.logger.info("DummyImporter: imported person: %s" % user_record) self.remember_record ( user_record ) @@ -277,7 +279,7 @@ class DummyImporter: user_record.reg_keys=[ RegKey (pubkey)] self.logger.info("DummyImporter: updated person: %s" % user_record) user_record.email = user['email'] - dbsession.commit() + global_dbsession.commit() user_record.stale=False except: self.logger.log_exc("DummyImporter: failed to import user %d %s"%(user['user_id'],user['email'])) @@ -296,8 +298,8 @@ class DummyImporter: pointer=slice['slice_id'], authority=get_authority(slice_hrn)) slice_record.just_created() - dbsession.add(slice_record) - dbsession.commit() + global_dbsession.add(slice_record) + global_dbsession.commit() self.logger.info("DummyImporter: imported slice: %s" % slice_record) self.remember_record ( slice_record ) except: @@ -309,7 +311,7 @@ class DummyImporter: # record current users affiliated with the slice slice_record.reg_researchers = \ [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['user_ids'] ] - dbsession.commit() + global_dbsession.commit() slice_record.stale=False ### remove stale records @@ -328,5 +330,5 @@ class DummyImporter: self.logger.warning("stale not found with %s"%record) if stale: self.logger.info("DummyImporter: deleting stale record: %s" % record) - dbsession.delete(record) - dbsession.commit() + global_dbsession.delete(record) + global_dbsession.commit() diff --git a/sfa/importer/iotlabimporter.py b/sfa/importer/iotlabimporter.py index 86874375..bfa094b2 100644 --- a/sfa/importer/iotlabimporter.py +++ b/sfa/importer/iotlabimporter.py @@ -11,7 +11,9 @@ from sfa.iotlab.iotlabpostgres import TestbedAdditionalSfaDB from sfa.trust.certificate import Keypair, convert_public_key from sfa.trust.gid import create_uuid -from sfa.storage.alchemy import dbsession +# using global alchemy.session() here is fine +# as importer is on standalone one-shot process +from 
sfa.storage.alchemy import global_dbsession from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \ RegUser, RegKey @@ -43,7 +45,7 @@ class IotlabImporter: self.logger = loc_logger self.logger.setLevelDebug() #retrieve all existing SFA objects - self.all_records = dbsession.query(RegRecord).all() + self.all_records = global_dbsession.query(RegRecord).all() # initialize record.stale to True by default, # then mark stale=False on the ones that are in use @@ -213,8 +215,8 @@ class IotlabImporter: try: node_record.just_created() - dbsession.add(node_record) - dbsession.commit() + global_dbsession.add(node_record) + global_dbsession.commit() self.logger.info("IotlabImporter: imported node: %s" % node_record) self.update_just_added_records_dict(node_record) @@ -259,8 +261,8 @@ class IotlabImporter: pointer='-1', authority=get_authority(site_hrn)) site_record.just_created() - dbsession.add(site_record) - dbsession.commit() + global_dbsession.add(site_record) + global_dbsession.commit() self.logger.info("IotlabImporter: imported authority \ (site) %s" % site_record) self.update_just_added_records_dict(site_record) @@ -404,8 +406,8 @@ class IotlabImporter: try: user_record.just_created() - dbsession.add (user_record) - dbsession.commit() + global_dbsession.add (user_record) + global_dbsession.commit() self.logger.info("IotlabImporter: imported person \ %s" % (user_record)) self.update_just_added_records_dict(user_record) @@ -440,7 +442,7 @@ class IotlabImporter: user_record.email = person['email'] try: - dbsession.commit() + global_dbsession.commit() user_record.stale = False except SQLAlchemyError: self.logger.log_exc("IotlabImporter: \ @@ -478,8 +480,8 @@ class IotlabImporter: authority=get_authority(slice_hrn)) try: slice_record.just_created() - dbsession.add(slice_record) - dbsession.commit() + global_dbsession.add(slice_record) + global_dbsession.commit() self.update_just_added_records_dict(slice_record) @@ -497,7 +499,7 @@ class IotlabImporter: slice_record.reg_researchers = [user_record] try: - dbsession.commit() + global_dbsession.commit() slice_record.stale = False except SQLAlchemyError: self.logger.log_exc("IotlabImporter: failed to update slice") @@ -551,8 +553,8 @@ class IotlabImporter: % (record)) try: - dbsession.delete(record) - dbsession.commit() + global_dbsession.delete(record) + global_dbsession.commit() except SQLAlchemyError: self.logger.log_exc("IotlabImporter: failed to delete \ stale record %s" % (record)) diff --git a/sfa/importer/nitosimporter.py b/sfa/importer/nitosimporter.py index 78bccc4b..425be778 100644 --- a/sfa/importer/nitosimporter.py +++ b/sfa/importer/nitosimporter.py @@ -7,7 +7,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn from sfa.trust.gid import create_uuid from sfa.trust.certificate import convert_public_key, Keypair -from sfa.storage.alchemy import dbsession +# using global alchemy.session() here is fine +# as importer is on standalone one-shot process +from sfa.storage.alchemy import global_dbsession from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey from sfa.nitos.nitosshell import NitosShell @@ -83,7 +85,7 @@ class NitosImporter: shell = NitosShell (config) ######## retrieve all existing SFA objects - all_records = dbsession.query(RegRecord).all() + all_records = global_dbsession.query(RegRecord).all() # create hash by (type,hrn) # we essentially use this to know if a given record is already known to SFA @@ -146,8 +148,8 @@ class NitosImporter: pointer=0, 
authority=get_authority(site_hrn)) site_record.just_created() - dbsession.add(site_record) - dbsession.commit() + global_dbsession.add(site_record) + global_dbsession.commit() self.logger.info("NitosImporter: imported authority (site) : %s" % site_record) self.remember_record (site_record) except: @@ -177,8 +179,8 @@ class NitosImporter: pointer =node['node_id'], authority=get_authority(node_hrn)) node_record.just_created() - dbsession.add(node_record) - dbsession.commit() + global_dbsession.add(node_record) + global_dbsession.commit() self.logger.info("NitosImporter: imported node: %s" % node_record) self.remember_record (node_record) except: @@ -236,8 +238,8 @@ class NitosImporter: else: self.logger.warning("No key found for user %s"%user_record) user_record.just_created() - dbsession.add (user_record) - dbsession.commit() + global_dbsession.add (user_record) + global_dbsession.commit() self.logger.info("NitosImporter: imported user: %s" % user_record) self.remember_record ( user_record ) else: @@ -270,7 +272,7 @@ class NitosImporter: user_record.just_updated() self.logger.info("NitosImporter: updated user: %s" % user_record) user_record.email = user['email'] - dbsession.commit() + global_dbsession.commit() user_record.stale=False except: self.logger.log_exc("NitosImporter: failed to import user %s %s"%(user['user_id'],user['email'])) @@ -289,8 +291,8 @@ class NitosImporter: pointer=slice['slice_id'], authority=get_authority(slice_hrn)) slice_record.just_created() - dbsession.add(slice_record) - dbsession.commit() + global_dbsession.add(slice_record) + global_dbsession.commit() self.logger.info("NitosImporter: imported slice: %s" % slice_record) self.remember_record ( slice_record ) except: @@ -302,7 +304,7 @@ class NitosImporter: # record current users affiliated with the slice slice_record.reg_researchers = \ [ self.locate_by_type_pointer ('user',int(user_id)) for user_id in slice['user_ids'] ] - dbsession.commit() + global_dbsession.commit() slice_record.stale=False @@ -322,7 +324,7 @@ class NitosImporter: self.logger.warning("stale not found with %s"%record) if stale: self.logger.info("NitosImporter: deleting stale record: %s" % record) - dbsession.delete(record) - dbsession.commit() + global_dbsession.delete(record) + global_dbsession.commit() diff --git a/sfa/importer/openstackimporter.py b/sfa/importer/openstackimporter.py index 0cf729c3..c8233bda 100644 --- a/sfa/importer/openstackimporter.py +++ b/sfa/importer/openstackimporter.py @@ -4,7 +4,9 @@ from sfa.util.config import Config from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn from sfa.trust.gid import create_uuid from sfa.trust.certificate import convert_public_key, Keypair -from sfa.storage.alchemy import dbsession +# using global alchemy.session() here is fine +# as importer is on standalone one-shot process +from sfa.storage.alchemy import global_dbsession from sfa.storage.model import RegRecord, RegAuthority, RegUser, RegSlice, RegNode from sfa.openstack.osxrn import OSXrn from sfa.openstack.shell import Shell @@ -79,8 +81,8 @@ class OpenstackImporter: user_record.hrn=hrn user_record.gid=user_gid user_record.authority=get_authority(hrn) - dbsession.add(user_record) - dbsession.commit() + global_dbsession.add(user_record) + global_dbsession.commit() self.logger.info("OpenstackImporter: imported person %s" % user_record) return users_dict, user_keys @@ -112,8 +114,8 @@ class OpenstackImporter: record.hrn=hrn record.gid=gid record.authority=get_authority(hrn) - dbsession.add(record) - dbsession.commit() + 
global_dbsession.add(record) + global_dbsession.commit() self.logger.info("OpenstackImporter: imported authority: %s" % record) else: @@ -125,8 +127,8 @@ class OpenstackImporter: record.hrn=hrn record.gid=gid record.authority=get_authority(hrn) - dbsession.add(record) - dbsession.commit() + global_dbsession.add(record) + global_dbsession.commit() self.logger.info("OpenstackImporter: imported slice: %s" % record) return tenants_dict @@ -139,7 +141,7 @@ class OpenstackImporter: existing_records = {} existing_hrns = [] key_ids = [] - for record in dbsession.query(RegRecord): + for record in global_dbsession.query(RegRecord): existing_records[ (record.hrn, record.type,) ] = record existing_hrns.append(record.hrn) @@ -168,8 +170,8 @@ class OpenstackImporter: record_object = existing_records[ (record_hrn, type) ] self.logger.info("OpenstackImporter: removing %s " % record) - dbsession.delete(record_object) - dbsession.commit() + global_dbsession.delete(record_object) + global_dbsession.commit() # save pub keys self.logger.info('OpenstackImporter: saving current pub keys') diff --git a/sfa/importer/plimporter.py b/sfa/importer/plimporter.py index 7f5cf860..d281c614 100644 --- a/sfa/importer/plimporter.py +++ b/sfa/importer/plimporter.py @@ -24,7 +24,9 @@ from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn from sfa.trust.gid import create_uuid from sfa.trust.certificate import convert_public_key, Keypair -from sfa.storage.alchemy import dbsession +# using global alchemy.session() here is fine +# as importer is on standalone one-shot process +from sfa.storage.alchemy import global_dbsession from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey from sfa.planetlab.plshell import PlShell @@ -115,8 +117,8 @@ class PlImporter: pointer=site['site_id'], authority=get_authority(site_hrn)) auth_record.just_created() - dbsession.add(auth_record) - dbsession.commit() + global_dbsession.add(auth_record) + global_dbsession.commit() self.logger.info("PlImporter: Imported authority (vini site) %s"%auth_record) self.remember_record ( site_record ) @@ -127,7 +129,7 @@ class PlImporter: shell = PlShell (config) ######## retrieve all existing SFA objects - all_records = dbsession.query(RegRecord).all() + all_records = global_dbsession.query(RegRecord).all() # create hash by (type,hrn) # we essentially use this to know if a given record is already known to SFA @@ -209,8 +211,8 @@ class PlImporter: pointer=site['site_id'], authority=get_authority(site_hrn)) site_record.just_created() - dbsession.add(site_record) - dbsession.commit() + global_dbsession.add(site_record) + global_dbsession.commit() self.logger.info("PlImporter: imported authority (site) : %s" % site_record) self.remember_record (site_record) except: @@ -245,8 +247,8 @@ class PlImporter: pointer =node['node_id'], authority=get_authority(node_hrn)) node_record.just_created() - dbsession.add(node_record) - dbsession.commit() + global_dbsession.add(node_record) + global_dbsession.commit() self.logger.info("PlImporter: imported node: %s" % node_record) self.remember_record (node_record) except: @@ -310,8 +312,8 @@ class PlImporter: else: self.logger.warning("No key found for user %s"%user_record) user_record.just_created() - dbsession.add (user_record) - dbsession.commit() + global_dbsession.add (user_record) + global_dbsession.commit() self.logger.info("PlImporter: imported person: %s" % user_record) self.remember_record ( user_record ) else: @@ -359,7 +361,7 @@ class PlImporter: user_record.just_updated() 
self.logger.info("PlImporter: updated person: %s" % user_record) user_record.email = person['email'] - dbsession.commit() + global_dbsession.commit() user_record.stale=False # accumulate PIs - PLCAPI has a limitation that when someone has PI role # this is valid for all sites she is in.. @@ -377,7 +379,7 @@ class PlImporter: # could be performed twice with the same person... # so hopefully we do not need to eliminate duplicates explicitly here anymore site_record.reg_pis = list(set(site_pis)) - dbsession.commit() + global_dbsession.commit() # import slices for slice_id in site['slice_ids']: @@ -396,8 +398,8 @@ class PlImporter: pointer=slice['slice_id'], authority=get_authority(slice_hrn)) slice_record.just_created() - dbsession.add(slice_record) - dbsession.commit() + global_dbsession.add(slice_record) + global_dbsession.commit() self.logger.info("PlImporter: imported slice: %s" % slice_record) self.remember_record ( slice_record ) except: @@ -410,7 +412,7 @@ class PlImporter: # record current users affiliated with the slice slice_record.reg_researchers = \ [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['person_ids'] ] - dbsession.commit() + global_dbsession.commit() slice_record.stale=False ### remove stale records @@ -432,5 +434,5 @@ class PlImporter: self.logger.warning("stale not found with %s"%record) if stale: self.logger.info("PlImporter: deleting stale record: %s" % record) - dbsession.delete(record) - dbsession.commit() + global_dbsession.delete(record) + global_dbsession.commit() diff --git a/sfa/managers/aggregate_manager.py b/sfa/managers/aggregate_manager.py index 792f8243..342cd19c 100644 --- a/sfa/managers/aggregate_manager.py +++ b/sfa/managers/aggregate_manager.py @@ -53,7 +53,7 @@ class AggregateManager: geni_api_versions = ApiVersions().get_versions() geni_api_versions['3'] = 'http://%s:%s' % (api.config.sfa_aggregate_host, api.config.sfa_aggregate_port) version_generic = { - 'testbed': self.driver.testbed_name(), + 'testbed': api.driver.testbed_name(), 'interface':'aggregate', 'hrn':xrn.get_hrn(), 'urn':xrn.get_urn(), @@ -65,7 +65,7 @@ class AggregateManager: } version.update(version_generic) version.update(self.rspec_versions()) - testbed_version = self.driver.aggregate_version() + testbed_version = api.driver.aggregate_version() version.update(testbed_version) return version @@ -80,16 +80,16 @@ class AggregateManager: # look in cache first cached_requested = options.get('cached', True) - if cached_requested and self.driver.cache: - rspec = self.driver.cache.get(version_string) + if cached_requested and api.driver.cache: + rspec = api.driver.cache.get(version_string) if rspec: - logger.debug("%s.ListResources returning cached advertisement" % (self.driver.__module__)) + logger.debug("%s.ListResources returning cached advertisement" % (api.driver.__module__)) return rspec - rspec = self.driver.list_resources (rspec_version, options) - if self.driver.cache: - logger.debug("%s.ListResources stores advertisement in cache" % (self.driver.__module__)) - self.driver.cache.add(version_string, rspec) + rspec = api.driver.list_resources (rspec_version, options) + if api.driver.cache: + logger.debug("%s.ListResources stores advertisement in cache" % (api.driver.__module__)) + api.driver.cache.add(version_string, rspec) return rspec def Describe(self, api, creds, urns, options): @@ -98,13 +98,13 @@ class AggregateManager: version_manager = VersionManager() rspec_version = version_manager.get_version(options.get('geni_rspec_version')) - return 
self.driver.describe(urns, rspec_version, options) + return api.driver.describe(urns, rspec_version, options) def Status (self, api, urns, creds, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return {} - return self.driver.status (urns, options=options) + return api.driver.status (urns, options=options) def Allocate(self, api, xrn, creds, rspec_string, expiration, options): @@ -114,7 +114,7 @@ class AggregateManager: """ call_id = options.get('call_id') if Callids().already_handled(call_id): return "" - return self.driver.allocate(xrn, rspec_string, expiration, options) + return api.driver.allocate(xrn, rspec_string, expiration, options) def Provision(self, api, xrns, creds, options): """ @@ -134,25 +134,25 @@ class AggregateManager: if not rspec_version: raise InvalidRSpecVersion(options['geni_rspec_version']) - return self.driver.provision(xrns, options) + return api.driver.provision(xrns, options) def Delete(self, api, xrns, creds, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return True - return self.driver.delete(xrns, options) + return api.driver.delete(xrns, options) def Renew(self, api, xrns, creds, expiration_time, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return True - return self.driver.renew(xrns, expiration_time, options) + return api.driver.renew(xrns, expiration_time, options) def PerformOperationalAction(self, api, xrns, creds, action, options={}): call_id = options.get('call_id') if Callids().already_handled(call_id): return True - return self.driver.perform_operational_action(xrns, action, options) + return api.driver.perform_operational_action(xrns, action, options) def Shutdown(self, api, xrn, creds, options={}): call_id = options.get('call_id') if Callids().already_handled(call_id): return True - return self.driver.shutdown(xrn, options) + return api.driver.shutdown(xrn, options) diff --git a/sfa/managers/driver.py b/sfa/managers/driver.py index fa25a83f..0e8b71dc 100644 --- a/sfa/managers/driver.py +++ b/sfa/managers/driver.py @@ -5,9 +5,10 @@ class Driver: - def __init__ (self, config): + def __init__ (self, api): + self.api = api # this is the hrn attached to the running server - self.hrn = config.SFA_INTERFACE_HRN + self.hrn = api.config.SFA_INTERFACE_HRN ######################################## ########## registry oriented diff --git a/sfa/managers/registry_manager.py b/sfa/managers/registry_manager.py index c24c1f51..bbd958ef 100644 --- a/sfa/managers/registry_manager.py +++ b/sfa/managers/registry_manager.py @@ -19,14 +19,14 @@ from sfa.trust.gid import create_uuid from sfa.storage.model import make_record, RegRecord, RegAuthority, RegUser, RegSlice, RegKey, \ augment_with_sfa_builtins -from sfa.storage.alchemy import dbsession ### the types that we need to exclude from sqlobjects before being able to dump # them on the xmlrpc wire from sqlalchemy.orm.collections import InstrumentedList class RegistryManager: - def __init__ (self, config): pass + def __init__ (self, config): + logger.info("Creating RegistryManager[%s]"%id(self)) # The GENI GetVersion call def GetVersion(self, api, options): @@ -41,6 +41,7 @@ class RegistryManager: 'peers':peers}) def GetCredential(self, api, xrn, type, caller_xrn=None): + dbsession = api.dbsession() # convert xrn to hrn if type: hrn = urn_to_hrn(xrn)[0] @@ -50,7 +51,7 @@ class RegistryManager: # Slivers don't have credentials but users should be able to # specify a sliver xrn and receive the slice's credential if type 
== 'sliver' or '-' in Xrn(hrn).leaf: - slice_xrn = self.driver.sliver_to_slice_xrn(hrn) + slice_xrn = api.driver.sliver_to_slice_xrn(hrn) hrn = slice_xrn.hrn # Is this a root or sub authority @@ -110,6 +111,7 @@ class RegistryManager: # the default for full, which means 'dig into the testbed as well', should be false def Resolve(self, api, xrns, type=None, details=False): + dbsession = api.dbsession() if not isinstance(xrns, types.ListType): # try to infer type if not set and we get a single input if not type: @@ -172,7 +174,7 @@ class RegistryManager: if details: # in details mode we get as much info as we can, which involves contacting the # testbed for getting implementation details about the record - self.driver.augment_records_with_testbed_info(local_dicts) + api.driver.augment_records_with_testbed_info(local_dicts) # also we fill the 'url' field for known authorities # used to be in the driver code, sounds like a poorman thing though def solve_neighbour_url (record): @@ -196,6 +198,7 @@ class RegistryManager: return records def List (self, api, xrn, origin_hrn=None, options={}): + dbsession=api.dbsession() # load all know registry names into a prefix tree and attempt to find # the longest matching prefix hrn, type = urn_to_hrn(xrn) @@ -263,18 +266,19 @@ class RegistryManager: # subject_record describes the subject of the relationships # ref_record contains the target values for the various relationships we need to manage # (to begin with, this is just the slice x person (researcher) and authority x person (pi) relationships) - def update_driver_relations (self, subject_obj, ref_obj): + def update_driver_relations (self, api, subject_obj, ref_obj): type=subject_obj.type #for (k,v) in subject_obj.__dict__.items(): print k,'=',v if type=='slice' and hasattr(ref_obj,'researcher'): - self.update_driver_relation(subject_obj, ref_obj.researcher, 'user', 'researcher') + self.update_driver_relation(api, subject_obj, ref_obj.researcher, 'user', 'researcher') elif type=='authority' and hasattr(ref_obj,'pi'): - self.update_driver_relation(subject_obj,ref_obj.pi, 'user', 'pi') + self.update_driver_relation(api, subject_obj,ref_obj.pi, 'user', 'pi') # field_key is the name of one field in the record, typically 'researcher' for a 'slice' record # hrns is the list of hrns that should be linked to the subject from now on # target_type would be e.g. 
'user' in the 'slice' x 'researcher' example - def update_driver_relation (self, record_obj, hrns, target_type, relation_name): + def update_driver_relation (self, api, record_obj, hrns, target_type, relation_name): + dbsession=api.dbsession() # locate the linked objects in our db subject_type=record_obj.type subject_id=record_obj.pointer @@ -282,10 +286,11 @@ class RegistryManager: link_id_tuples = dbsession.query(RegRecord.pointer).filter_by(type=target_type).filter(RegRecord.hrn.in_(hrns)).all() # sqlalchemy returns named tuples for columns link_ids = [ tuple.pointer for tuple in link_id_tuples ] - self.driver.update_relation (subject_type, target_type, relation_name, subject_id, link_ids) + api.driver.update_relation (subject_type, target_type, relation_name, subject_id, link_ids) def Register(self, api, record_dict): + dbsession=api.dbsession() hrn, type = record_dict['hrn'], record_dict['type'] urn = hrn_to_urn(hrn,type) # validate the type @@ -331,11 +336,11 @@ class RegistryManager: # locate objects for relationships pi_hrns = getattr(record,'pi',None) - if pi_hrns is not None: record.update_pis (pi_hrns) + if pi_hrns is not None: record.update_pis (pi_hrns, dbsession) elif isinstance (record, RegSlice): researcher_hrns = getattr(record,'researcher',None) - if researcher_hrns is not None: record.update_researchers (researcher_hrns) + if researcher_hrns is not None: record.update_researchers (researcher_hrns, dbsession) elif isinstance (record, RegUser): # create RegKey objects for incoming keys @@ -344,18 +349,19 @@ class RegistryManager: record.reg_keys = [ RegKey (key) for key in record.keys ] # update testbed-specific data if needed - pointer = self.driver.register (record.__dict__, hrn, pub_key) + pointer = api.driver.register (record.__dict__, hrn, pub_key) record.pointer=pointer dbsession.add(record) dbsession.commit() # update membership for researchers, pis, owners, operators - self.update_driver_relations (record, record) + self.update_driver_relations (api, record, record) return record.get_gid_object().save_to_string(save_parents=True) def Update(self, api, record_dict): + dbsession=api.dbsession() assert ('type' in record_dict) new_record=make_record(dict=record_dict) (type,hrn) = (new_record.type, new_record.hrn) @@ -394,11 +400,11 @@ class RegistryManager: # update native relations if isinstance (record, RegSlice): researcher_hrns = getattr(new_record,'researcher',None) - if researcher_hrns is not None: record.update_researchers (researcher_hrns) + if researcher_hrns is not None: record.update_researchers (researcher_hrns, dbsession) elif isinstance (record, RegAuthority): pi_hrns = getattr(new_record,'pi',None) - if pi_hrns is not None: record.update_pis (pi_hrns) + if pi_hrns is not None: record.update_pis (pi_hrns, dbsession) # update the PLC information that was specified with the record # xxx oddly enough, without this useless statement, @@ -408,7 +414,7 @@ class RegistryManager: print "DO NOT REMOVE ME before driver.update, record=%s"%record new_key_pointer = -1 try: - (pointer, new_key_pointer) = self.driver.update (record.__dict__, new_record.__dict__, hrn, new_key) + (pointer, new_key_pointer) = api.driver.update (record.__dict__, new_record.__dict__, hrn, new_key) except: pass if new_key and new_key_pointer: @@ -417,12 +423,13 @@ class RegistryManager: dbsession.commit() # update membership for researchers, pis, owners, operators - self.update_driver_relations (record, new_record) + self.update_driver_relations (api, record, new_record) return 1 # expecting 
an Xrn instance def Remove(self, api, xrn, origin_hrn=None): + dbsession=api.dbsession() hrn=xrn.get_hrn() type=xrn.get_type() request=dbsession.query(RegRecord).filter_by(hrn=hrn) @@ -454,7 +461,7 @@ class RegistryManager: # call testbed callback first # IIUC this is done on the local testbed TOO because of the refreshpeer link - if not self.driver.remove(record.__dict__): + if not api.driver.remove(record.__dict__): logger.warning("driver.remove failed") # delete from sfa db @@ -465,13 +472,14 @@ class RegistryManager: # This is a PLC-specific thing, won't work with other platforms def get_key_from_incoming_ip (self, api): + dbsession=api.dbsession() # verify that the callers's ip address exist in the db and is an interface # for a node in the db (ip, port) = api.remote_addr - interfaces = self.driver.shell.GetInterfaces({'ip': ip}, ['node_id']) + interfaces = api.driver.shell.GetInterfaces({'ip': ip}, ['node_id']) if not interfaces: raise NonExistingRecord("no such ip %(ip)s" % locals()) - nodes = self.driver.shell.GetNodes([interfaces[0]['node_id']], ['node_id', 'hostname']) + nodes = api.driver.shell.GetNodes([interfaces[0]['node_id']], ['node_id', 'hostname']) if not nodes: raise NonExistingRecord("no such node using ip %(ip)s" % locals()) node = nodes[0] diff --git a/sfa/managers/v2_to_v3_adapter.py b/sfa/managers/v2_to_v3_adapter.py index 15a8cd83..069ef69e 100644 --- a/sfa/managers/v2_to_v3_adapter.py +++ b/sfa/managers/v2_to_v3_adapter.py @@ -7,6 +7,7 @@ from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn, get_leaf, get_authority from sfa.util.cache import Cache from sfa.rspecs.rspec import RSpec from sfa.storage.model import SliverAllocation +# xxx 1-dbsession-per-request from sfa.storage.alchemy import dbsession class V2ToV3Adapter: diff --git a/sfa/methods/Delete.py b/sfa/methods/Delete.py index e8c5128f..94684b97 100644 --- a/sfa/methods/Delete.py +++ b/sfa/methods/Delete.py @@ -25,7 +25,7 @@ class Delete(Method): def call(self, xrns, creds, options): valid_creds = self.api.auth.checkCredentials(creds, 'deletesliver', xrns, - check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + check_sliver_callback = self.api.driver.check_sliver_credentials) #log the call origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() diff --git a/sfa/methods/Describe.py b/sfa/methods/Describe.py index b66780a4..8485f796 100644 --- a/sfa/methods/Describe.py +++ b/sfa/methods/Describe.py @@ -38,7 +38,7 @@ class Describe(Method): raise SfaInvalidArgument('Must specify an rspec version option. 
geni_rspec_version cannot be null') valid_creds = self.api.auth.checkCredentials(creds, 'listnodes', urns, \ - check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + check_sliver_callback = self.api.driver.check_sliver_credentials) # get hrn of the original caller origin_hrn = options.get('origin_hrn', None) diff --git a/sfa/methods/GetSelfCredential.py b/sfa/methods/GetSelfCredential.py index aa53defb..f3a96127 100644 --- a/sfa/methods/GetSelfCredential.py +++ b/sfa/methods/GetSelfCredential.py @@ -1,4 +1,3 @@ - from sfa.util.faults import RecordNotFound, ConnectionKeyGIDMismatch from sfa.util.xrn import urn_to_hrn from sfa.util.method import Method diff --git a/sfa/methods/Provision.py b/sfa/methods/Provision.py index 74ee350e..a94c1bc3 100644 --- a/sfa/methods/Provision.py +++ b/sfa/methods/Provision.py @@ -33,7 +33,7 @@ class Provision(Method): # Find the valid credentials valid_creds = self.api.auth.checkCredentials(creds, 'createsliver', xrns, - check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + check_sliver_callback = self.api.driver.check_sliver_credentials) origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrns, self.name)) result = self.api.manager.Provision(self.api, xrns, creds, options) diff --git a/sfa/methods/Renew.py b/sfa/methods/Renew.py index 288e9700..8f31786b 100644 --- a/sfa/methods/Renew.py +++ b/sfa/methods/Renew.py @@ -34,7 +34,7 @@ class Renew(Method): # Find the valid credentials valid_creds = self.api.auth.checkCredentials(creds, 'renewsliver', urns, - check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + check_sliver_callback = self.api.driver.check_sliver_credentials) # Validate that the time does not go beyond the credential's expiration time requested_time = utcparse(expiration_time) diff --git a/sfa/methods/Shutdown.py b/sfa/methods/Shutdown.py index 8641bd0e..3eee8785 100644 --- a/sfa/methods/Shutdown.py +++ b/sfa/methods/Shutdown.py @@ -20,7 +20,7 @@ class Shutdown(Method): def call(self, xrn, creds): valid_creds = self.api.auth.checkCredentials(creds, 'stopslice', xrn, - check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + check_sliver_callback = self.api.driver.check_sliver_credentials) #log the call origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrn, self.name)) diff --git a/sfa/methods/Status.py b/sfa/methods/Status.py index 044e2529..164093e7 100644 --- a/sfa/methods/Status.py +++ b/sfa/methods/Status.py @@ -20,7 +20,7 @@ class Status(Method): def call(self, xrns, creds, options): valid_creds = self.api.auth.checkCredentials(creds, 'sliverstatus', xrns, - check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + check_sliver_callback = self.api.driver.check_sliver_credentials) self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, xrns, self.name)) return self.api.manager.Status(self.api, xrns, creds, options) diff --git a/sfa/nitos/nitosdriver.py b/sfa/nitos/nitosdriver.py index da35ca35..40db2a70 100644 --- a/sfa/nitos/nitosdriver.py +++ b/sfa/nitos/nitosdriver.py @@ -11,7 +11,6 @@ from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf, urn_to_hrn from sfa.util.cache import Cache # one would think the driver should not need to 
mess with the SFA db, but.. -from sfa.storage.alchemy import dbsession from sfa.storage.model import RegRecord # used to be used in get_ticket @@ -46,8 +45,9 @@ class NitosDriver (Driver): # the cache instance is a class member so it survives across incoming requests cache = None - def __init__ (self, config): - Driver.__init__ (self, config) + def __init__ (self, api): + Driver.__init__ (self, api) + config = api.config self.shell = NitosShell (config) self.cache=None self.testbedInfo = self.shell.getTestbedInfo() @@ -367,7 +367,7 @@ class NitosDriver (Driver): # get the registry records user_list, users = [], {} - user_list = dbsession.query(RegRecord).filter(RegRecord.pointer.in_(user_ids)).all() + user_list = self.api.dbsession().query(RegRecord).filter(RegRecord.pointer.in_(user_ids)).all() # create a hrns keyed on the sfa record's pointer. # Its possible for multiple records to have the same pointer so # the dict's value will be a list of hrns. diff --git a/sfa/openstack/nova_driver.py b/sfa/openstack/nova_driver.py index e0afd07f..e36946e6 100644 --- a/sfa/openstack/nova_driver.py +++ b/sfa/openstack/nova_driver.py @@ -15,7 +15,6 @@ from sfa.trust.credential import Credential #from sfa.trust.sfaticket import SfaTicket from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec -from sfa.storage.alchemy import dbsession from sfa.storage.model import RegRecord, SliverAllocation # the driver interface, mostly provides default behaviours @@ -41,8 +40,9 @@ class NovaDriver(Driver): # the cache instance is a class member so it survives across incoming requests cache = None - def __init__ (self, config): - Driver.__init__(self, config) + def __init__ (self, api): + Driver.__init__(self, api) + config = api.config self.shell = Shell(config=config) self.cache=None if config.SFA_AGGREGATE_CACHING: @@ -396,7 +396,8 @@ class NovaDriver(Driver): # update all sliver allocation states setting then to geni_allocated sliver_ids = [sliver.id for sliver in slivers] - SliverAllocation.set_allocations(sliver_ids, 'geni_allocated') + dbsession=self.api.dbsession() + SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession) return aggregate.describe(urns=[urn], version=rspec.version) @@ -408,7 +409,8 @@ class NovaDriver(Driver): for instance in instances: sliver_hrn = "%s.%s" % (self.driver.hrn, instance.id) sliver_ids.append(Xrn(sliver_hrn, type='sliver').urn) - SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned') + dbsession=self.api.dbsession() + SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession) version_manager = VersionManager() rspec_version = version_manager.get_version(options['geni_rspec_version']) return self.describe(urns, rspec_version, options=options) @@ -427,7 +429,8 @@ class NovaDriver(Driver): aggregate.delete_instance(instance) # delete sliver allocation states - SliverAllocation.delete_allocations(sliver_ids) + dbsession=self.api.dbsession() + SliverAllocation.delete_allocations(sliver_ids, dbsession) # return geni_slivers geni_slivers = [] diff --git a/sfa/openstack/osaggregate.py b/sfa/openstack/osaggregate.py index 16eec41e..d6d73677 100644 --- a/sfa/openstack/osaggregate.py +++ b/sfa/openstack/osaggregate.py @@ -75,7 +75,7 @@ class OSAggregate: # lookup the sliver allocations sliver_ids = [sliver['sliver_id'] for sliver in slivers] constraint = SliverAllocation.sliver_id.in_(sliver_ids) - sliver_allocations = dbsession.query(SliverAllocation).filter(constraint) + sliver_allocations = 
self.driver.api.dbsession().query(SliverAllocation).filter(constraint) sliver_allocation_dict = {} for sliver_allocation in sliver_allocations: sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation diff --git a/sfa/planetlab/plaggregate.py b/sfa/planetlab/plaggregate.py index 2d26f0f3..399b3a2a 100644 --- a/sfa/planetlab/plaggregate.py +++ b/sfa/planetlab/plaggregate.py @@ -21,7 +21,6 @@ from sfa.rspecs.version_manager import VersionManager from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn, top_auth, hash_loginbase from sfa.planetlab.vlink import get_tc_rate from sfa.planetlab.topology import Topology -from sfa.storage.alchemy import dbsession from sfa.storage.model import SliverAllocation diff --git a/sfa/planetlab/pldriver.py b/sfa/planetlab/pldriver.py index 27c6b4bd..0978a576 100644 --- a/sfa/planetlab/pldriver.py +++ b/sfa/planetlab/pldriver.py @@ -10,7 +10,6 @@ from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf from sfa.util.cache import Cache # one would think the driver should not need to mess with the SFA db, but.. -from sfa.storage.alchemy import dbsession from sfa.storage.model import RegRecord, SliverAllocation from sfa.trust.credential import Credential @@ -45,8 +44,9 @@ class PlDriver (Driver): # the cache instance is a class member so it survives across incoming requests cache = None - def __init__ (self, config): - Driver.__init__ (self, config) + def __init__ (self, api): + Driver.__init__ (self, api) + config=api.config self.shell = PlShell (config) self.cache=None if config.SFA_AGGREGATE_CACHING: @@ -508,7 +508,7 @@ class PlDriver (Driver): # get the registry records person_list, persons = [], {} - person_list = dbsession.query (RegRecord).filter(RegRecord.pointer.in_(person_ids)) + person_list = self.api.dbsession().query (RegRecord).filter(RegRecord.pointer.in_(person_ids)) # create a hrns keyed on the sfa record's pointer. # Its possible for multiple records to have the same pointer so # the dict's value will be a list of hrns. 
@@ -687,7 +687,8 @@ class PlDriver (Driver): slices.handle_peer(None, None, persons, peer) # update sliver allocation states and set them to geni_provisioned sliver_ids = [sliver['sliver_id'] for sliver in slivers] - SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned') + dbsession=self.api.dbsession() + SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned',dbsession) version_manager = VersionManager() rspec_version = version_manager.get_version(options['geni_rspec_version']) return self.describe(urns, rspec_version, options=options) @@ -726,7 +727,8 @@ class PlDriver (Driver): self.shell.DeleteLeases(leases_ids) # delete sliver allocation states - SliverAllocation.delete_allocations(sliver_ids) + dbsession=self.api.dbsession() + SliverAllocation.delete_allocations(sliver_ids,dbsession) finally: if peer: self.shell.BindObjectToPeer('slice', slice_id, peer, slice['peer_slice_id']) diff --git a/sfa/planetlab/plslices.py b/sfa/planetlab/plslices.py index 703dac30..b3c6813e 100644 --- a/sfa/planetlab/plslices.py +++ b/sfa/planetlab/plslices.py @@ -10,7 +10,6 @@ from sfa.planetlab.vlink import VLink from sfa.planetlab.topology import Topology from sfa.planetlab.plxrn import PlXrn, hrn_to_pl_slicename, xrn_to_hostname, top_auth, hash_loginbase from sfa.storage.model import SliverAllocation -from sfa.storage.alchemy import dbsession MAXINT = 2L**31-1 @@ -272,7 +271,7 @@ class PlSlices: component_id=component_id, slice_urn = slice_urn, allocation_state='geni_allocated') - record.sync() + record.sync(self.driver.api.dbsession()) return resulting_nodes def free_egre_key(self): diff --git a/sfa/server/sfaapi.py b/sfa/server/sfaapi.py index a0fc7f48..9911e466 100644 --- a/sfa/server/sfaapi.py +++ b/sfa/server/sfaapi.py @@ -14,10 +14,10 @@ from sfa.util.version import version_core from sfa.server.xmlrpcapi import XmlrpcApi from sfa.client.return_value import ReturnValue +from sfa.storage.alchemy import alchemy #################### class SfaApi (XmlrpcApi): - """ An SfaApi instance is a basic xmlrcp service augmented with the local cryptographic material and hrn @@ -31,8 +31,8 @@ class SfaApi (XmlrpcApi): It gets augmented by the generic layer with (*) an instance of manager (actually a manager module for now) - (*) which in turn holds an instance of a testbed driver - For convenience api.manager.driver == api.driver + beware that this is shared among all instances of api + (*) an instance of a testbed driver """ def __init__ (self, encoding="utf-8", methods='sfa.methods', @@ -69,6 +69,7 @@ class SfaApi (XmlrpcApi): # filled later on by generic/Generic self.manager=None + self._dbsession=None def server_proxy(self, interface, cred, timeout=30): """ @@ -89,7 +90,16 @@ class SfaApi (XmlrpcApi): server = interface.server_proxy(key_file, cert_file, timeout) return server - + def dbsession(self): + if self._dbsession is None: + self._dbsession=alchemy.session() + return self._dbsession + + def close_dbsession(self): + if self._dbsession is None: return + alchemy.close_session(self._dbsession) + self._dbsession=None + def getCredential(self, minimumExpiration=0): """ Return a valid credential for this interface. 
@@ -159,7 +169,8 @@ class SfaApi (XmlrpcApi): if not auth_hrn or hrn == self.config.SFA_INTERFACE_HRN: auth_hrn = hrn auth_info = self.auth.get_auth_info(auth_hrn) - from sfa.storage.alchemy import dbsession + # xxx although unlikely we might want to check for a potential leak + dbsession=self.dbsession() from sfa.storage.model import RegRecord record = dbsession.query(RegRecord).filter_by(type='authority+sa', hrn=hrn).first() if not record: diff --git a/sfa/server/threadedserver.py b/sfa/server/threadedserver.py index 7bc434ce..dbdde3f2 100644 --- a/sfa/server/threadedserver.py +++ b/sfa/server/threadedserver.py @@ -127,16 +127,18 @@ class SecureXMLRpcRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): #self.send_response(500) #self.end_headers() - # got a valid response - self.send_response(200) - self.send_header("Content-type", "text/xml") - self.send_header("Content-length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - # shut down the connection - self.wfile.flush() - self.connection.shutdown() # Modified here! + # avoid session/connection leaks : do this no matter what + finally: + self.send_response(200) + self.send_header("Content-type", "text/xml") + self.send_header("Content-length", str(len(response))) + self.end_headers() + self.wfile.write(response) + self.wfile.flush() + # close db connection + self.api.close_dbsession() + # shut down the connection + self.connection.shutdown() # Modified here! ## # Taken from the web (XXX find reference). Implements an HTTPS xmlrpc server diff --git a/sfa/storage/alchemy.py b/sfa/storage/alchemy.py index e9f96dd6..84c987fa 100644 --- a/sfa/storage/alchemy.py +++ b/sfa/storage/alchemy.py @@ -49,21 +49,34 @@ class Alchemy: def check (self): self.engine.execute ("select 1").scalar() - def session (self): + def global_session (self): if self._session is None: Session=sessionmaker () self._session=Session(bind=self.engine) + logger.info('alchemy.global_session created session %s'%self._session) return self._session - def close_session (self): + def close_global_session (self): if self._session is None: return + logger.info('alchemy.close_global_session %s'%self._session) self._session.close() self._session=None + # create a dbsession to be managed separately + def session (self): + Session=sessionmaker() + session=Session (bind=self.engine) + logger.info('alchemy.session created session %s'%session) + return session + + def close_session (self, session): + logger.info('alchemy.close_session closed session %s'%session) + session.close() + #################### from sfa.util.config import Config alchemy=Alchemy (Config()) engine=alchemy.engine -dbsession=alchemy.session() +global_dbsession=alchemy.global_session() diff --git a/sfa/storage/model.py b/sfa/storage/model.py index b0950429..46e2deed 100644 --- a/sfa/storage/model.py +++ b/sfa/storage/model.py @@ -191,9 +191,7 @@ class RegAuthority (RegRecord): def __repr__ (self): return RegRecord.__repr__(self).replace("Record","Authority") - def update_pis (self, pi_hrns): - # don't ruin the import of that file in a client world - from sfa.storage.alchemy import dbsession + def update_pis (self, pi_hrns, dbsession): # strip that in case we have words pi_hrns = [ x.strip() for x in pi_hrns ] request = dbsession.query (RegUser).filter(RegUser.hrn.in_(pi_hrns)) @@ -221,9 +219,7 @@ class RegSlice (RegRecord): def __repr__ (self): return RegRecord.__repr__(self).replace("Record","Slice") - def update_researchers (self, researcher_hrns): - # don't ruin the import of 
that file in a client world - from sfa.storage.alchemy import dbsession + def update_researchers (self, researcher_hrns, dbsession): # strip that in case we have words researcher_hrns = [ x.strip() for x in researcher_hrns ] request = dbsession.query (RegUser).filter(RegUser.hrn.in_(researcher_hrns)) @@ -232,9 +228,12 @@ class RegSlice (RegRecord): self.reg_researchers = researchers # when dealing with credentials, we need to retrieve the PIs attached to a slice + # WARNING: with the move to passing dbsessions around, we face a glitch here because this + # helper function is called from the trust/ area that def get_pis (self): - # don't ruin the import of that file in a client world - from sfa.storage.alchemy import dbsession + from sqlalchemy.orm import sessionmaker + Session=sessionmaker() + dbsession=Session.object_session(self) from sfa.util.xrn import get_authority authority_hrn = get_authority(self.hrn) auth_record = dbsession.query(RegAuthority).filter_by(hrn=authority_hrn).first() @@ -344,8 +343,7 @@ class SliverAllocation(Base,AlchemyObj): return state @staticmethod - def set_allocations(sliver_ids, state): - from sfa.storage.alchemy import dbsession + def set_allocations(sliver_ids, state, dbsession): if not isinstance(sliver_ids, list): sliver_ids = [sliver_ids] sliver_state_updated = {} @@ -366,8 +364,7 @@ class SliverAllocation(Base,AlchemyObj): dbsession.commit() @staticmethod - def delete_allocations(sliver_ids): - from sfa.storage.alchemy import dbsession + def delete_allocations(sliver_ids, dbsession): if not isinstance(sliver_ids, list): sliver_ids = [sliver_ids] constraint = SliverAllocation.sliver_id.in_(sliver_ids) @@ -376,9 +373,7 @@ class SliverAllocation(Base,AlchemyObj): dbsession.delete(sliver_allocation) dbsession.commit() - def sync(self): - from sfa.storage.alchemy import dbsession - + def sync(self, dbsession): constraints = [SliverAllocation.sliver_id==self.sliver_id] results = dbsession.query(SliverAllocation).filter(and_(*constraints)) records = []
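
---
A minimal sketch of the session lifecycle this changeset converges on. The names alchemy, global_dbsession, RegRecord, api.dbsession() and close_dbsession() are taken from the diff above; MiniApi and handle_request() are hypothetical stand-ins for SfaApi and the threadedserver request loop, shown only to illustrate the intended usage, not code from the tree:

    # request-handling side: one unmanaged session per incoming call,
    # opened lazily through the api object and closed by the server loop
    from sfa.storage.alchemy import alchemy
    from sfa.storage.model import RegRecord

    class MiniApi:
        # hypothetical stand-in for SfaApi, keeping only the dbsession plumbing
        def __init__(self):
            self._dbsession = None
        def dbsession(self):
            if self._dbsession is None:
                self._dbsession = alchemy.session()   # NOT managed by Alchemy
            return self._dbsession
        def close_dbsession(self):
            if self._dbsession is None: return
            alchemy.close_session(self._dbsession)
            self._dbsession = None

    def handle_request(api, hrn):
        # managers and drivers reach the db through the api they are handed
        try:
            return api.dbsession().query(RegRecord).filter_by(hrn=hrn).first()
        finally:
            # mirrors the finally: clause added to threadedserver
            api.close_dbsession()

    # one-shot side (importers, sfaadmin): the managed global session is fine
    from sfa.storage.alchemy import global_dbsession
    all_records = global_dbsession.query(RegRecord).all()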