From 9540278b2cde0fc662542ed03059c8d6667acf8e Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Tue, 6 Mar 2012 10:02:16 +0100 Subject: [PATCH] Changed the import - now using generic sfa-importer.py =>creation of slabimporter.py =>modification of slab.py to take into account new importer lass. => modification of Getsites in OARrestapi . --- sfa/generic/slab.py | 7 +- sfa/importer/slabimporter.py | 273 +++++++++++++++++++++++++++++++++++ sfa/senslab/OARrestapi.py | 32 ++-- sfa/senslab/slabdriver.py | 13 +- 4 files changed, 312 insertions(+), 13 deletions(-) create mode 100644 sfa/importer/slabimporter.py diff --git a/sfa/generic/slab.py b/sfa/generic/slab.py index 5bb8710b..16c8a13b 100644 --- a/sfa/generic/slab.py +++ b/sfa/generic/slab.py @@ -11,7 +11,12 @@ class slab (Generic): # use the standard api class def api_class (self): return sfa.server.sfaapi.SfaApi - + + # the importer class + def importer_class (self): + import sfa.importer.slabimporter + return sfa.importer.slabimporter.SlabImporter + # the manager classes for the server-side services def registry_manager_class (self) : return sfa.managers.registry_manager.RegistryManager diff --git a/sfa/importer/slabimporter.py b/sfa/importer/slabimporter.py new file mode 100644 index 00000000..7d2cfabf --- /dev/null +++ b/sfa/importer/slabimporter.py @@ -0,0 +1,273 @@ +import os +import sys +import datetime +import time + +from sfa.util.config import Config +from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn +from sfa.util.plxrn import PlXrn, slicename_to_hrn, email_to_hrn, hrn_to_pl_slicename + +from sfa.senslab.LDAPapi import LDAPapi +from sfa.senslab.slabdriver import SlabDriver +from sfa.senslab.slabpostgres import SlabDB + +from sfa.trust.certificate import Keypair,convert_public_key +from sfa.trust.gid import create_uuid + +from sfa.storage.alchemy import dbsession +from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey + +def _get_site_hrn(site): + hrn = site['login_base'] + return hrn + +class SlabImporter: + + def __init__ (self, auth_hierarchy, logger): + self.auth_hierarchy = auth_hierarchy + self.logger=logger + + def hostname_to_hrn(self,root_auth,login_base,hostname): + return PlXrn(auth=root_auth,hostname=login_base+'_'+hostname).get_hrn() + + def slicename_to_hrn(self, person_hrn): + return (person_hrn +'_slice') + + def add_options (self, parser): + # we don't have any options for now + pass + + def find_record_by_type_hrn(self,type,hrn): + return self.records_by_type_hrn.get ( (type, hrn), None) + + def update_just_added_records_dict (self, record): + tuple = (record.type, record.hrn) + if tuple in self.records_by_type_hrn: + self.logger.warning ("SlabImporter.update_just_added_records_dict: duplicate (%s,%s)"%tuple) + return + self.records_by_type_hrn [ tuple ] = record + + def run (self, options): + config = Config() + interface_hrn = config.SFA_INTERFACE_HRN + root_auth = config.SFA_REGISTRY_ROOT_AUTH + slabdriver = SlabDriver(config) + + #Create special slice table for senslab + db = SlabDB() + if not db.exists('slice_senslab'): + db.createtable('slice_senslab') + ######## retrieve all existing SFA objects + all_records = dbsession.query(RegRecord).all() + # create hash by (type,hrn) + # used to know if a given record is already known to SFA + + self.records_by_type_hrn = \ + dict ( [ ( (record.type, record.hrn) , record ) for record in all_records ] ) + + # initialize record.stale to True by default, then mark stale=False on the ones that are in use + for record in all_records: + record.stale=True + + nodes_listdict = slabdriver.GetNodes() + nodes_by_id = dict([(node['node_id'],node) for node in nodes_listdict]) + sites_listdict = slabdriver.GetSites() + + ldap_person_listdict = slabdriver.GetPersons() + slices_listdict = slabdriver.GetSlices() + for site in sites_listdict: + site_hrn = _get_site_hrn(site) + site_record = self.find_record_by_type_hrn ('authority', site_hrn) + if not site_record: + try: + urn = hrn_to_urn(site_hrn, 'authority') + if not self.auth_hierarchy.auth_exists(urn): + self.auth_hierarchy.create_auth(urn) + auth_info = self.auth_hierarchy.get_auth_info(urn) + site_record = RegAuthority(hrn=site_hrn, gid=auth_info.get_gid_object(), + pointer='-1', + authority=get_authority(site_hrn)) + site_record.just_created() + dbsession.add(site_record) + dbsession.commit() + self.logger.info("SlabImporter: imported authority (site) : %s" % site_record) + self.update_just_added_records_dict (site_record) + except: + # if the site import fails then there is no point in trying to import the + # site's child records (node, slices, persons), so skip them. + self.logger.log_exc("SlabImporter: failed to import site. Skipping child records") + continue + else: + # xxx update the record ... + pass + site_record.stale=False + + # import node records in site + for node_id in site['node_ids']: + try: + node = nodes_by_id[node_id] + except: + self.logger.warning ("SlabImporter: cannot find node_id %s - ignored"%node_id) + continue + site_auth = get_authority(site_hrn) + site_name = site['login_base'] + hrn = self.hostname_to_hrn(root_auth, site_name, node['hostname']) + # xxx this sounds suspicious + if len(hrn) > 64: hrn = hrn[:64] + node_record = self.find_record_by_type_hrn( 'node', hrn ) + print >>sys.stderr, " \r\n \r\n SLAB IMPORTER node_record %s " %(node_record) + if not node_record: + try: + pkey = Keypair(create=True) + urn = hrn_to_urn(hrn, 'node') + print>>sys.stderr, "\r\n \r\n SLAB IMPORTER NODE IMPORT urn %s hrn %s" %(urn, hrn) + node_gid = self.auth_hierarchy.create_gid(urn, create_uuid(), pkey) + node_record = RegNode (hrn=hrn, gid=node_gid, + pointer =node['node_id'], + authority=get_authority(hrn)) + node_record.just_created() + dbsession.add(node_record) + dbsession.commit() + self.logger.info("SlabImporter: imported node: %s" % node_record) + print>>sys.stderr, "\r\n \t\t\t SLAB IMPORTER NODE IMPORT NOTnode_record %s " %(node_record) + self.update_just_added_records_dict(node_record) + except: + self.logger.log_exc("SlabImporter: failed to import node") + else: + # xxx update the record ... + pass + node_record.stale=False + + + # import persons + for person in ldap_person_listdict : + + person_hrn = person['hrn'] + slice_hrn = self.slicename_to_hrn(person['hrn'],'_slice') + + # xxx suspicious again + if len(person_hrn) > 64: person_hrn = person_hrn[:64] + person_urn = hrn_to_urn(person_hrn, 'user') + + user_record = self.find_record_by_type_hrn( 'user', person_hrn) + slice_record = self.find_record_by_type_hrn ('slice', slice_hrn) + print>>sys.stderr, "\r\n \r\n SLAB IMPORTER PERSON IMPORT user_record %s " %(user_record) + + + # return a tuple pubkey (a plc key object) and pkey (a Keypair object) + def init_person_key (person, slab_key): + pubkey=None + if person['pkey']: + # randomly pick first key in set + pubkey = slab_key + try: + pkey = convert_public_key(pubkey) + except: + self.logger.warn('SlabImporter: unable to convert public key for %s' % person_hrn) + pkey = Keypair(create=True) + else: + # the user has no keys. Creating a random keypair for the user's gid + self.logger.warn("SlabImporter: person %s does not have a PL public key"%person_hrn) + pkey = Keypair(create=True) + return (pubkey, pkey) + + + try: + slab_key = person['pkey'] + # new person + if not user_record: + (pubkey,pkey) = init_person_key (person, slab_key ) + person_gid = self.auth_hierarchy.create_gid(person_urn, create_uuid(), pkey) + person_gid.set_email(person['email']) + user_record = RegUser (hrn=person_hrn, gid=person_gid, + pointer='-1', + authority=get_authority(person_hrn), + email=person['email']) + if pubkey: + user_record.reg_keys=[RegKey (pubkey)] + else: + self.logger.warning("No key found for user %s"%user_record) + user_record.just_created() + dbsession.add (user_record) + dbsession.commit() + self.logger.info("SlabImporter: imported person: %s" % user_record) + print>>sys.stderr, "\r\n \r\n SLAB IMPORTER PERSON IMPORT NOTuser_record %s " %(user_record) + self.update_just_added_records_dict( user_record ) + else: + # update the record ? + # if user's primary key has changed then we need to update the + # users gid by forcing an update here + sfa_keys = user_record.reg_keys + #def key_in_list (key,sfa_keys): + #for reg_key in sfa_keys: + #if reg_key.key==key['key']: return True + #return False + # is there a new key in myplc ? + new_keys=False + if key is not sfa_keys : + new_keys = True + if new_keys: + (pubkey,pkey) = init_person_key (person, slab_key) + person_gid = self.auth_hierarchy.create_gid(person_urn, create_uuid(), pkey) + if not pubkey: + user_record.reg_keys=[] + else: + user_record.reg_keys=[ RegKey (pubkey)] + self.logger.info("SlabImporter: updated person: %s" % user_record) + user_record.email = person['email'] + dbsession.commit() + user_record.stale=False + except: + self.logger.log_exc("SlabImporter: failed to import person %s"%(person) ) + + if not slice_record: + try: + pkey = Keypair(create=True) + urn = hrn_to_urn(slice_hrn, 'slice') + slice_gid = self.auth_hierarchy.create_gid(urn, create_uuid(), pkey) + slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid, + pointer='-1', + authority=get_authority(slice_hrn)) + print>>sys.stderr, "\r\n \r\n SLAB IMPORTER SLICE IMPORT NOTslice_record%s " %(slice_record) + slice_record.just_created() + dbsession.add(slice_record) + dbsession.commit() + self.logger.info("SlabImporter: imported slice: %s" % slice_record) + self.remember_record ( slice_record ) + except: + self.logger.log_exc("SlabImporter: failed to import slice") + else: + # xxx update the record ... + self.logger.warning ("Slice update not yet implemented") + pass + # record current users affiliated with the slice + slice_record.reg_researchers = \ + [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['person_ids'] ] + dbsession.commit() + slice_record.stale=False + + + + ### remove stale records + # special records must be preserved + system_hrns = [interface_hrn, root_auth, interface_hrn + '.slicemanager'] + for record in all_records: + if record.hrn in system_hrns: + record.stale=False + if record.peer_authority: + record.stale=False + + + for record in all_records: + try: + stale=record.stale + except: + stale=True + self.logger.warning("stale not found with %s"%record) + if stale: + self.logger.info("SlabImporter: deleting stale record: %s" % record) + dbsession.delete(record) + dbsession.commit() + + + \ No newline at end of file diff --git a/sfa/senslab/OARrestapi.py b/sfa/senslab/OARrestapi.py index b2705c7d..9e08418d 100644 --- a/sfa/senslab/OARrestapi.py +++ b/sfa/senslab/OARrestapi.py @@ -32,6 +32,7 @@ OARrequests_get_uri_dict = { 'GET_version': '/oarapi/version.json', 'GET_jobs_details': '/oarapi/jobs/details.json', 'GET_resources_full': '/oarapi/resources/full.json', 'GET_resources':'/oarapi/resources.json', + 'GET_sites' : '/oarapi/resources/full.json', } @@ -278,6 +279,14 @@ class OARGETParser: self.ParseNodes() self.ParseSites() return self.node_dictlist + + def ParseResourcesFullSites(self ) : + if self.version_json_dict['apilib_version'] != "0.2.10" : + self.raw_json = self.raw_json['items'] + self.ParseNodes() + self.ParseSites() + return self.site_dict + resources_fulljson_dict= { 'resource_id' : AddNodeId, @@ -342,17 +351,17 @@ class OARGETParser: #if node_id is 1: #print>>sys.stderr, " \r\n \r\n \t \t\t\t OARESTAPI Parse Sites self.node_dictlist %s " %(self.node_dictlist) if node['site_login_base'] not in self.site_dict.keys(): - self.site_dict[node['site_login_base']] = [('login_base', node['site_login_base']),\ - ('node_ids',nodes_per_site[node['site_login_base']]),\ - ('latitude',"48.83726"),\ - ('longitude',"- 2.10336"),('name',"senslab"),\ - ('pcu_ids', []), ('max_slices', None), ('ext_consortium_id', None),\ - ('max_slivers', None), ('is_public', True), ('peer_site_id', None),\ - ('abbreviated_name', "senslab"), ('address_ids', []),\ - ('url', "http,//www.senslab.info"), ('person_ids', []),\ - ('site_tag_ids', []), ('enabled', True), ('slice_ids', []),\ - ('date_created', None), ('peer_id', None),] - self.site_dict[node['site_login_base']] = dict(self.site_dict[node['site_login_base']]) + self.site_dict[node['site_login_base']] = {'login_base':node['site_login_base'], + 'node_ids':nodes_per_site[node['site_login_base']], + 'latitude':"48.83726", + 'longitude':"- 2.10336",'name':"senslab", + 'pcu_ids':[], 'max_slices':None, 'ext_consortium_id':None, + 'max_slivers':None, 'is_public':True, 'peer_site_id': None, + 'abbreviated_name':"senslab", 'address_ids': [], + 'url':"http,//www.senslab.info", 'person_ids':[], + 'site_tag_ids':[], 'enabled': True, 'slice_ids':[], + 'date_created': None, 'peer_id': None } + @@ -365,6 +374,7 @@ class OARGETParser: 'GET_jobs_table': {'uri':'/oarapi/jobs/table.json','parse_func': ParseJobsTable}, 'GET_jobs_details': {'uri':'/oarapi/jobs/details.json','parse_func': ParseJobsDetails}, 'GET_resources_full': {'uri':'/oarapi/resources/full.json','parse_func': ParseResourcesFull}, + 'GET_sites':{'uri':'/oarapi/resources/full.json','parse_func': ParseResourcesFullSites}, 'GET_resources':{'uri':'/oarapi/resources.json' ,'parse_func': ParseResources}, 'DELETE_jobs_id':{'uri':'/oarapi/jobs/id.json' ,'parse_func': ParseDeleteJobs} } diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 68b84828..5d7a1522 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -509,10 +509,21 @@ class SlabDriver(Driver): return return_node_list + def GetSites(self, site_filter = None, return_fields=None): + site_dict =self.oar.parser.SendRequest("GET_sites") + print>>sys.stderr, "\r\n \r\n \t\t SLABDRIVER.PY GetSites " + return_site_list = [] + if not ( site_filter or return_fields): + return_site_list = site_dict.values() + return return_site_list + + return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields) + return return_site_list + def GetSlices(self,slice_filter = None, return_fields=None): - sliceslist = self.db.find('slice',columns = ['oar_job_id', 'slice_hrn', 'record_id_slice','record_id_user'], record_filter=slice_filter) + sliceslist = self.db.find('slice_senslab',columns = ['oar_job_id', 'slice_hrn', 'record_id_slice','record_id_user'], record_filter=slice_filter) print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(sliceslist,slice_filter) -- 2.47.0