Changed the importer - now using the generic sfa-importer.py
author     Sandrine Avakian <sandrine.avakian@inria.fr>    Tue, 6 Mar 2012 09:02:16 +0000 (10:02 +0100)
committer  Sandrine Avakian <sandrine.avakian@inria.fr>    Tue, 6 Mar 2012 09:02:16 +0000 (10:02 +0100)
=> creation of slabimporter.py
=> modification of slab.py to take into account the new importer class (a sketch of the resulting call path follows the file list)
=> modification of GetSites in OARrestapi.

sfa/generic/slab.py
sfa/importer/slabimporter.py [new file with mode: 0644]
sfa/senslab/OARrestapi.py
sfa/senslab/slabdriver.py

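For context, the point of the generic sfa-importer.py is that the import step is no longer hard-wired to PLC: the script asks the configured flavour for its importer class and drives it through a small uniform interface (add_options / run). Below is a rough sketch of that call path; only SlabImporter's constructor, add_options() and run() come from this patch, while the surrounding names (Generic.the_flavour, Hierarchy, the sfa logger) are assumptions about the rest of the SFA tree.

    # hypothetical sketch of the generic importer loop -- not part of this patch
    from optparse import OptionParser

    from sfa.util.sfalogging import logger      # assumed: any logger with info/warning/log_exc
    from sfa.trust.hierarchy import Hierarchy   # assumed to provide the auth hierarchy
    from sfa.generic import Generic             # assumed flavour factory

    def run_importer():
        parser = OptionParser()
        generic = Generic.the_flavour()             # resolves to the 'slab' flavour via the config
        importer_class = generic.importer_class()   # -> sfa.importer.slabimporter.SlabImporter
        importer = importer_class(Hierarchy(), logger)
        importer.add_options(parser)                # SlabImporter adds no options for now
        (options, args) = parser.parse_args()
        importer.run(options)
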
diff --git a/sfa/generic/slab.py b/sfa/generic/slab.py
index 5bb8710..16c8a13 100644 (file)
@@ -11,7 +11,12 @@ class slab (Generic):
     # use the standard api class
     def api_class (self):
         return sfa.server.sfaapi.SfaApi
-
+    
+    # the importer class
+    def importer_class (self): 
+        import sfa.importer.slabimporter
+        return sfa.importer.slabimporter.SlabImporter
+    
     # the manager classes for the server-side services
     def registry_manager_class (self) : 
         return sfa.managers.registry_manager.RegistryManager
diff --git a/sfa/importer/slabimporter.py b/sfa/importer/slabimporter.py
new file mode 100644 (file)
index 0000000..7d2cfab
--- /dev/null
@@ -0,0 +1,273 @@
+import os
+import sys
+import datetime
+import time
+
+from sfa.util.config import Config
+from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
+from sfa.util.plxrn import PlXrn, slicename_to_hrn, email_to_hrn, hrn_to_pl_slicename
+
+from sfa.senslab.LDAPapi import LDAPapi
+from sfa.senslab.slabdriver import SlabDriver
+from sfa.senslab.slabpostgres import SlabDB
+
+from sfa.trust.certificate import Keypair,convert_public_key
+from sfa.trust.gid import create_uuid
+
+from sfa.storage.alchemy import dbsession
+from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
+
+def _get_site_hrn(site):
+    hrn = site['login_base'] 
+    return hrn
+
+class SlabImporter:
+    
+    def __init__ (self, auth_hierarchy, logger):
+        self.auth_hierarchy = auth_hierarchy
+        self.logger=logger
+       
+    def hostname_to_hrn(self,root_auth,login_base,hostname):
+        return PlXrn(auth=root_auth,hostname=login_base+'_'+hostname).get_hrn()   
+    
+    def slicename_to_hrn(self, person_hrn):
+        return  (person_hrn +'_slice')
+    
+    def add_options (self, parser):
+        # we don't have any options for now
+        pass
+    
+    def find_record_by_type_hrn(self,type,hrn):
+        return self.records_by_type_hrn.get ( (type, hrn), None)
+    
+    def update_just_added_records_dict (self, record):
+        tuple = (record.type, record.hrn)
+        if tuple in self.records_by_type_hrn:
+            self.logger.warning ("SlabImporter.update_just_added_records_dict: duplicate (%s,%s)"%tuple)
+            return
+        self.records_by_type_hrn [ tuple ] = record
+        
+    def run (self, options):
+        config = Config()
+        interface_hrn = config.SFA_INTERFACE_HRN
+        root_auth = config.SFA_REGISTRY_ROOT_AUTH
+        slabdriver = SlabDriver(config)
+        
+        #Create special slice table for senslab 
+        db = SlabDB()
+        if not db.exists('slice_senslab'):
+            db.createtable('slice_senslab')
+        ######## retrieve all existing SFA objects
+        all_records = dbsession.query(RegRecord).all()
+        # create hash by (type,hrn)
+        # used to know if a given record is already known to SFA
+
+        self.records_by_type_hrn = \
+            dict ( [ ( (record.type, record.hrn) , record ) for record in all_records ] )
+
+        # initialize record.stale to True by default, then mark stale=False on the ones that are in use
+        for record in all_records: 
+            record.stale=True
+        
+        nodes_listdict  = slabdriver.GetNodes()
+        nodes_by_id = dict([(node['node_id'],node) for node in nodes_listdict])
+        sites_listdict  = slabdriver.GetSites()
+    
+        ldap_person_listdict = slabdriver.GetPersons()
+        slices_listdict = slabdriver.GetSlices()
+        for site in sites_listdict:
+            site_hrn = _get_site_hrn(site) 
+            site_record = self.find_record_by_type_hrn ('authority', site_hrn)
+            if not site_record:
+                try:
+                    urn = hrn_to_urn(site_hrn, 'authority')
+                    if not self.auth_hierarchy.auth_exists(urn):
+                        self.auth_hierarchy.create_auth(urn)
+                    auth_info = self.auth_hierarchy.get_auth_info(urn)
+                    site_record = RegAuthority(hrn=site_hrn, gid=auth_info.get_gid_object(),
+                                               pointer='-1',
+                                               authority=get_authority(site_hrn))
+                    site_record.just_created()
+                    dbsession.add(site_record)
+                    dbsession.commit()
+                    self.logger.info("SlabImporter: imported authority (site) : %s" % site_record) 
+                    self.update_just_added_records_dict (site_record)
+                except:
+                    # if the site import fails then there is no point in trying to import the
+                    # site's child records (node, slices, persons), so skip them.
+                    self.logger.log_exc("SlabImporter: failed to import site. Skipping child records") 
+                    continue
+            else:
+                # xxx update the record ...
+                pass
+            site_record.stale=False 
+            
+            # import node records in site
+            for node_id in site['node_ids']:
+                try:
+                    node = nodes_by_id[node_id]
+                except:
+                    self.logger.warning ("SlabImporter: cannot find node_id %s - ignored"%node_id)
+                    continue 
+                site_auth = get_authority(site_hrn)
+                site_name = site['login_base']
+                hrn =  self.hostname_to_hrn(root_auth, site_name, node['hostname'])
+                # xxx this sounds suspicious
+                if len(hrn) > 64: hrn = hrn[:64]
+                node_record = self.find_record_by_type_hrn( 'node', hrn )
+                print >>sys.stderr, " \r\n \r\n SLAB IMPORTER node_record %s " %(node_record)
+                if not node_record:
+                    try:
+                        pkey = Keypair(create=True)
+                        urn = hrn_to_urn(hrn, 'node') 
+                        print>>sys.stderr, "\r\n \r\n SLAB IMPORTER NODE IMPORT urn %s hrn %s" %(urn, hrn)  
+                        node_gid = self.auth_hierarchy.create_gid(urn, create_uuid(), pkey)
+                        node_record = RegNode (hrn=hrn, gid=node_gid, 
+                                                pointer =node['node_id'],
+                                                authority=get_authority(hrn))
+                        node_record.just_created()
+                        dbsession.add(node_record)
+                        dbsession.commit()
+                        self.logger.info("SlabImporter: imported node: %s" % node_record)
+                        print>>sys.stderr, "\r\n \t\t\t SLAB IMPORTER NODE IMPORT NOTnode_record %s " %(node_record)  
+                        self.update_just_added_records_dict(node_record)
+                    except:
+                        self.logger.log_exc("SlabImporter: failed to import node") 
+                else:
+                    # xxx update the record ...
+                    pass
+                node_record.stale=False
+                    
+                    
+            # import persons
+            for person in ldap_person_listdict : 
+            
+                person_hrn = person['hrn']
+                # slicename_to_hrn already appends the '_slice' suffix
+                slice_hrn = self.slicename_to_hrn(person['hrn'])
+               
+                # xxx suspicious again
+                if len(person_hrn) > 64: person_hrn = person_hrn[:64]
+                person_urn = hrn_to_urn(person_hrn, 'user')
+    
+                user_record = self.find_record_by_type_hrn( 'user', person_hrn)
+                slice_record = self.find_record_by_type_hrn ('slice', slice_hrn)
+                print>>sys.stderr, "\r\n \r\n SLAB IMPORTER PERSON IMPORT user_record %s " %(user_record)
+                
+                
+                # return a tuple pubkey (a plc key object) and pkey (a Keypair object)
+                def init_person_key (person, slab_key):
+                    pubkey=None
+                    if  person['pkey']:
+                        # randomly pick first key in set
+                        pubkey = slab_key
+                        try:
+                            pkey = convert_public_key(pubkey)
+                        except:
+                            self.logger.warn('SlabImporter: unable to convert public key for %s' % person_hrn)
+                            pkey = Keypair(create=True)
+                    else:
+                        # the user has no keys. Creating a random keypair for the user's gid
+                        self.logger.warn("SlabImporter: person %s does not have a PL public key"%person_hrn)
+                        pkey = Keypair(create=True)
+                    return (pubkey, pkey)
+                                
+                 
+                try:
+                    slab_key = person['pkey']
+                    # new person
+                    if not user_record:
+                        (pubkey,pkey) = init_person_key (person, slab_key )
+                        person_gid = self.auth_hierarchy.create_gid(person_urn, create_uuid(), pkey)
+                        person_gid.set_email(person['email'])
+                        user_record = RegUser (hrn=person_hrn, gid=person_gid, 
+                                                 pointer='-1', 
+                                                 authority=get_authority(person_hrn),
+                                                 email=person['email'])
+                        if pubkey: 
+                            user_record.reg_keys=[RegKey (pubkey)]
+                        else:
+                            self.logger.warning("No key found for user %s"%user_record)
+                        user_record.just_created()
+                        dbsession.add (user_record)
+                        dbsession.commit()
+                        self.logger.info("SlabImporter: imported person: %s" % user_record)
+                        print>>sys.stderr, "\r\n \r\n SLAB IMPORTER PERSON IMPORT NOTuser_record %s " %(user_record)
+                        self.update_just_added_records_dict( user_record )
+                    else:
+                        # update the record ?
+                        # if user's primary key has changed then we need to update the 
+                        # users gid by forcing an update here
+                        sfa_keys = user_record.reg_keys
+                        #def key_in_list (key,sfa_keys):
+                            #for reg_key in sfa_keys:
+                                #if reg_key.key==key['key']: return True
+                            #return False
+                        # is there a key in the LDAP entry that SFA does not know yet ?
+                        new_keys=False
+                        if slab_key not in [ reg_key.key for reg_key in sfa_keys ]:
+                            new_keys = True
+                        if new_keys:
+                            (pubkey,pkey) = init_person_key (person, slab_key)
+                            person_gid = self.auth_hierarchy.create_gid(person_urn, create_uuid(), pkey)
+                            if not pubkey:
+                                user_record.reg_keys=[]
+                            else:
+                                user_record.reg_keys=[ RegKey (pubkey)]
+                            self.logger.info("SlabImporter: updated person: %s" % user_record)
+                    user_record.email = person['email']
+                    dbsession.commit()
+                    user_record.stale=False
+                except:
+                    self.logger.log_exc("SlabImporter: failed to import person  %s"%(person) )       
+                    
+                if not slice_record:
+                    try:
+                        pkey = Keypair(create=True)
+                        urn = hrn_to_urn(slice_hrn, 'slice')
+                        slice_gid = self.auth_hierarchy.create_gid(urn, create_uuid(), pkey)
+                        slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid, 
+                                                    pointer='-1',
+                                                    authority=get_authority(slice_hrn))
+                        print>>sys.stderr, "\r\n \r\n SLAB IMPORTER SLICE IMPORT NOTslice_record%s " %(slice_record)
+                        slice_record.just_created()
+                        dbsession.add(slice_record)
+                        dbsession.commit()
+                        self.logger.info("SlabImporter: imported slice: %s" % slice_record)  
+                        self.update_just_added_records_dict ( slice_record )
+                    except:
+                        self.logger.log_exc("SlabImporter: failed to import slice")
+                else:
+                    # xxx update the record ...
+                    self.logger.warning ("Slice update not yet implemented")
+                    pass
+                # record the current user as affiliated with the slice
+                # (the slice hrn is derived from this person's hrn)
+                slice_record.reg_researchers = [ user_record ]
+                dbsession.commit()
+                slice_record.stale=False 
+
+
+        ### remove stale records
+        # special records must be preserved
+        system_hrns = [interface_hrn, root_auth, interface_hrn + '.slicemanager']
+        for record in all_records: 
+            if record.hrn in system_hrns: 
+                record.stale=False
+            if record.peer_authority:
+                record.stale=False
+          
+
+        for record in all_records:
+            try:        
+                stale=record.stale
+            except:     
+                stale=True
+                self.logger.warning("stale not found with %s"%record)
+            if stale:
+                self.logger.info("SlabImporter: deleting stale record: %s" % record)
+                dbsession.delete(record)
+                dbsession.commit()         
+                 
+
+  
\ No newline at end of file
diff --git a/sfa/senslab/OARrestapi.py b/sfa/senslab/OARrestapi.py
index b2705c7..9e08418 100644 (file)
@@ -32,6 +32,7 @@ OARrequests_get_uri_dict = { 'GET_version': '/oarapi/version.json',
                        'GET_jobs_details': '/oarapi/jobs/details.json',
                        'GET_resources_full': '/oarapi/resources/full.json',
                        'GET_resources':'/oarapi/resources.json',
+                        'GET_sites' : '/oarapi/resources/full.json',
                         
                         
 }
@@ -278,6 +279,14 @@ class OARGETParser:
         self.ParseNodes()
         self.ParseSites()
         return self.node_dictlist
+        
+    def ParseResourcesFullSites(self ) :
+        if self.version_json_dict['apilib_version'] != "0.2.10" :
+                self.raw_json = self.raw_json['items']
+        self.ParseNodes()
+        self.ParseSites()
+        return self.site_dict
+        
 
     resources_fulljson_dict= {
         'resource_id' : AddNodeId,
@@ -342,17 +351,17 @@ class OARGETParser:
             #if node_id is 1:
                 #print>>sys.stderr, " \r\n \r\n \t \t\t\t OARESTAPI Parse Sites self.node_dictlist %s " %(self.node_dictlist)
             if node['site_login_base'] not in self.site_dict.keys():
-                self.site_dict[node['site_login_base']] = [('login_base', node['site_login_base']),\
-                                                        ('node_ids',nodes_per_site[node['site_login_base']]),\
-                                                        ('latitude',"48.83726"),\
-                                                        ('longitude',"- 2.10336"),('name',"senslab"),\
-                                                        ('pcu_ids', []), ('max_slices', None), ('ext_consortium_id', None),\
-                                                        ('max_slivers', None), ('is_public', True), ('peer_site_id', None),\
-                                                        ('abbreviated_name', "senslab"), ('address_ids', []),\
-                                                        ('url', "http,//www.senslab.info"), ('person_ids', []),\
-                                                        ('site_tag_ids', []), ('enabled', True),  ('slice_ids', []),\
-                                                        ('date_created', None), ('peer_id', None),]
-                self.site_dict[node['site_login_base']] = dict(self.site_dict[node['site_login_base']])
+                self.site_dict[node['site_login_base']] = {'login_base':node['site_login_base'],
+                                                        'node_ids':nodes_per_site[node['site_login_base']],
+                                                        'latitude':"48.83726",
+                                                        'longitude':"- 2.10336",'name':"senslab",
+                                                        'pcu_ids':[], 'max_slices':None, 'ext_consortium_id':None,
+                                                        'max_slivers':None, 'is_public':True, 'peer_site_id': None,
+                                                        'abbreviated_name':"senslab", 'address_ids': [],
+                                                        'url':"http://www.senslab.info", 'person_ids':[],
+                                                        'site_tag_ids':[], 'enabled': True,  'slice_ids':[],
+                                                        'date_created': None, 'peer_id': None } 
+
                         
 
 
@@ -365,6 +374,7 @@ class OARGETParser:
         'GET_jobs_table': {'uri':'/oarapi/jobs/table.json','parse_func': ParseJobsTable},
         'GET_jobs_details': {'uri':'/oarapi/jobs/details.json','parse_func': ParseJobsDetails},
         'GET_resources_full': {'uri':'/oarapi/resources/full.json','parse_func': ParseResourcesFull},
+        'GET_sites':{'uri':'/oarapi/resources/full.json','parse_func': ParseResourcesFullSites},
         'GET_resources':{'uri':'/oarapi/resources.json' ,'parse_func': ParseResources},
         'DELETE_jobs_id':{'uri':'/oarapi/jobs/id.json' ,'parse_func': ParseDeleteJobs}
         }
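Taken together, the OARrestapi hunks above make "GET_sites" reuse the same /oarapi/resources/full.json payload as "GET_resources_full", but hand back the per-site dictionary built by ParseSites instead of the node list. The sketch below illustrates what the driver-side GetSites (next hunk) then returns to the importer; the node ids are made up for illustration, and the call is assumed to dispatch through the uri/parse_func table shown above.

    # hypothetical usage sketch (Python 2, to match the code base)
    from sfa.util.config import Config
    from sfa.senslab.slabdriver import SlabDriver

    driver = SlabDriver(Config())
    # SendRequest("GET_sites") -> ParseResourcesFullSites -> site_dict.values()
    sites = driver.GetSites()
    # each site looks roughly like:
    # {'login_base': 'senslab', 'node_ids': [1, 2, 3], 'latitude': "48.83726",
    #  'longitude': "- 2.10336", 'name': "senslab", 'enabled': True, ...}
    for site in sites:
        print site['login_base'], len(site['node_ids'])
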
diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py
index 68b8482..5d7a152 100644 (file)
@@ -509,10 +509,21 @@ class SlabDriver(Driver):
         return return_node_list
     
   
+    def GetSites(self, site_filter = None, return_fields=None):
+        site_dict =self.oar.parser.SendRequest("GET_sites")
+        print>>sys.stderr, "\r\n \r\n \t\t  SLABDRIVER.PY GetSites " 
+        return_site_list = []
+        if not ( site_filter or return_fields):
+                return_site_list = site_dict.values()
+                return return_site_list
+    
+        return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields)
+        return return_site_list
+        
     
     def GetSlices(self,slice_filter = None, return_fields=None):
 
-        sliceslist = self.db.find('slice',columns = ['oar_job_id', 'slice_hrn', 'record_id_slice','record_id_user'], record_filter=slice_filter)
+        sliceslist = self.db.find('slice_senslab',columns = ['oar_job_id', 'slice_hrn', 'record_id_slice','record_id_user'], record_filter=slice_filter)
         
         print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY  GetSlices  slices %s slice_filter %s " %(sliceslist,slice_filter)