1 from sfa.util.config import Config
2 from sfa.util.xrn import Xrn, get_authority, hrn_to_urn
4 from sfa.senslab.slabdriver import SlabDriver
6 from sfa.trust.certificate import Keypair, convert_public_key
7 from sfa.trust.gid import create_uuid
9 from sfa.storage.alchemy import dbsession
10 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
14 from sqlalchemy.exc import SQLAlchemyError
20 SlabImporter class, generic importer_class. Used to populate the SFA DB
21 with senslab resources' records.
22 Used to update records when new resources, users or nodes, are added
def __init__ (self, auth_hierarchy, loc_logger):
    """
    Set up the importer and index the records already stored in the SFA DB.

    Keeps the authority hierarchy and the logger (switched to debug level),
    loads every RegRecord from the database and builds three lookup tables:
    by (type, hrn), by email (user records only) and by (type, pointer)
    (only records whose pointer differs from -1).

    :param auth_hierarchy: authority hierarchy; the import methods call
        auth_exists, create_auth, get_auth_info and create_gid on it.
    :param loc_logger: local logger
    :type loc_logger: _SfaLogger

    """
    self.auth_hierarchy = auth_hierarchy
    self.logger = loc_logger
    self.logger.setLevelDebug()

    # Retrieve all existing SFA objects.
    existing = dbsession.query(RegRecord).all()
    self.all_records = existing

    # Every record starts out stale; the import passes reset stale to False
    # on the records that are still in use.
    # NOTE(review): the loop body was not visible in the reviewed excerpt;
    # restored from the comment that precedes the loop -- confirm.
    for rec in existing:
        rec.stale = True

    by_type_hrn = {}       # (type, hrn) -> record, "is it already known?"
    by_email = {}          # email -> user record
    by_type_pointer = {}   # (type, pointer) -> record
    for rec in existing:
        by_type_hrn[(rec.type, rec.hrn)] = rec
        if rec.type == 'user':
            by_email[rec.email] = rec
        if rec.pointer != -1:
            by_type_pointer[(str(rec.type), rec.pointer)] = rec

    self.records_by_type_hrn = by_type_hrn
    self.users_rec_by_email = by_email
    self.records_by_type_pointer = by_type_pointer
def hostname_to_hrn_escaped(root_auth, hostname):
    """
    Build a node hrn from the root authority and the node's hostname.

    The hostname is passed through Xrn.escape and joined to the root
    authority with a dot.

    :param root_auth: root authority name
    :param hostname: node's hostname
    :type root_auth: string
    :type hostname: string
    :rtype: string
    """
    # NOTE(review): takes no `self` but is invoked as
    # self.hostname_to_hrn_escaped(...); presumably declared as a static
    # method on a line not visible in the reviewed excerpt -- confirm.
    escaped_hostname = Xrn.escape(hostname)
    return '.'.join((root_auth, escaped_hostname))
def slicename_to_hrn(person_hrn):
    """
    Return the slice hrn associated with a person's hrn.

    The slice hrn is the person's hrn followed by the '_slice' suffix.

    :param person_hrn: user's hrn
    :type person_hrn: string
    :rtype: string
    """
    # NOTE(review): takes no `self` but is invoked as
    # self.slicename_to_hrn(...); presumably declared as a static method on
    # a line not visible in the reviewed excerpt -- confirm.
    slice_suffix = '_slice'
    return person_hrn + slice_suffix
def add_options (self, parser):
    """
    Hook for importer-specific options; nothing is registered.

    :param parser: option parser handed to the importer; left untouched.
    """
    # we don't have any options for now
    return None
def find_record_by_type_hrn(self, record_type, hrn):
    """
    Look up a record by its type and hrn in records_by_type_hrn.

    :param record_type: the record's type (slice, node, authority...)
    :type record_type: string
    :param hrn: Human readable name of the object's record
    :type hrn: string
    :returns: the stored record, or None when the (type, hrn) key is not
        in the dictionary.
    :rtype: RegUser if user, RegSlice if slice, RegNode if node...
        or None if record does not exist.

    """
    lookup_key = (record_type, hrn)
    try:
        return self.records_by_type_hrn[lookup_key]
    except KeyError:
        return None
def locate_by_type_pointer (self, record_type, pointer):
    """
    Look up a record by its type and pointer in records_by_type_pointer.

    :param record_type: the record's type (slice, node, authority...)
    :type record_type: string
    :param pointer: Pointer to where the record is in the origin db,
        used in case the record comes from a trusted authority.
    :type pointer: integer
    :returns: the stored record, or None when the (type, pointer) key is
        not in the dictionary.
    :rtype: RegUser if user, RegSlice if slice, RegNode if node...
        or None if record does not exist.
    """
    table = self.records_by_type_pointer
    lookup_key = (record_type, pointer)
    if lookup_key in table:
        return table[lookup_key]
    return None
def update_just_added_records_dict (self, record):
    """
    Register a freshly created record in records_by_type_hrn.

    The record is stored under its (type, hrn) key. When that key is
    already present a warning is logged and the existing entry is kept.

    :param record: Record to add in the records_by_type_hrn dict; its
        `type` and `hrn` attributes form the key.
    """
    key = (record.type, record.hrn)
    known = self.records_by_type_hrn
    if key not in known:
        known[key] = record
        return
    # NOTE(review): the statement following this warning was not visible in
    # the reviewed excerpt; an early exit (keep the first entry) is assumed
    # -- confirm.
    self.logger.warning ("SlabImporter.update_just_added_records_dict:\
                    duplicate (%s,%s)"%key)
def import_sites_and_nodes(self, slabdriver):
    """
    Import the sites, and through import_nodes their nodes, into the SFA DB.

    Sites and nodes are fetched with slabdriver.slab_api.GetSites() and
    GetNodes(). A site that has no 'authority' record yet gets its
    authority created in the hierarchy (when missing) and a RegAuthority
    record added to the database. Each imported site is marked as not
    stale and its node ids are handed to import_nodes. A site whose import
    fails is skipped together with its nodes.

    :param slabdriver: SlabDriver object, used to have access to slabdriver
        methods and fetching info on sites and nodes.
    :type slabdriver: SlabDriver
    """
    # NOTE(review): several lines of this method were not visible in the
    # reviewed excerpt. The if/try/else skeleton, the `pointer` argument,
    # the commit and the `continue` are restored from the surrounding code
    # and comments -- confirm against the full file.
    sites_listdict = slabdriver.slab_api.GetSites()
    nodes_listdict = slabdriver.slab_api.GetNodes()
    nodes_by_id = dict((node['node_id'], node) for node in nodes_listdict)

    for site in sites_listdict:
        site_hrn = site['name']
        site_record = self.find_record_by_type_hrn ('authority', site_hrn)
        if site_record:
            # xxx update the record ...
            pass
        else:
            try:
                urn = hrn_to_urn(site_hrn, 'authority')
                if not self.auth_hierarchy.auth_exists(urn):
                    self.auth_hierarchy.create_auth(urn)

                auth_info = self.auth_hierarchy.get_auth_info(urn)
                site_record = RegAuthority(hrn=site_hrn,
                                           gid=auth_info.get_gid_object(),
                                           pointer='-1',
                                           authority=get_authority(site_hrn))
                site_record.just_created()
                dbsession.add(site_record)
                dbsession.commit()
                self.logger.info("SlabImporter: imported authority (site) \
                     %s" % site_record)
                self.update_just_added_records_dict(site_record)
            except SQLAlchemyError:
                # If the site import fails there is no point in trying to
                # import the site's child records (nodes, slices, persons),
                # so skip them.
                self.logger.log_exc("SlabImporter: failed to import site. \
                    Skipping child records")
                continue

        site_record.stale = False
        self.import_nodes(site['node_ids'], nodes_by_id, slabdriver)
def import_nodes(self, site_node_ids, nodes_by_id, slabdriver) :
    """
    Import the nodes of one site into the SFA DB.

    For each id in site_node_ids the node description is taken from
    nodes_by_id; unknown ids are logged and ignored. A node that has no
    'node' record yet gets a gid and a RegNode record added to the
    database. Every processed node record is marked as not stale.

    :param site_node_ids: site's node ids
    :type site_node_ids: list of integers
    :param nodes_by_id: dictionary , key is the node id, value is a dict
        with node information.
    :type nodes_by_id: dictionary
    :param slabdriver: SlabDriver object, used to have access to slabdriver
        attributes.
    :type slabdriver: SlabDriver
    """
    # NOTE(review): several lines of this method were not visible in the
    # reviewed excerpt. The try/except KeyError around the lookup, the
    # `hrn = node['hrn']` assignment, the 64-character truncation, the
    # create_gid arguments, the `pointer` argument and the commit are
    # restored from the surrounding code -- confirm against the full file.
    for node_id in site_node_ids:
        try:
            node = nodes_by_id[node_id]
        except KeyError:
            self.logger.warning ("SlabImporter: cannot find node_id %s \
                    - ignored" %(node_id))
            continue

        escaped_hrn = \
            self.hostname_to_hrn_escaped(slabdriver.slab_api.root_auth,
                                         node['hostname'])
        # 1-tuple so the node is always formatted as a single value.
        self.logger.info("SLABIMPORTER node %s " % (node,))
        hrn = node['hrn']

        # xxx this sounds suspicious
        if len(hrn) > 64:
            hrn = hrn[:64]

        node_record = self.find_record_by_type_hrn( 'node', hrn )
        if not node_record:
            pkey = Keypair(create=True)
            urn = hrn_to_urn(escaped_hrn, 'node')
            node_gid = \
                self.auth_hierarchy.create_gid(urn, create_uuid(), pkey)

            # The node's authority is the first dotted component of its
            # hrn (previously computed by a helper re-defined on every
            # iteration).
            node_authority = hrn.split(".")[0]
            node_record = RegNode(hrn=hrn, gid=node_gid,
                                  pointer='-1',
                                  authority=node_authority)
            try:
                node_record.just_created()
                dbsession.add(node_record)
                dbsession.commit()
                self.logger.info("SlabImporter: imported node: %s" \
                        % node_record)
                self.update_just_added_records_dict(node_record)
            except SQLAlchemyError:
                self.logger.log_exc("SlabImporter: \
                                failed to import node")
        else:
            #TODO: xxx update the record ...
            pass

        node_record.stale = False
def init_person_key (self, person, slab_key):
    """
    Returns a tuple pubkey and pkey.

    When the person has a key, slab_key is used as the public key and
    converted with convert_public_key; if the conversion fails, or if the
    person has no key at all, a new Keypair is generated instead so that a
    gid can still be created.

    :param person: Person's data; the 'pkey' and 'hrn' entries are read.
    :type person: dict
    :param slab_key: SSH public key, from LDAP user's data.
    :type slab_key: string
    :rtype: (string, Keypair)
    """
    # NOTE(review): the if/try/except/else skeleton, the initial
    # `pubkey = None`, the `pubkey = slab_key` assignment and the exception
    # type were not visible in the reviewed excerpt; restored from the
    # surrounding code and comments -- confirm against the full file.
    pubkey = None
    if person['pkey']:
        # randomly pick first key in set
        pubkey = slab_key
        try:
            pkey = convert_public_key(pubkey)
        except TypeError:
            # Key not good: create another pkey.
            # `warning` is used for consistency with the other methods.
            self.logger.warning('SlabImporter: \
                                unable to convert public \
                                key for %s' %person['hrn'])
            pkey = Keypair(create=True)
    else:
        # The user has no keys.
        # Creating a random keypair for the user's gid.
        self.logger.warning("SlabImporter: person %s does not have a \
                    public key" %(person['hrn']))
        pkey = Keypair(create=True)
    return (pubkey, pkey)
def import_persons_and_slices(self, slabdriver):
    """
    Import the LDAP users and their slices into the SFA DB.

    Users are fetched with slabdriver.slab_api.GetPersons(). A user whose
    'pkey' does not contain 'ssh-rsa' is skipped. For every other user the
    method looks for an existing user record by hrn, then by email. When
    none exists it creates the gid, the RegUser record and its RegKey and
    adds them to the database; otherwise it refreshes the key and email of
    the existing record. The changes are committed and the user's slice is
    imported by calling import_slice.

    :param slabdriver: SlabDriver object, used to have access to slabdriver
        methods and fetching info on users.
    :type slabdriver: SlabDriver
    """
    # NOTE(review): many lines of this method were not visible in the
    # reviewed excerpt. The if/else/try skeleton, the `continue`, the
    # create_gid arguments, the `gid`/`pointer` arguments and the commits
    # are restored from the surrounding code and comments -- confirm
    # against the full file.
    ldap_person_listdict = slabdriver.slab_api.GetPersons()
    self.logger.info("SLABIMPORT \t ldap_person_listdict %s \r\n" \
            %(ldap_person_listdict))

    # import persons
    for person in ldap_person_listdict :
        # The format string previously had no placeholder, so the person
        # never appeared in the log line.
        self.logger.info("SlabImporter: person : %s" % (person,))
        if 'ssh-rsa' not in person['pkey']:
            # People with an invalid ssh key (ssh-dss, empty, garbage
            # keys...) are not imported.
            continue

        person_hrn = person['hrn']
        slice_hrn = self.slicename_to_hrn(person['hrn'])

        # xxx suspicious again
        if len(person_hrn) > 64:
            person_hrn = person_hrn[:64]
        person_urn = hrn_to_urn(person_hrn, 'user')

        self.logger.info("SlabImporter: users_rec_by_email %s " \
                %(self.users_rec_by_email))

        # Check if the user with person['email'] from LDAP is already
        # registered in SFA. One email = one person: in that case do not
        # create another record, reuse the registered hrn instead.
        user_record = self.find_record_by_type_hrn('user', person_hrn)
        if not user_record and person['email'] in self.users_rec_by_email:
            user_record = self.users_rec_by_email[person['email']]
            person_hrn = user_record.hrn
            person_urn = hrn_to_urn(person_hrn, 'user')

        slice_record = self.find_record_by_type_hrn ('slice', slice_hrn)
        slab_key = person['pkey']

        if not user_record:
            # New person.
            (pubkey, pkey) = self.init_person_key(person, slab_key)
            if pubkey is not None and pkey is not None :
                person_gid = \
                    self.auth_hierarchy.create_gid(person_urn,
                                                   create_uuid(), pkey)
                if person['email']:
                    self.logger.debug( "SLAB IMPORTER \
                        PERSON EMAIL OK email %s " %(person['email']))
                    person_gid.set_email(person['email'])
                    user_record = RegUser(hrn=person_hrn,
                                          gid=person_gid,
                                          pointer='-1',
                                          authority=get_authority(person_hrn),
                                          email=person['email'])
                else:
                    user_record = RegUser(hrn=person_hrn,
                                          gid=person_gid,
                                          pointer='-1',
                                          authority=get_authority(person_hrn))

                if pubkey:
                    user_record.reg_keys = [RegKey(pubkey)]
                else:
                    self.logger.warning("No key found for user %s" \
                            %(user_record))

                try:
                    user_record.just_created()
                    dbsession.add (user_record)
                    dbsession.commit()
                    self.logger.info("SlabImporter: imported person %s"\
                            %(user_record))
                    self.update_just_added_records_dict( user_record )
                except SQLAlchemyError:
                    self.logger.log_exc("SlabImporter: \
                        failed to import person %s"%(person))
        else:
            # update the record ?
            # If the user's primary key has changed then we need to update
            # the user's gid by forcing an update here.
            sfa_keys = user_record.reg_keys

            # NOTE(review): this is an identity test between the LDAP key
            # string and the record's key list, so it is always true and
            # every known user is refreshed on each run. Left unchanged
            # because the intended comparison depends on RegKey's fields,
            # which are not visible here. The regenerated gid is also not
            # assigned to the record in the visible lines -- confirm.
            new_key = slab_key is not sfa_keys
            if new_key:
                self.logger.info("SlabImporter: \t \t USER UPDATE \
                    person: %s" %(person['hrn']))
                (pubkey, pkey) = self.init_person_key (person, slab_key)
                person_gid = \
                    self.auth_hierarchy.create_gid(person_urn,
                                                   create_uuid(), pkey)
                if not pubkey:
                    user_record.reg_keys = []
                else:
                    user_record.reg_keys = [RegKey(pubkey)]
                self.logger.info("SlabImporter: updated person: %s" \
                        %(user_record))

            if person['email']:
                user_record.email = person['email']

        if user_record is None:
            # No record could be built for this person; without it neither
            # the stale flag nor the slice's researcher can be set.
            self.logger.warning("SlabImporter: no user record for %s" \
                    %(person['hrn']))
            continue

        try:
            dbsession.commit()
            user_record.stale = False
        except SQLAlchemyError:
            self.logger.log_exc("SlabImporter: \
            failed to update person %s"%(person))

        self.import_slice(slice_hrn, slice_record, user_record)
def import_slice(self, slice_hrn, slice_record, user_record):
    """
    Create the RegSlice record for slice_hrn when it does not exist yet
    and attach the user record to the slice as its researcher.

    A missing slice gets a new keypair, a gid and a RegSlice record that
    is added to the database. An existing slice is not updated (a warning
    is logged). In both cases the slice's researchers are set to
    [user_record], the change is committed and the slice is marked as not
    stale.

    :param slice_hrn: Human readable name of the slice.
    :type slice_hrn: string
    :param slice_record: record of the slice found in the DB, if any.
    :type slice_record: RegSlice or None
    :param user_record: user record found in the DB if any.
    :type user_record: RegUser

    .. todo::Update the record if a slice record already exists.
    """
    # NOTE(review): the try/else lines, the create_gid arguments, the
    # `pointer` argument and the commits were not visible in the reviewed
    # excerpt; restored from the surrounding code -- confirm.
    if slice_record :
        # No slice update upon import in senslab.
        # xxx update the record ...
        self.logger.warning ("Slice update not yet implemented")
    else:
        pkey = Keypair(create=True)
        urn = hrn_to_urn(slice_hrn, 'slice')
        slice_gid = self.auth_hierarchy.create_gid(urn, create_uuid(), pkey)
        slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid,
                                 pointer='-1',
                                 authority=get_authority(slice_hrn))
        try:
            slice_record.just_created()
            dbsession.add(slice_record)
            dbsession.commit()
            self.update_just_added_records_dict ( slice_record )
        except SQLAlchemyError:
            self.logger.log_exc("SlabImporter: failed to import slice")

    # Record current users affiliated with the slice.
    slice_record.reg_researchers = [user_record]
    try:
        dbsession.commit()
        slice_record.stale = False
    except SQLAlchemyError:
        self.logger.log_exc("SlabImporter: failed to update slice")
def run (self, options):
    """
    Create the special senslab table, slab_xp, in the senslab database
    when it does not exist, import everything (users, slices, nodes and
    sites from OAR and LDAP) into the SFA database, then delete the
    records that are still flagged stale after the import.

    Records whose hrn is one of the system hrns, and records that have a
    peer authority, are never deleted.

    :param options: not used by this method.
    """
    # NOTE(review): the Config() construction, the bodies of the two
    # "preserve" tests, the statements around the "stale not found"
    # warning and the commit after the delete were not visible in the
    # reviewed excerpt; restored from the surrounding code -- confirm
    # against the full file.
    config = Config()
    slabdriver = SlabDriver(config)

    # Create special slice table for senslab.
    if not slabdriver.db.exists('slab_xp'):
        slabdriver.db.createtable()
        self.logger.info ("SlabImporter.run: slab_xp table created ")

    # Import site and node records into the SFA db.
    self.import_sites_and_nodes(slabdriver)
    # Import users and slices into the SFA DB.
    self.import_persons_and_slices(slabdriver)

    ### remove stale records
    # Special records must be preserved.
    system_hrns = [slabdriver.hrn, slabdriver.slab_api.root_auth, \
                   slabdriver.hrn+ '.slicemanager']
    for rec in self.all_records:
        if rec.hrn in system_hrns:
            rec.stale = False
        if rec.peer_authority:
            rec.stale = False

    for rec in self.all_records:
        if rec.type == 'user':
            self.logger.info("SlabImporter: stale records: hrn %s %s" \
                    %(rec.hrn,rec.stale) )
        try:
            is_stale = rec.stale
        except AttributeError:
            # A record without a stale flag is treated as stale.
            is_stale = True
            self.logger.warning("stale not found with %s"%rec)

        if not is_stale:
            continue

        self.logger.info("SlabImporter: deleting stale record: %s" \
                %(rec))
        try:
            dbsession.delete(rec)
            dbsession.commit()
        except SQLAlchemyError:
            self.logger.log_exc("SlabImporter: failed to delete stale \
            record %s" %(rec) )