1 from sfa.util.config import Config
2 from sfa.util.xrn import Xrn, get_authority, hrn_to_urn
4 from sfa.senslab.slabdriver import SlabDriver
6 from sfa.trust.certificate import Keypair, convert_public_key
7 from sfa.trust.gid import create_uuid
9 from sfa.storage.alchemy import dbsession
10 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
14 from sqlalchemy.exc import SQLAlchemyError
20 SlabImporter class, generic importer_class. Used to populate the SFA DB
21 with senslab resources' records.
22 Used to update records when new resources, users or nodes, are added
26 def __init__ (self, auth_hierarchy, loc_logger):
28 Sets and defines import logger and the authority name. Gathers all the
29 records already registerd in the SFA DB, broke them into 3 dicts,
30 by type and hrn, by email and by type and pointer.
32 :param auth_hierarchy: authority name
33 :type auth_hierarchy: string
34 :param loc_logger: local logger
35 :type loc_logger: _SfaLogger
38 self.auth_hierarchy = auth_hierarchy
39 self.logger = loc_logger
40 self.logger.setLevelDebug()
41 #retrieve all existing SFA objects
42 self.all_records = dbsession.query(RegRecord).all()
44 # initialize record.stale to True by default,
45 # then mark stale=False on the ones that are in use
46 for record in self.all_records:
48 #create hash by (type,hrn)
49 #used to know if a given record is already known to SFA
50 self.records_by_type_hrn = \
51 dict([( (record.type,record.hrn), record) \
52 for record in self.all_records])
54 self.users_rec_by_email = \
55 dict([ (record.email, record) \
56 for record in self.all_records if record.type == 'user'])
58 # create hash by (type,pointer)
59 self.records_by_type_pointer = \
60 dict([ ( (str(record.type), record.pointer) , record) \
61 for record in self.all_records if record.pointer != -1])
66 def hostname_to_hrn_escaped(root_auth, hostname):
67 """ Returns a node's hrn based on its hostname and the root
68 authority and by removing special caracters from the hostname.
70 :param root_auth: root authority name
71 :param hostname: nodes's hostname
72 :type root_auth: string
73 :type hostname: string
76 return '.'.join( [root_auth, Xrn.escape(hostname)] )
80 def slicename_to_hrn(person_hrn):
81 """Returns the slicename associated to a given person's hrn.
83 :param person_hrn: user's hrn
84 :type person_hrn: string
87 return (person_hrn +'_slice')
89 def add_options (self, parser):
90 # we don't have any options for now
93 def find_record_by_type_hrn(self, record_type, hrn):
94 """Returns the record associated with a given hrn and hrn type.
95 Returns None if the key tuple is not in dictionary.
97 :param record_type: the record's type (slice, node, authority...)
98 :type record_type: string
99 :param hrn: Human readable name of the object's record
101 :rtype: RegUser if user, RegSlice if slice, RegNode if node...
102 or None if record does not exist.
105 return self.records_by_type_hrn.get ( (record_type, hrn), None)
107 def locate_by_type_pointer (self, record_type, pointer):
108 """Returns the record corresponding to the key pointer and record
109 type. Returns None if the record does not exist and is not in the
110 records_by_type_pointer dictionnary.
112 :param record_type: the record's type (slice, node, authority...)
113 :type record_type: string
114 :param pointer:Pointer to where the record is in the origin db,
115 used in case the record comes from a trusted authority.
116 :type pointer: integer
117 :rtype: RegUser if user, RegSlice if slice, RegNode if node...
118 or None if record does not exist.
120 return self.records_by_type_pointer.get ( (record_type, pointer), None)
123 def update_just_added_records_dict (self, record):
124 """Updates the records_by_type_hrn dictionnary if the record has
127 :param record: Record to add in the records_by_type_hrn dict.
128 :type record: dictionary
130 rec_tuple = (record.type, record.hrn)
131 if rec_tuple in self.records_by_type_hrn:
132 self.logger.warning ("SlabImporter.update_just_added_records_dict:\
133 duplicate (%s,%s)"%rec_tuple)
135 self.records_by_type_hrn [ rec_tuple ] = record
137 def import_sites_and_nodes(self, slabdriver):
138 """ Gets all the sites and nodes from OAR, process the information,
139 creates hrns and RegAuthority for sites, and feed them to the database.
140 For each site, import the site's nodes to the DB by calling
143 :param slabdriver: SlabDriver object, used to have access to slabdriver
144 methods and fetching info on sites and nodes.
145 :type slabdriver: SlabDriver
148 sites_listdict = slabdriver.slab_api.GetSites()
149 nodes_listdict = slabdriver.slab_api.GetNodes()
150 nodes_by_id = dict([(node['node_id'], node) for node in nodes_listdict])
151 for site in sites_listdict:
152 site_hrn = site['name']
153 site_record = self.find_record_by_type_hrn ('authority', site_hrn)
156 urn = hrn_to_urn(site_hrn, 'authority')
157 if not self.auth_hierarchy.auth_exists(urn):
158 self.auth_hierarchy.create_auth(urn)
160 auth_info = self.auth_hierarchy.get_auth_info(urn)
161 site_record = RegAuthority(hrn=site_hrn, \
162 gid=auth_info.get_gid_object(),
164 authority=get_authority(site_hrn))
165 site_record.just_created()
166 dbsession.add(site_record)
168 self.logger.info("SlabImporter: imported authority (site) \
170 self.update_just_added_records_dict(site_record)
171 except SQLAlchemyError:
172 # if the site import fails then there is no point in
173 # trying to import the
174 # site's child records(node, slices, persons), so skip them.
175 self.logger.log_exc("SlabImporter: failed to import site. \
176 Skipping child records")
179 # xxx update the record ...
183 site_record.stale = False
184 self.import_nodes(site['node_ids'], nodes_by_id, slabdriver)
188 def import_nodes(self, site_node_ids, nodes_by_id, slabdriver) :
189 """ Creates appropriate hostnames and RegNode records for
190 each node in site_node_ids, based on the information given by the
191 dict nodes_by_id that was made from data from OAR.
192 Saves the records to the DB.
194 :param site_node_ids: site's node ids
195 :type site_node_ids: list of integers
196 :param nodes_by_id: dictionary , key is the node id, value is the a dict
197 with node information.
198 :type nodes_by_id: dictionary
199 :param slabdriver:SlabDriver object, used to have access to slabdriver
201 :type slabdriver:SlabDriver
205 for node_id in site_node_ids:
207 node = nodes_by_id[node_id]
209 self.logger.warning ("SlabImporter: cannot find node_id %s \
210 - ignored" %(node_id))
213 self.hostname_to_hrn_escaped(slabdriver.slab_api.root_auth, \
215 self.logger.info("SLABIMPORTER node %s " %(node))
219 # xxx this sounds suspicious
222 node_record = self.find_record_by_type_hrn( 'node', hrn )
224 pkey = Keypair(create=True)
225 urn = hrn_to_urn(escaped_hrn, 'node')
227 self.auth_hierarchy.create_gid(urn, \
230 def slab_get_authority(hrn):
231 return hrn.split(".")[0]
233 node_record = RegNode(hrn=hrn, gid=node_gid,
235 authority=slab_get_authority(hrn))
238 node_record.just_created()
239 dbsession.add(node_record)
241 self.logger.info("SlabImporter: imported node: %s" \
243 self.update_just_added_records_dict(node_record)
244 except SQLAlchemyError:
245 self.logger.log_exc("SlabImporter: \
246 failed to import node")
248 #TODO: xxx update the record ...
250 node_record.stale = False
253 def init_person_key (self, person, slab_key):
254 """ Returns a tuple pubkey and pkey.
256 :param person Person's data.
258 :param slab_key: SSH public key, from LDAP user's data.
260 :type slab_key: string
261 :rtype (string, Keypair)
265 # randomly pick first key in set
269 pkey = convert_public_key(pubkey)
271 #key not good. create another pkey
272 self.logger.warn('SlabImporter: \
273 unable to convert public \
274 key for %s' %person['hrn'])
275 pkey = Keypair(create=True)
278 # the user has no keys.
279 #Creating a random keypair for the user's gid
280 self.logger.warn("SlabImporter: person %s does not have a \
281 public key" %(person['hrn']))
282 pkey = Keypair(create=True)
283 return (pubkey, pkey)
286 def import_persons_and_slices(self, slabdriver):
288 Gets user data from LDAP, process the information.
289 Creates hrn for the user's slice, the user's gid, creates
290 the RegUser record associated with user. Creates the RegKey record
291 associated nwith the user's key.
292 Saves those records into the SFA DB.
293 import the user's slice onto the database as well by calling
296 :param slabdriver:SlabDriver object, used to have access to slabdriver
298 :type slabdriver:SlabDriver
300 ldap_person_listdict = slabdriver.slab_api.GetPersons()
301 self.logger.info("SLABIMPORT \t ldap_person_listdict %s \r\n" \
302 %(ldap_person_listdict))
305 for person in ldap_person_listdict :
307 self.logger.info("SlabImporter: person :" %(person))
308 if 'ssh-rsa' not in person['pkey']:
309 #people with invalid ssh key (ssh-dss, empty, bullshit keys...)
312 person_hrn = person['hrn']
313 slice_hrn = self.slicename_to_hrn(person['hrn'])
315 # xxx suspicious again
316 if len(person_hrn) > 64:
317 person_hrn = person_hrn[:64]
318 person_urn = hrn_to_urn(person_hrn, 'user')
321 self.logger.info("SlabImporter: users_rec_by_email %s " \
322 %(self.users_rec_by_email))
324 #Check if user using person['email'] from LDAP is already registered
325 #in SFA. One email = one person. In this case, do not create another
326 #record for this person
327 #person_hrn returned by GetPerson based on senslab root auth +
329 user_record = self.find_record_by_type_hrn('user', person_hrn)
331 if not user_record and person['email'] in self.users_rec_by_email:
332 user_record = self.users_rec_by_email[person['email']]
333 person_hrn = user_record.hrn
334 person_urn = hrn_to_urn(person_hrn, 'user')
337 slice_record = self.find_record_by_type_hrn ('slice', slice_hrn)
339 slab_key = person['pkey']
342 (pubkey, pkey) = self.init_person_key(person, slab_key)
343 if pubkey is not None and pkey is not None :
345 self.auth_hierarchy.create_gid(person_urn, \
348 self.logger.debug( "SLAB IMPORTER \
349 PERSON EMAIL OK email %s " %(person['email']))
350 person_gid.set_email(person['email'])
351 user_record = RegUser(hrn=person_hrn, \
354 authority=get_authority(person_hrn),
355 email=person['email'])
357 user_record = RegUser(hrn=person_hrn, \
360 authority=get_authority(person_hrn))
363 user_record.reg_keys = [RegKey(pubkey)]
365 self.logger.warning("No key found for user %s" \
369 user_record.just_created()
370 dbsession.add (user_record)
372 self.logger.info("SlabImporter: imported person %s"\
374 self.update_just_added_records_dict( user_record )
376 except SQLAlchemyError:
377 self.logger.log_exc("SlabImporter: \
378 failed to import person %s"%(person))
380 # update the record ?
381 # if user's primary key has changed then we need to update
382 # the users gid by forcing an update here
383 sfa_keys = user_record.reg_keys
386 if slab_key is not sfa_keys :
389 self.logger.info("SlabImporter: \t \t USER UPDATE \
390 person: %s" %(person['hrn']))
391 (pubkey, pkey) = self.init_person_key (person, slab_key)
393 self.auth_hierarchy.create_gid(person_urn, \
396 user_record.reg_keys = []
398 user_record.reg_keys = [RegKey(pubkey)]
399 self.logger.info("SlabImporter: updated person: %s" \
403 user_record.email = person['email']
407 user_record.stale = False
408 except SQLAlchemyError:
409 self.logger.log_exc("SlabImporter: \
410 failed to update person %s"%(person))
412 self.import_slice(slice_hrn, slice_record, user_record)
415 def import_slice(self, slice_hrn, slice_record, user_record):
417 Create RegSlice record according to the slice hrn if the slice
418 does not exist yet.Creates a relationship with the user record
419 associated with the slice.
420 Commit the record to the database.
421 TODO: Update the record if a slice record already exists.
423 :param slice_hrn: Human readable name of the slice.
424 :type slice_hrn: string
425 :param slice_record: record of the slice found in the DB, if any.
426 :type slice_record: RegSlice or None
427 :param user_record: user record found in the DB if any.
428 :type user_record: RegUser
431 if not slice_record :
432 pkey = Keypair(create=True)
433 urn = hrn_to_urn(slice_hrn, 'slice')
435 self.auth_hierarchy.create_gid(urn, \
437 slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid,
439 authority=get_authority(slice_hrn))
441 slice_record.just_created()
442 dbsession.add(slice_record)
446 self.update_just_added_records_dict ( slice_record )
448 except SQLAlchemyError:
449 self.logger.log_exc("SlabImporter: failed to import slice")
451 #No slice update upon import in senslab
453 # xxx update the record ...
454 self.logger.warning ("Slice update not yet implemented")
456 # record current users affiliated with the slice
459 slice_record.reg_researchers = [user_record]
462 slice_record.stale = False
463 except SQLAlchemyError:
464 self.logger.log_exc("SlabImporter: failed to update slice")
467 def run (self, options):
469 Create the special senslab table, slab_xp, in the senslab database.
470 Import everything (users, slices, nodes and sites from OAR
471 and LDAP) into the SFA database.
472 Delete stale records that are no longer in OAR or LDAP.
478 slabdriver = SlabDriver(config)
480 #Create special slice table for senslab
482 if not slabdriver.db.exists('slab_xp'):
483 slabdriver.db.createtable()
484 self.logger.info ("SlabImporter.run: slab_xp table created ")
487 # import site and node records in site into the SFA db.
488 self.import_sites_and_nodes(slabdriver)
489 #import users and slice into the SFA DB.
490 self.import_persons_and_slices(slabdriver)
492 ### remove stale records
493 # special records must be preserved
494 system_hrns = [slabdriver.hrn, slabdriver.slab_api.root_auth, \
495 slabdriver.hrn+ '.slicemanager']
496 for record in self.all_records:
497 if record.hrn in system_hrns:
499 if record.peer_authority:
503 for record in self.all_records:
504 if record.type == 'user':
505 self.logger.info("SlabImporter: stale records: hrn %s %s" \
506 %(record.hrn,record.stale) )
511 self.logger.warning("stale not found with %s"%record)
513 self.logger.info("SlabImporter: deleting stale record: %s" \
517 dbsession.delete(record)
519 except SQLAlchemyError:
520 self.logger.log_exc("SlabImporter: failed to delete stale \
521 record %s" %(record) )