810af1616b18db4c946249a7c61315d3d690f45a
[sfa.git] / sfa / importer / slabimporter.py
1 import sys
2
3 from sfa.util.config import Config
4 from sfa.util.xrn import Xrn, get_authority, hrn_to_urn
5
6 from sfa.senslab.slabdriver import SlabDriver
7
8 from sfa.trust.certificate import Keypair, convert_public_key
9 from sfa.trust.gid import create_uuid
10
11 from sfa.storage.alchemy import dbsession
12 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
13                                                     RegUser, RegKey
14
15
16 from sqlalchemy.exc import SQLAlchemyError
17
18
19 def _get_site_hrn(site):
20     hrn = site['name'] 
21     return hrn
22
23 class SlabImporter:
24     
25     def __init__ (self, auth_hierarchy, loc_logger):
26         self.auth_hierarchy = auth_hierarchy
27         self.logger = loc_logger
28         self.logger.setLevelDebug()
29         #retrieve all existing SFA objects
30         self.all_records = dbsession.query(RegRecord).all()
31         
32         # initialize record.stale to True by default, 
33         # then mark stale=False on the ones that are in use
34         for record in self.all_records: 
35             record.stale = True
36         #create hash by (type,hrn) 
37         #used  to know if a given record is already known to SFA 
38         self.records_by_type_hrn = \
39             dict([( (record.type,record.hrn), record) \
40                                         for record in self.all_records])
41
42         self.users_rec_by_email = \
43             dict([ (record.email, record) \
44                 for record in self.all_records if record.type == 'user'])
45             
46         # create hash by (type,pointer) 
47         self.records_by_type_pointer = \
48             dict([ ( (str(record.type), record.pointer) , record) \
49                 for record in self.all_records  if record.pointer != -1])
50             
51             
52         
53     @staticmethod
54     def hostname_to_hrn_escaped(root_auth, hostname):
55         return '.'.join( [root_auth, Xrn.escape(hostname)] )
56
57
58     @staticmethod
59     def slicename_to_hrn(person_hrn):
60         return  (person_hrn +'_slice')
61     
62     def add_options (self, parser):
63         # we don't have any options for now
64         pass
65     
66     def find_record_by_type_hrn(self, record_type, hrn):
67         return self.records_by_type_hrn.get ( (record_type, hrn), None)
68     
69     def locate_by_type_pointer (self, record_type, pointer):
70         ret = self.records_by_type_pointer.get ( (record_type, pointer), None)
71         return ret
72     
73     def update_just_added_records_dict (self, record):
74         rec_tuple = (record.type, record.hrn)
75         if rec_tuple in self.records_by_type_hrn:
76             self.logger.warning ("SlabImporter.update_just_added_records_dict:\
77                         duplicate (%s,%s)"%rec_tuple)
78             return
79         self.records_by_type_hrn [ rec_tuple ] = record
80         
81     def import_sites_and_nodes(self, slabdriver):
82         sites_listdict  = slabdriver.slab_api.GetSites()  
83         nodes_listdict  = slabdriver.slab_api.GetNodes()
84         nodes_by_id = dict([(node['node_id'],node) for node in nodes_listdict])
85         for site in sites_listdict:
86             site_hrn = _get_site_hrn(site) 
87             site_record = self.find_record_by_type_hrn ('authority', site_hrn)
88             if not site_record:
89                 try:
90                     urn = hrn_to_urn(site_hrn, 'authority') 
91                     if not self.auth_hierarchy.auth_exists(urn):
92                         self.auth_hierarchy.create_auth(urn)
93                         
94                     auth_info = self.auth_hierarchy.get_auth_info(urn)
95                     site_record = RegAuthority(hrn=site_hrn, \
96                                             gid=auth_info.get_gid_object(),
97                                             pointer='-1',
98                                             authority=get_authority(site_hrn))
99                     site_record.just_created()
100                     dbsession.add(site_record)
101                     dbsession.commit()
102                     self.logger.info("SlabImporter: imported authority (site) \
103                          %s" % site_record) 
104                     self.update_just_added_records_dict(site_record)
105                 except SQLAlchemyError:
106                     # if the site import fails then there is no point in 
107                     # trying to import the
108                     # site's child records(node, slices, persons), so skip them.
109                     self.logger.log_exc("SlabImporter: failed to import site. \
110                         Skipping child records") 
111                     continue
112             else:
113                 # xxx update the record ...
114                 pass
115             
116             
117             site_record.stale = False
118             self.import_nodes(site['node_ids'], nodes_by_id, slabdriver)
119             
120             return 
121         
122     def import_nodes(self, node_ids, nodes_by_id, slabdriver) :
123         
124         for node_id in node_ids:
125             try:
126                 node = nodes_by_id[node_id]
127             except KeyError:
128                 self.logger.warning ("SlabImporter: cannot find node_id %s \
129                         - ignored" %(node_id))
130                 continue             
131             escaped_hrn =  \
132             self.hostname_to_hrn_escaped(slabdriver.slab_api.root_auth, \
133             node['hostname'])
134             print>>sys.stderr, "\r\n \r\n SLABIMPORTER node %s " %(node)               
135             hrn =  node['hrn']
136
137
138             # xxx this sounds suspicious
139             if len(hrn) > 64: hrn = hrn[:64]
140             node_record = self.find_record_by_type_hrn( 'node', hrn )
141             if not node_record:
142                 pkey = Keypair(create=True)
143                 urn = hrn_to_urn(escaped_hrn, 'node') 
144                 node_gid = \
145                     self.auth_hierarchy.create_gid(urn, \
146                     create_uuid(), pkey)
147                     
148                 def slab_get_authority(hrn):
149                     return hrn.split(".")[0]
150                     
151                 node_record = RegNode(hrn=hrn, gid=node_gid, 
152                                     pointer = '-1',
153                                     authority=slab_get_authority(hrn))
154                 try:
155                         
156                     node_record.just_created()
157                     dbsession.add(node_record)
158                     dbsession.commit()
159                     self.logger.info("SlabImporter: imported node: %s" \
160                                 % node_record)  
161                     self.update_just_added_records_dict(node_record)
162                 except SQLAlchemyError:
163                     self.logger.log_exc("SlabImporter: \
164                                     failed to import node") 
165             else:
166                 # xxx update the record ...
167                 pass
168             node_record.stale=False
169                     
170      # return a tuple pubkey (a plc key object) and pkey (a Keypair object)
171
172     def init_person_key (self, person, slab_key):
173         pubkey = None
174         if  person['pkey']:
175             # randomly pick first key in set
176             pubkey = slab_key
177             
178             try:
179                 pkey = convert_public_key(pubkey)
180             except TypeError:
181                 #key not good. create another pkey
182                 self.logger.warn('SlabImporter: \
183                                     unable to convert public \
184                                     key for %s' %person['hrn'])
185                 pkey = Keypair(create=True)
186             
187         else:
188             # the user has no keys. 
189             #Creating a random keypair for the user's gid
190             self.logger.warn("SlabImporter: person %s does not have a  \
191                         public key" %(person['hrn']))
192             pkey = Keypair(create=True) 
193         return (pubkey, pkey)
194                                 
195         
196     def import_persons_and_slices(self, slabdriver):
197         ldap_person_listdict = slabdriver.slab_api.GetPersons()
198         print>>sys.stderr,"\r\n SLABIMPORT \t ldap_person_listdict %s \r\n" \
199                 %(ldap_person_listdict)   
200         
201          # import persons
202         for person in ldap_person_listdict : 
203             
204             self.logger.info("SlabImporter: person :" %(person))
205             if 'ssh-rsa' not in person['pkey']:
206                 #people with invalid ssh key (ssh-dss, empty, bullshit keys...)
207                 #won't be imported
208                 continue
209             person_hrn = person['hrn']
210             slice_hrn = self.slicename_to_hrn(person['hrn'])
211             
212             # xxx suspicious again
213             if len(person_hrn) > 64: person_hrn = person_hrn[:64]
214             person_urn = hrn_to_urn(person_hrn, 'user')
215             
216             
217             self.logger.info("SlabImporter: users_rec_by_email %s " \
218                                             %(self.users_rec_by_email))
219             
220             #Check if user using person['email'] form LDAP is already registered
221             #in SFA. One email = one person. In this case, do not create another
222             #record for this person
223             #person_hrn returned by GetPerson based on senslab root auth + 
224             #uid ldap
225             user_record = self.find_record_by_type_hrn('user', person_hrn)
226             
227             if not user_record and  person['email'] in self.users_rec_by_email:
228                 user_record = self.users_rec_by_email[person['email']]
229                 person_hrn = user_record.hrn
230                 person_urn = hrn_to_urn(person_hrn, 'user')
231                 
232             
233             slice_record = self.find_record_by_type_hrn ('slice', slice_hrn)
234                     
235             slab_key = person['pkey']
236             # new person
237             if not user_record:
238                 (pubkey,pkey) = self.init_person_key(person, slab_key)
239                 if pubkey is not None and pkey is not None :
240                     person_gid = \
241                     self.auth_hierarchy.create_gid(person_urn, \
242                     create_uuid(), pkey)
243                     if person['email']:
244                         print>>sys.stderr, "\r\n \r\n SLAB IMPORTER \
245                             PERSON EMAIL OK email %s " %(person['email'])
246                         person_gid.set_email(person['email'])
247                         user_record = RegUser(hrn=person_hrn, \
248                                                 gid=person_gid, 
249                                                 pointer='-1', 
250                                                 authority=get_authority(person_hrn),
251                                                 email=person['email'])
252                     else:
253                         user_record = RegUser(hrn=person_hrn, \
254                                                 gid=person_gid, 
255                                                 pointer='-1', 
256                                                 authority=get_authority(person_hrn))
257                         
258                     if pubkey: 
259                         user_record.reg_keys = [RegKey(pubkey)]
260                     else:
261                         self.logger.warning("No key found for user %s" \
262                         %(user_record))
263                             
264                         try:    
265                             user_record.just_created()
266                             dbsession.add (user_record)
267                             dbsession.commit()
268                             self.logger.info("SlabImporter: imported person: %s"\
269                             %(user_record))
270                             self.update_just_added_records_dict( user_record )
271                             
272                         except SQLAlchemyError:
273                             self.logger.log_exc("SlabImporter: \
274                                 failed to import person  %s"%(person))       
275             else:
276                 # update the record ?
277                 # if user's primary key has changed then we need to update 
278                 # the users gid by forcing an update here
279                 sfa_keys = user_record.reg_keys
280                 
281                 new_key=False
282                 if slab_key is not sfa_keys : 
283                     new_key = True
284                 if new_key:
285                     print>>sys.stderr,"SlabImporter: \t \t USER UPDATE \
286                         person: %s" %(person['hrn'])
287                     (pubkey,pkey) = self.init_person_key (person, slab_key)
288                     person_gid = \
289                         self.auth_hierarchy.create_gid(person_urn, \
290                         create_uuid(), pkey)
291                     if not pubkey:
292                         user_record.reg_keys = []
293                     else:
294                         user_record.reg_keys = [RegKey(pubkey)]
295                     self.logger.info("SlabImporter: updated person: %s" \
296                     % (user_record))
297                     
298                 if person['email']:
299                     user_record.email = person['email']
300                    
301             try:       
302                 dbsession.commit()
303                 user_record.stale = False
304             except SQLAlchemyError:
305                 self.logger.log_exc("SlabImporter: \
306                 failed to update person  %s"%(person)) 
307             
308             self.import_slice(slice_hrn, slice_record,user_record)
309            
310                        
311     def import_slice(self, slice_hrn, slice_record, user_record):
312         
313         if not slice_record :
314             
315             pkey = Keypair(create=True)
316             urn = hrn_to_urn(slice_hrn, 'slice')
317             slice_gid = \
318                 self.auth_hierarchy.create_gid(urn, \
319                 create_uuid(), pkey)
320             slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid, 
321                                         pointer='-1',
322                                         authority=get_authority(slice_hrn))
323             try:
324                 slice_record.just_created()
325                 dbsession.add(slice_record)
326                 dbsession.commit()
327                 
328                 #Serial id created after commit
329                 #Get it
330                 #sl_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(slice_hrn)).all()
331                 
332                 
333                 self.update_just_added_records_dict ( slice_record )
334
335             except SQLAlchemyError:
336                 self.logger.log_exc("SlabImporter: failed to import slice")
337                 
338         #No slice update upon import in senslab 
339         else:
340             # xxx update the record ...
341             self.logger.warning ("Slice update not yet implemented")
342             pass
343         # record current users affiliated with the slice
344
345
346         slice_record.reg_researchers =  [user_record]
347         try:
348             dbsession.commit()
349             slice_record.stale = False 
350         except SQLAlchemyError:
351             self.logger.log_exc("SlabImporter: failed to update slice")
352             
353              
354     def run (self, options):
355         config = Config()
356
357         slabdriver = SlabDriver(config)
358         
359         #Create special slice table for senslab 
360         
361         if not slabdriver.db.exists('slab_xp'):
362             slabdriver.db.createtable()
363             self.logger.info ("SlabImporter.run:  slab_xp table created ")
364
365
366         self.import_sites_and_nodes(slabdriver)
367             
368         self.import_persons_and_slices(slabdriver)
369         #slices_listdict = slabdriver.slab_api.GetSlices()
370         #try:
371             #slices_by_userid = \
372                 #dict([(one_slice['reg_researchers']['record_id'], one_slice ) \
373                 #for one_slice in slices_listdict ])
374         #except TypeError:
375             #self.logger.log_exc("SlabImporter: failed to create list \
376                         #of slices by user id.") 
377             #pass
378  
379          
380             
381          # import node records in site
382             
383                     
384        
385            
386             
387                  
388          ### remove stale records
389         # special records must be preserved
390         system_hrns = [slabdriver.hrn, slabdriver.slab_api.root_auth,  \
391                                         slabdriver.hrn+ '.slicemanager']
392         for record in self.all_records: 
393             if record.hrn in system_hrns: 
394                 record.stale = False
395             if record.peer_authority:
396                 record.stale = False
397           
398
399         for record in self.all_records: 
400             if record.type == 'user':
401                 print>>sys.stderr,"SlabImporter: stale records: hrn %s %s" \
402                                             %(record.hrn,record.stale)
403             try:        
404                 stale = record.stale
405             except :     
406                 stale = True
407                 self.logger.warning("stale not found with %s"%record)
408             if stale:
409                 self.logger.info("SlabImporter: deleting stale record: %s" \
410                 %(record))
411                 
412                 try:
413                     dbsession.delete(record)
414                     dbsession.commit()         
415                 except SQLAlchemyError:
416                     self.logger.log_exc("SlabImporter: failed to delete stale \
417                     record %s" %(record) )
418
419