cleaning slabimporter.py
[sfa.git] / sfa / importer / slabimporter.py
1 import sys
2
3 from sfa.util.config import Config
4 from sfa.util.xrn import Xrn, get_authority, hrn_to_urn
5
6 from sfa.senslab.slabdriver import SlabDriver
7
8 from sfa.trust.certificate import Keypair, convert_public_key
9 from sfa.trust.gid import create_uuid
10
11 from sfa.storage.alchemy import dbsession
12 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
13                                                     RegUser, RegKey
14
15
16 from sqlalchemy.exc import SQLAlchemyError
17
18
19 def _get_site_hrn(site):
20     hrn = site['name'] 
21     return hrn
22
23 class SlabImporter:
24     
25     def __init__ (self, auth_hierarchy, loc_logger):
26         self.auth_hierarchy = auth_hierarchy
27         self.logger = loc_logger
28         self.logger.setLevelDebug()
29         
30     @staticmethod
31     def hostname_to_hrn_escaped(root_auth, hostname):
32         return '.'.join( [root_auth, Xrn.escape(hostname)] )
33
34
35     @staticmethod
36     def slicename_to_hrn(person_hrn):
37         return  (person_hrn +'_slice')
38     
39     def add_options (self, parser):
40         # we don't have any options for now
41         pass
42     
43     def find_record_by_type_hrn(self, record_type, hrn):
44         return self.records_by_type_hrn.get ( (record_type, hrn), None)
45     
46     def locate_by_type_pointer (self, record_type, pointer):
47         ret = self.records_by_type_pointer.get ( (record_type, pointer), None)
48         return ret
49     
50     def update_just_added_records_dict (self, record):
51         rec_tuple = (record.type, record.hrn)
52         if rec_tuple in self.records_by_type_hrn:
53             self.logger.warning ("SlabImporter.update_just_added_records_dict:\
54                         duplicate (%s,%s)"%rec_tuple)
55             return
56         self.records_by_type_hrn [ rec_tuple ] = record
57         
58     def import_sites(self) :
59             
60     def run (self, options):
61         config = Config()
62
63         slabdriver = SlabDriver(config)
64         
65         #Create special slice table for senslab 
66         
67         if not slabdriver.db.exists('slab_xp'):
68             slabdriver.db.createtable()
69             self.logger.info ("SlabImporter.run:  slab_xp table created ")
70
71         #retrieve all existing SFA objects
72         all_records = dbsession.query(RegRecord).all()
73         
74         # initialize record.stale to True by default, 
75         # then mark stale=False on the ones that are in use
76         for record in all_records: 
77             record.stale = True
78         #create hash by (type,hrn) 
79         #used  to know if a given record is already known to SFA 
80        
81         self.records_by_type_hrn = \
82             dict([( (record.type,record.hrn), record) \
83                                         for record in all_records])
84
85         self.users_rec_by_email = \
86             dict([ (record.email, record) \
87                 for record in all_records if record.type == 'user'])
88             
89         # create hash by (type,pointer) 
90         self.records_by_type_pointer = \
91             dict([ ( (str(record.type), record.pointer) , record) \
92                 for record in all_records  if record.pointer != -1])
93
94         
95         nodes_listdict  = slabdriver.slab_api.GetNodes()
96         nodes_by_id = dict([(node['node_id'],node) for node in nodes_listdict])
97         sites_listdict  = slabdriver.slab_api.GetSites()
98         
99         ldap_person_listdict = slabdriver.slab_api.GetPersons()
100         print>>sys.stderr,"\r\n SLABIMPORT \t ldap_person_listdict %s \r\n" \
101                 %(ldap_person_listdict)
102         slices_listdict = slabdriver.slab_api.GetSlices()
103         #try:
104             #slices_by_userid = \
105                 #dict([(one_slice['reg_researchers']['record_id'], one_slice ) \
106                 #for one_slice in slices_listdict ])
107         #except TypeError:
108             #self.logger.log_exc("SlabImporter: failed to create list \
109                         #of slices by user id.") 
110             #pass
111  
112         for site in sites_listdict:
113             site_hrn = _get_site_hrn(site) 
114             site_record = self.find_record_by_type_hrn ('authority', site_hrn)
115             if not site_record:
116                 try:
117                     urn = hrn_to_urn(site_hrn, 'authority') 
118                     if not self.auth_hierarchy.auth_exists(urn):
119                         self.auth_hierarchy.create_auth(urn)
120                         
121                     auth_info = self.auth_hierarchy.get_auth_info(urn)
122                     site_record = RegAuthority(hrn=site_hrn, \
123                                             gid=auth_info.get_gid_object(),
124                                             pointer='-1',
125                                             authority=get_authority(site_hrn))
126                     site_record.just_created()
127                     dbsession.add(site_record)
128                     dbsession.commit()
129                     self.logger.info("SlabImporter: imported authority (site) \
130                          %s" % site_record) 
131                     self.update_just_added_records_dict(site_record)
132                 except SQLAlchemyError:
133                     # if the site import fails then there is no point in 
134                     # trying to import the
135                     # site's child records (node, slices, persons), so skip them.
136                     self.logger.log_exc("SlabImporter: failed to import site. \
137                         Skipping child records") 
138                     continue
139             else:
140                 # xxx update the record ...
141                 pass
142             site_record.stale = False 
143             
144          # import node records in site
145             for node_id in site['node_ids']:
146                 try:
147                     node = nodes_by_id[node_id]
148                 except KeyError:
149                     self.logger.warning ("SlabImporter: cannot find node_id %s \
150                             - ignored" %(node_id))
151                     continue             
152                 escaped_hrn =  \
153                 self.hostname_to_hrn_escaped(slabdriver.slab_api.root_auth, \
154                 node['hostname'])
155                 print>>sys.stderr, "\r\n \r\n SLABIMPORTER node %s " %(node)               
156                 hrn =  node['hrn']
157
158
159                 # xxx this sounds suspicious
160                 if len(hrn) > 64: hrn = hrn[:64]
161                 node_record = self.find_record_by_type_hrn( 'node', hrn )
162                 if not node_record:
163                     pkey = Keypair(create=True)
164                     urn = hrn_to_urn(escaped_hrn, 'node') 
165                     node_gid = \
166                         self.auth_hierarchy.create_gid(urn, \
167                         create_uuid(), pkey)
168                         
169                     def slab_get_authority(hrn):
170                         return hrn.split(".")[0]
171                         
172                     node_record = RegNode(hrn=hrn, gid=node_gid, 
173                                         pointer = '-1',
174                                         authority=slab_get_authority(hrn))
175                     try:
176                          
177                         node_record.just_created()
178                         dbsession.add(node_record)
179                         dbsession.commit()
180                         self.logger.info("SlabImporter: imported node: %s" \
181                                     % node_record)  
182                         self.update_just_added_records_dict(node_record)
183                     except SQLAlchemyError:
184                         self.logger.log_exc("SlabImporter: \
185                                         failed to import node") 
186                 else:
187                     # xxx update the record ...
188                     pass
189                 node_record.stale=False
190                     
191                     
192         # import persons
193         for person in ldap_person_listdict : 
194             
195             self.logger.info("SlabImporter: person :" %(person))
196             if 'ssh-rsa' not in person['pkey']:
197                 #people with invalid ssh key (ssh-dss, empty, bullshit keys...)
198                 #won't be imported
199                 continue
200             person_hrn = person['hrn']
201             slice_hrn = self.slicename_to_hrn(person['hrn'])
202             
203             # xxx suspicious again
204             if len(person_hrn) > 64: person_hrn = person_hrn[:64]
205             person_urn = hrn_to_urn(person_hrn, 'user')
206             
207             
208             self.logger.info("SlabImporter: users_rec_by_email %s " \
209                                             %(self.users_rec_by_email))
210             
211             #Check if user using person['email'] form LDAP is already registered
212             #in SFA. One email = one person. In this case, do not create another
213             #record for this person
214             #person_hrn returned by GetPerson based on senslab root auth + uid ldap
215             user_record = self.find_record_by_type_hrn('user', person_hrn)
216             
217             if not user_record and  person['email'] in self.users_rec_by_email:
218                 user_record = self.users_rec_by_email[person['email']]
219                 person_hrn = user_record.hrn
220                 person_urn = hrn_to_urn(person_hrn, 'user')
221                 
222             
223             slice_record = self.find_record_by_type_hrn ('slice', slice_hrn)
224             
225             # return a tuple pubkey (a plc key object) and pkey (a Keypair object)
226             def init_person_key (person, slab_key):
227                 pubkey = None
228                 if  person['pkey']:
229                     # randomly pick first key in set
230                     pubkey = slab_key
231                     
232                     try:
233                         pkey = convert_public_key(pubkey)
234                     except TypeError:
235                         #key not good. create another pkey
236                         self.logger.warn('SlabImporter: \
237                                             unable to convert public \
238                                             key for %s' % person_hrn)
239                         pkey = Keypair(create=True)
240                     
241                 else:
242                     # the user has no keys. 
243                     #Creating a random keypair for the user's gid
244                     self.logger.warn("SlabImporter: person %s does not have a  \
245                                 public key" %(person_hrn))
246                     pkey = Keypair(create=True) 
247                 return (pubkey, pkey)
248                             
249                 
250            
251             slab_key = person['pkey']
252             # new person
253             if not user_record:
254                 (pubkey,pkey) = init_person_key (person, slab_key )
255                 if pubkey is not None and pkey is not None :
256                     person_gid = \
257                     self.auth_hierarchy.create_gid(person_urn, \
258                     create_uuid(), pkey)
259                     if person['email']:
260                         print>>sys.stderr, "\r\n \r\n SLAB IMPORTER \
261                             PERSON EMAIL OK email %s " %(person['email'])
262                         person_gid.set_email(person['email'])
263                         user_record = RegUser(hrn=person_hrn, \
264                                                 gid=person_gid, 
265                                                 pointer='-1', 
266                                                 authority=get_authority(person_hrn),
267                                                 email=person['email'])
268                     else:
269                         user_record = RegUser(hrn=person_hrn, \
270                                                 gid=person_gid, 
271                                                 pointer='-1', 
272                                                 authority=get_authority(person_hrn))
273                         
274                     if pubkey: 
275                         user_record.reg_keys = [RegKey(pubkey)]
276                     else:
277                         self.logger.warning("No key found for user %s" \
278                         %(user_record))
279                             
280                         try:    
281                             user_record.just_created()
282                             dbsession.add (user_record)
283                             dbsession.commit()
284                             self.logger.info("SlabImporter: imported person: %s"\
285                             %(user_record))
286                             self.update_just_added_records_dict( user_record )
287                             
288                         except SQLAlchemyError:
289                             self.logger.log_exc("SlabImporter: \
290                                 failed to import person  %s"%(person))       
291             else:
292                 # update the record ?
293                 # if user's primary key has changed then we need to update 
294                 # the users gid by forcing an update here
295                 sfa_keys = user_record.reg_keys
296                 
297                 new_key=False
298                 if slab_key is not sfa_keys : 
299                     new_key = True
300                 if new_key:
301                     print>>sys.stderr,"SlabImporter: \t \t USER UPDATE \
302                         person: %s" %(person['hrn'])
303                     (pubkey,pkey) = init_person_key (person, slab_key)
304                     person_gid = \
305                         self.auth_hierarchy.create_gid(person_urn, \
306                         create_uuid(), pkey)
307                     if not pubkey:
308                         user_record.reg_keys = []
309                     else:
310                         user_record.reg_keys = [RegKey(pubkey)]
311                     self.logger.info("SlabImporter: updated person: %s" \
312                     % (user_record))
313                     
314                 if person['email']:
315                     user_record.email = person['email']
316                    
317             try:       
318                 dbsession.commit()
319                 user_record.stale = False
320             except SQLAlchemyError:
321                 self.logger.log_exc("SlabImporter: \
322                 failed to update person  %s"%(person)) 
323             
324             
325             #try:
326                 #single_slice = slices_by_userid[user_record.record_id]
327             #except KeyError:
328                 #self.logger.warning ("SlabImporter: \
329                 #cannot locate slices_by_userid[user_record.record_id] %s - \
330                 #ignored" %(user_record))  
331                     
332             if not slice_record :
333                 
334                 pkey = Keypair(create=True)
335                 urn = hrn_to_urn(slice_hrn, 'slice')
336                 slice_gid = \
337                     self.auth_hierarchy.create_gid(urn, \
338                     create_uuid(), pkey)
339                 slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid, 
340                                             pointer='-1',
341                                             authority=get_authority(slice_hrn))
342                 try:
343                     slice_record.just_created()
344                     dbsession.add(slice_record)
345                     dbsession.commit()
346                     
347                     #Serial id created after commit
348                     #Get it
349                     sl_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(slice_hrn)).all()
350                     
351                    
352                     self.update_just_added_records_dict ( slice_record )
353
354                 except SQLAlchemyError:
355                     self.logger.log_exc("SlabImporter: failed to import slice")
356                     
357             #No slice update upon import in senslab 
358             else:
359                 # xxx update the record ...
360                 self.logger.warning ("Slice update not yet implemented")
361                 pass
362             # record current users affiliated with the slice
363
364
365             slice_record.reg_researchers =  [user_record]
366             try:
367                 dbsession.commit()
368                 slice_record.stale = False 
369             except SQLAlchemyError:
370                 self.logger.log_exc("SlabImporter: failed to update slice")
371                        
372   
373                  
374          ### remove stale records
375         # special records must be preserved
376         system_hrns = [slabdriver.hrn, slabdriver.slab_api.root_auth,  \
377                                         slabdriver.hrn+ '.slicemanager']
378         for record in all_records: 
379             if record.hrn in system_hrns: 
380                 record.stale = False
381             if record.peer_authority:
382                 record.stale = False
383           
384
385         for record in all_records: 
386             if record.type == 'user':
387                 print>>sys.stderr,"SlabImporter: stale records: hrn %s %s" \
388                                             %(record.hrn,record.stale)
389             try:        
390                 stale = record.stale
391             except :     
392                 stale = True
393                 self.logger.warning("stale not found with %s"%record)
394             if stale:
395                 self.logger.info("SlabImporter: deleting stale record: %s" \
396                 %(record))
397                 
398                 try:
399                     dbsession.delete(record)
400                     dbsession.commit()         
401                 except SQLAlchemyError:
402                     self.logger.log_exc("SlabImporter: failed to delete stale \
403                     record %s" %(record) )
404
405