docstring OK in slabimporter.py
[sfa.git] / sfa / importer / slabimporter.py
1 from sfa.util.config import Config
2 from sfa.util.xrn import Xrn, get_authority, hrn_to_urn
3
4 from sfa.senslab.slabdriver import SlabDriver
5
6 from sfa.trust.certificate import Keypair, convert_public_key
7 from sfa.trust.gid import create_uuid
8
9 from sfa.storage.alchemy import dbsession
10 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
11                                                     RegUser, RegKey
12
13
14 from sqlalchemy.exc import SQLAlchemyError
15
16
17
18 class SlabImporter:
19     """ 
20     SlabImporter class, generic importer_class. Used to populate the SFA DB
21     with senslab resources' records. 
22     Used to update records when new resources, users or nodes, are added
23     or deleted.
24     """
25      
26     def __init__ (self, auth_hierarchy, loc_logger):
27         """ 
28         Sets and defines import logger and the authority name. Gathers all the 
29         records already registerd in the SFA DB, broke them into 3 dicts,
30         by type and hrn, by email and by type and pointer.
31         
32         :param auth_hierarchy: authority name
33         :type auth_hierarchy: string
34         :param loc_logger: local logger
35         :type loc_logger: _SfaLogger
36         
37         """
38         self.auth_hierarchy = auth_hierarchy
39         self.logger = loc_logger
40         self.logger.setLevelDebug()
41         #retrieve all existing SFA objects
42         self.all_records = dbsession.query(RegRecord).all()
43         
44         # initialize record.stale to True by default, 
45         # then mark stale=False on the ones that are in use
46         for record in self.all_records: 
47             record.stale = True
48         #create hash by (type,hrn) 
49         #used  to know if a given record is already known to SFA 
50         self.records_by_type_hrn = \
51             dict([( (record.type,record.hrn), record) \
52                                         for record in self.all_records])
53
54         self.users_rec_by_email = \
55             dict([ (record.email, record) \
56                 for record in self.all_records if record.type == 'user'])
57             
58         # create hash by (type,pointer) 
59         self.records_by_type_pointer = \
60             dict([ ( (str(record.type), record.pointer) , record) \
61                 for record in self.all_records  if record.pointer != -1])
62             
63             
64         
65     @staticmethod
66     def hostname_to_hrn_escaped(root_auth, hostname):
67         """ Returns a node's hrn based on its hostname and the root 
68         authority and by removing special caracters from the hostname.
69         
70         :param root_auth: root authority name
71         :param hostname: nodes's hostname
72         :type  root_auth: string
73         :type hostname: string
74         :rtype: string
75         """
76         return '.'.join( [root_auth, Xrn.escape(hostname)] )
77
78
79     @staticmethod
80     def slicename_to_hrn(person_hrn):
81         """Returns the slicename associated to a given person's hrn.
82         
83         :param person_hrn: user's hrn
84         :type person_hrn: string
85         :rtype: string
86         """
87         return  (person_hrn +'_slice')
88     
89     def add_options (self, parser):
90         # we don't have any options for now
91         pass
92     
93     def find_record_by_type_hrn(self, record_type, hrn):
94         """Returns the record associated with a given hrn and hrn type.
95         Returns None if the key tuple is not in dictionary. 
96         
97         :param record_type: the record's type (slice, node, authority...)
98         :type  record_type: string
99         :param hrn: Human readable name of the object's record
100         :type hrn: string
101         :rtype: RegUser if user, RegSlice if slice, RegNode if node...
102                 or None if record does not exist.
103         
104         """
105         return self.records_by_type_hrn.get ( (record_type, hrn), None)
106     
107     def locate_by_type_pointer (self, record_type, pointer):
108         """Returns the record corresponding to the key pointer and record
109         type. Returns None if the record does not exist and is not in the
110         records_by_type_pointer dictionnary.
111         
112         :param record_type: the record's type (slice, node, authority...)
113         :type  record_type: string
114         :param pointer:Pointer to where the record is in the origin db, 
115         used in case the record comes from a trusted authority.
116         :type pointer: integer
117         :rtype: RegUser if user, RegSlice if slice, RegNode if node...
118                 or None if record does not exist.
119         """
120         return self.records_by_type_pointer.get ( (record_type, pointer), None)
121         
122     
123     def update_just_added_records_dict (self, record):
124         """Updates the records_by_type_hrn dictionnary if the record has 
125         just been created.
126         
127         :param record: Record to add in the records_by_type_hrn dict.
128         :type record: dictionary
129         """
130         rec_tuple = (record.type, record.hrn)
131         if rec_tuple in self.records_by_type_hrn:
132             self.logger.warning ("SlabImporter.update_just_added_records_dict:\
133                         duplicate (%s,%s)"%rec_tuple)
134             return
135         self.records_by_type_hrn [ rec_tuple ] = record
136         
137     def import_sites_and_nodes(self, slabdriver):
138         """ Gets all the sites and nodes from OAR, process the information,
139         creates hrns and RegAuthority for sites, and feed them to the database.
140         For each site, import the site's nodes to the DB by calling 
141         import_nodes.
142         
143         :param slabdriver: SlabDriver object, used to have access to slabdriver
144         methods and fetching info on sites and nodes.
145         :type slabdriver: SlabDriver  
146         """
147         
148         sites_listdict  = slabdriver.slab_api.GetSites()  
149         nodes_listdict  = slabdriver.slab_api.GetNodes()
150         nodes_by_id = dict([(node['node_id'], node) for node in nodes_listdict])
151         for site in sites_listdict:
152             site_hrn = site['name']
153             site_record = self.find_record_by_type_hrn ('authority', site_hrn)
154             if not site_record:
155                 try:
156                     urn = hrn_to_urn(site_hrn, 'authority') 
157                     if not self.auth_hierarchy.auth_exists(urn):
158                         self.auth_hierarchy.create_auth(urn)
159                         
160                     auth_info = self.auth_hierarchy.get_auth_info(urn)
161                     site_record = RegAuthority(hrn=site_hrn, \
162                                             gid=auth_info.get_gid_object(),
163                                             pointer='-1',
164                                             authority=get_authority(site_hrn))
165                     site_record.just_created()
166                     dbsession.add(site_record)
167                     dbsession.commit()
168                     self.logger.info("SlabImporter: imported authority (site) \
169                          %s" % site_record) 
170                     self.update_just_added_records_dict(site_record)
171                 except SQLAlchemyError:
172                     # if the site import fails then there is no point in 
173                     # trying to import the
174                     # site's child records(node, slices, persons), so skip them.
175                     self.logger.log_exc("SlabImporter: failed to import site. \
176                         Skipping child records") 
177                     continue
178             else:
179                 # xxx update the record ...
180                 pass
181             
182             
183             site_record.stale = False
184             self.import_nodes(site['node_ids'], nodes_by_id, slabdriver)
185             
186             return 
187         
188     def import_nodes(self, site_node_ids, nodes_by_id, slabdriver) :
189         """  Creates appropriate hostnames and RegNode records for
190         each node in site_node_ids, based on the information given by the
191         dict nodes_by_id that was made from data from OAR. 
192         Saves the records to the DB.
193         
194         :param site_node_ids: site's node ids
195         :type site_node_ids: list of integers
196         :param nodes_by_id: dictionary , key is the node id, value is the a dict
197         with node information.
198         :type nodes_by_id: dictionary
199         :param slabdriver:SlabDriver object, used to have access to slabdriver
200         attributes.
201         :type slabdriver:SlabDriver
202         
203         """
204        
205         for node_id in site_node_ids:
206             try:
207                 node = nodes_by_id[node_id]
208             except KeyError:
209                 self.logger.warning ("SlabImporter: cannot find node_id %s \
210                         - ignored" %(node_id))
211                 continue             
212             escaped_hrn =  \
213             self.hostname_to_hrn_escaped(slabdriver.slab_api.root_auth, \
214             node['hostname'])
215             self.logger.info("SLABIMPORTER node %s " %(node))               
216             hrn =  node['hrn']
217
218
219             # xxx this sounds suspicious
220             if len(hrn) > 64: 
221                 hrn = hrn[:64]
222             node_record = self.find_record_by_type_hrn( 'node', hrn )
223             if not node_record:
224                 pkey = Keypair(create=True)
225                 urn = hrn_to_urn(escaped_hrn, 'node') 
226                 node_gid = \
227                     self.auth_hierarchy.create_gid(urn, \
228                     create_uuid(), pkey)
229                     
230                 def slab_get_authority(hrn):
231                     return hrn.split(".")[0]
232                     
233                 node_record = RegNode(hrn=hrn, gid=node_gid, 
234                                     pointer = '-1',
235                                     authority=slab_get_authority(hrn))
236                 try:
237                         
238                     node_record.just_created()
239                     dbsession.add(node_record)
240                     dbsession.commit()
241                     self.logger.info("SlabImporter: imported node: %s" \
242                                 % node_record)  
243                     self.update_just_added_records_dict(node_record)
244                 except SQLAlchemyError:
245                     self.logger.log_exc("SlabImporter: \
246                                     failed to import node") 
247             else:
248                 #TODO:  xxx update the record ...
249                 pass
250             node_record.stale = False
251                     
252
253     def init_person_key (self, person, slab_key):
254         """ Returns a tuple pubkey and pkey.
255         
256         :param person Person's data.
257         :type person: dict
258         :param slab_key: SSH public key, from LDAP user's data. 
259         RSA type supported.
260         :type slab_key: string
261         :rtype (string, Keypair)
262         """
263         pubkey = None
264         if  person['pkey']:
265             # randomly pick first key in set
266             pubkey = slab_key
267             
268             try:
269                 pkey = convert_public_key(pubkey)
270             except TypeError:
271                 #key not good. create another pkey
272                 self.logger.warn('SlabImporter: \
273                                     unable to convert public \
274                                     key for %s' %person['hrn'])
275                 pkey = Keypair(create=True)
276             
277         else:
278             # the user has no keys. 
279             #Creating a random keypair for the user's gid
280             self.logger.warn("SlabImporter: person %s does not have a  \
281                         public key" %(person['hrn']))
282             pkey = Keypair(create=True) 
283         return (pubkey, pkey)
284                                 
285         
286     def import_persons_and_slices(self, slabdriver):
287         """
288         Gets user data from LDAP, process the information.
289         Creates hrn for the user's slice, the user's gid, creates
290         the RegUser record associated with user. Creates the RegKey record
291         associated nwith the user's key.
292         Saves those records into the SFA DB.
293         import the user's slice onto the database as well by calling
294         import_slice.
295       
296         :param slabdriver:SlabDriver object, used to have access to slabdriver
297         attributes.
298         :type slabdriver:SlabDriver
299         """
300         ldap_person_listdict = slabdriver.slab_api.GetPersons()
301         self.logger.info("SLABIMPORT \t ldap_person_listdict %s \r\n" \
302                 %(ldap_person_listdict)) 
303         
304          # import persons
305         for person in ldap_person_listdict : 
306             
307             self.logger.info("SlabImporter: person :" %(person))
308             if 'ssh-rsa' not in person['pkey']:
309                 #people with invalid ssh key (ssh-dss, empty, bullshit keys...)
310                 #won't be imported
311                 continue
312             person_hrn = person['hrn']
313             slice_hrn = self.slicename_to_hrn(person['hrn'])
314             
315             # xxx suspicious again
316             if len(person_hrn) > 64: 
317                 person_hrn = person_hrn[:64]
318             person_urn = hrn_to_urn(person_hrn, 'user')
319             
320             
321             self.logger.info("SlabImporter: users_rec_by_email %s " \
322                                             %(self.users_rec_by_email))
323             
324             #Check if user using person['email'] from LDAP is already registered
325             #in SFA. One email = one person. In this case, do not create another
326             #record for this person
327             #person_hrn returned by GetPerson based on senslab root auth + 
328             #uid ldap
329             user_record = self.find_record_by_type_hrn('user', person_hrn)
330             
331             if not user_record and  person['email'] in self.users_rec_by_email:
332                 user_record = self.users_rec_by_email[person['email']]
333                 person_hrn = user_record.hrn
334                 person_urn = hrn_to_urn(person_hrn, 'user')
335                 
336             
337             slice_record = self.find_record_by_type_hrn ('slice', slice_hrn)
338                     
339             slab_key = person['pkey']
340             # new person
341             if not user_record:
342                 (pubkey, pkey) = self.init_person_key(person, slab_key)
343                 if pubkey is not None and pkey is not None :
344                     person_gid = \
345                     self.auth_hierarchy.create_gid(person_urn, \
346                     create_uuid(), pkey)
347                     if person['email']:
348                         self.logger.debug( "SLAB IMPORTER \
349                             PERSON EMAIL OK email %s " %(person['email']))
350                         person_gid.set_email(person['email'])
351                         user_record = RegUser(hrn=person_hrn, \
352                                             gid=person_gid, 
353                                             pointer='-1', 
354                                             authority=get_authority(person_hrn),
355                                             email=person['email'])
356                     else:
357                         user_record = RegUser(hrn=person_hrn, \
358                                             gid=person_gid, 
359                                             pointer='-1', 
360                                             authority=get_authority(person_hrn))
361                         
362                     if pubkey: 
363                         user_record.reg_keys = [RegKey(pubkey)]
364                     else:
365                         self.logger.warning("No key found for user %s" \
366                         %(user_record))
367                             
368                         try:    
369                             user_record.just_created()
370                             dbsession.add (user_record)
371                             dbsession.commit()
372                             self.logger.info("SlabImporter: imported person %s"\
373                             %(user_record))
374                             self.update_just_added_records_dict( user_record )
375                             
376                         except SQLAlchemyError:
377                             self.logger.log_exc("SlabImporter: \
378                                 failed to import person  %s"%(person))       
379             else:
380                 # update the record ?
381                 # if user's primary key has changed then we need to update 
382                 # the users gid by forcing an update here
383                 sfa_keys = user_record.reg_keys
384                 
385                 new_key = False
386                 if slab_key is not sfa_keys : 
387                     new_key = True
388                 if new_key:
389                     self.logger.info("SlabImporter: \t \t USER UPDATE \
390                         person: %s" %(person['hrn']))
391                     (pubkey, pkey) = self.init_person_key (person, slab_key)
392                     person_gid = \
393                         self.auth_hierarchy.create_gid(person_urn, \
394                         create_uuid(), pkey)
395                     if not pubkey:
396                         user_record.reg_keys = []
397                     else:
398                         user_record.reg_keys = [RegKey(pubkey)]
399                     self.logger.info("SlabImporter: updated person: %s" \
400                     % (user_record))
401                     
402                 if person['email']:
403                     user_record.email = person['email']
404                    
405             try:       
406                 dbsession.commit()
407                 user_record.stale = False
408             except SQLAlchemyError:
409                 self.logger.log_exc("SlabImporter: \
410                 failed to update person  %s"%(person)) 
411             
412             self.import_slice(slice_hrn, slice_record, user_record)
413            
414                        
415     def import_slice(self, slice_hrn, slice_record, user_record):
416         """
417          Create RegSlice record according to the slice hrn if the slice
418          does not exist yet.Creates a relationship with the user record 
419          associated with the slice.
420          Commit the record to the database.
421          TODO: Update the record if a slice record already exists.
422          
423         :param slice_hrn: Human readable name of the slice.
424         :type slice_hrn: string
425         :param slice_record: record of the slice found in the DB, if any.
426         :type slice_record: RegSlice or None
427         :param user_record: user record found in the DB if any.
428         :type user_record: RegUser
429         
430         """
431         if not slice_record :           
432             pkey = Keypair(create=True)
433             urn = hrn_to_urn(slice_hrn, 'slice')
434             slice_gid = \
435                 self.auth_hierarchy.create_gid(urn, \
436                 create_uuid(), pkey)
437             slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid, 
438                                         pointer='-1',
439                                         authority=get_authority(slice_hrn))
440             try:
441                 slice_record.just_created()
442                 dbsession.add(slice_record)
443                 dbsession.commit()
444                 
445                
446                 self.update_just_added_records_dict ( slice_record )
447
448             except SQLAlchemyError:
449                 self.logger.log_exc("SlabImporter: failed to import slice")
450                 
451         #No slice update upon import in senslab 
452         else:
453             # xxx update the record ...
454             self.logger.warning ("Slice update not yet implemented")
455             pass
456         # record current users affiliated with the slice
457
458
459         slice_record.reg_researchers =  [user_record]
460         try:
461             dbsession.commit()
462             slice_record.stale = False 
463         except SQLAlchemyError:
464             self.logger.log_exc("SlabImporter: failed to update slice")
465             
466              
467     def run (self, options):
468         """
469         Create the special senslab table, slab_xp, in the senslab database.
470         Import everything (users, slices, nodes and sites from OAR
471         and LDAP) into the SFA database.
472         Delete stale records that are no longer in OAR or LDAP.
473         :param options:
474         :type options: 
475         """
476         config = Config()
477         
478         slabdriver = SlabDriver(config)
479         
480         #Create special slice table for senslab 
481         
482         if not slabdriver.db.exists('slab_xp'):
483             slabdriver.db.createtable()
484             self.logger.info ("SlabImporter.run:  slab_xp table created ")
485
486             
487         # import site and node records in site into the SFA db.
488         self.import_sites_and_nodes(slabdriver)
489         #import users and slice into the SFA DB.   
490         self.import_persons_and_slices(slabdriver)
491       
492          ### remove stale records
493         # special records must be preserved
494         system_hrns = [slabdriver.hrn, slabdriver.slab_api.root_auth,  \
495                                         slabdriver.hrn+ '.slicemanager']
496         for record in self.all_records: 
497             if record.hrn in system_hrns: 
498                 record.stale = False
499             if record.peer_authority:
500                 record.stale = False
501           
502
503         for record in self.all_records: 
504             if record.type == 'user':
505                 self.logger.info("SlabImporter: stale records: hrn %s %s" \
506                                             %(record.hrn,record.stale) )
507             try:        
508                 stale = record.stale
509             except :     
510                 stale = True
511                 self.logger.warning("stale not found with %s"%record)
512             if stale:
513                 self.logger.info("SlabImporter: deleting stale record: %s" \
514                 %(record))
515                 
516                 try:
517                     dbsession.delete(record)
518                     dbsession.commit()         
519                 except SQLAlchemyError:
520                     self.logger.log_exc("SlabImporter: failed to delete stale \
521                     record %s" %(record) )
522
523