3 from sfa.util.config import Config
4 from sfa.util.xrn import Xrn, get_authority, hrn_to_urn
6 from sfa.senslab.slabdriver import SlabDriver
8 from sfa.trust.certificate import Keypair, convert_public_key
9 from sfa.trust.gid import create_uuid
11 from sfa.storage.alchemy import dbsession
12 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, \
16 from sqlalchemy.exc import SQLAlchemyError
# Return the HRN used for a site entry.
# import_sites_and_nodes calls this once per entry of slab_api.GetSites() and
# uses the result to look up (or create) the site's 'authority' record.
# NOTE(review): the body of this function is not visible in this view, so how
# the HRN is derived from `site` cannot be confirmed from here.
19 def _get_site_hrn(site):
def __init__(self, auth_hierarchy, loc_logger):
    """Load every registry record and build the lookup indexes.

    auth_hierarchy -- object used by the import methods to create
                      authorities and GIDs.
    loc_logger     -- logger used by the importer; it is switched to debug
                      level here through setLevelDebug().

    Indexes built from the records returned by the database query:
      records_by_type_hrn     -- (type, hrn) -> record, for every record
      users_rec_by_email      -- email -> record, 'user' records only
      records_by_type_pointer -- (str(type), pointer) -> record, skipping
                                 records whose pointer is -1
    """
    self.auth_hierarchy = auth_hierarchy
    self.logger = loc_logger
    self.logger.setLevelDebug()

    # Retrieve all existing SFA objects.
    self.all_records = dbsession.query(RegRecord).all()

    # Initialize record.stale to True by default; the import methods then
    # mark stale=False on the records that are still in use.
    for record in self.all_records:
        record.stale = True

    # Hash by (type, hrn): used to know if a given record is already known
    # to SFA.
    self.records_by_type_hrn = dict(
        ((record.type, record.hrn), record)
        for record in self.all_records)

    # Hash user records by e-mail address.
    self.users_rec_by_email = dict(
        (record.email, record)
        for record in self.all_records
        if record.type == 'user')

    # Hash by (type, pointer); records with pointer == -1 are left out.
    self.records_by_type_pointer = dict(
        ((str(record.type), record.pointer), record)
        for record in self.all_records
        if record.pointer != -1)
def hostname_to_hrn_escaped(root_auth, hostname):
    """Return the dotted HRN made of `root_auth` and the escaped `hostname`.

    Only the hostname goes through Xrn.escape; `root_auth` is used as given.
    """
    # NOTE(review): no `self` parameter although import_nodes calls this
    # through `self.` - presumably declared static on a line not visible here.
    escaped_hostname = Xrn.escape(hostname)
    return '.'.join((root_auth, escaped_hostname))
def slicename_to_hrn(person_hrn):
    """Return the slice HRN for a person: the person's HRN plus '_slice'."""
    # NOTE(review): no `self` parameter although import_persons_and_slices
    # calls this through `self.` - presumably declared static on a line not
    # visible here.
    slice_hrn = person_hrn + '_slice'
    return slice_hrn
def add_options(self, parser):
    """Hook for importer-specific options on `parser`; none are defined.

    The parser is left untouched and nothing is returned.
    """
    # we don't have any options for now
    pass
def find_record_by_type_hrn(self, record_type, hrn):
    """Look up a record in the (type, hrn) index.

    Returns the indexed record, or None when the pair is unknown.
    """
    lookup_key = (record_type, hrn)
    return self.records_by_type_hrn.get(lookup_key)
def locate_by_type_pointer(self, record_type, pointer):
    """Look up a record in the (type, pointer) index.

    Returns the indexed record, or None when the pair is unknown.
    The index is built in __init__ with str(record.type) as the first key
    component and leaves out records whose pointer is -1.
    """
    # Hand the lookup result back to the caller; the visible code computed
    # it into a local and never returned it.
    return self.records_by_type_pointer.get((record_type, pointer))
# Register a freshly created record in the (type, hrn) index so that later
# calls to find_record_by_type_hrn can see it.
# A warning is logged when the (type, hrn) pair is already present.
73 def update_just_added_records_dict (self, record):
74 rec_tuple = (record.type, record.hrn)
75 if rec_tuple in self.records_by_type_hrn:
76 self.logger.warning ("SlabImporter.update_just_added_records_dict:\
77 duplicate (%s,%s)"%rec_tuple)
# NOTE(review): a line is missing from this view here (numbering jumps from
# 77 to 79); presumably an early return so that a duplicate does not replace
# the record already indexed - confirm against the full file.
79 self.records_by_type_hrn [ rec_tuple ] = record
# Import the sites reported by the Senslab API as 'authority' records, then
# import the nodes attached to each site through import_nodes.
#   slabdriver -- driver whose slab_api provides GetSites() and GetNodes().
# NOTE(review): several control-flow lines (try / if / else) are not visible
# in this view, so the exact nesting of the statements below is unconfirmed.
81 def import_sites_and_nodes(self, slabdriver):
82 sites_listdict = slabdriver.slab_api.GetSites()
83 nodes_listdict = slabdriver.slab_api.GetNodes()
# Index the nodes by node_id so that import_nodes can resolve site['node_ids'].
84 nodes_by_id = dict([(node['node_id'],node) for node in nodes_listdict])
85 for site in sites_listdict:
86 site_hrn = _get_site_hrn(site)
87 site_record = self.find_record_by_type_hrn ('authority', site_hrn)
# NOTE(review): lines are missing here (numbering jumps from 87 to 90);
# presumably the creation code below only runs when no record was found.
# Make sure the authority exists in the hierarchy before reading its info.
90 urn = hrn_to_urn(site_hrn, 'authority')
91 if not self.auth_hierarchy.auth_exists(urn):
92 self.auth_hierarchy.create_auth(urn)
94 auth_info = self.auth_hierarchy.get_auth_info(urn)
95 site_record = RegAuthority(hrn=site_hrn, \
96 gid=auth_info.get_gid_object(),
98 authority=get_authority(site_hrn))
99 site_record.just_created()
100 dbsession.add(site_record)
# NOTE(review): the rest of the log call below (and possibly a commit) is
# not visible in this view.
102 self.logger.info("SlabImporter: imported authority (site) \
# Make the new record visible to later find_record_by_type_hrn lookups.
104 self.update_just_added_records_dict(site_record)
105 except SQLAlchemyError:
106 # if the site import fails then there is no point in
107 # trying to import the
108 # site's child records(node, slices, persons), so skip them.
109 self.logger.log_exc("SlabImporter: failed to import site. \
110 Skipping child records")
113 # xxx update the record ...
# Mark the site as in use; run() reads record.stale in its stale-record pass.
117 site_record.stale = False
118 self.import_nodes(site['node_ids'], nodes_by_id, slabdriver)
# Import the nodes listed in `node_ids` as 'node' records.
#   node_ids    -- ids of the nodes to import (site['node_ids'] at the call
#                  site in import_sites_and_nodes).
#   nodes_by_id -- mapping node_id -> node entry built from GetNodes().
#   slabdriver  -- driver; slab_api.root_auth is used to build the node HRN.
# NOTE(review): several lines (try / if / else and some assignment targets)
# are not visible in this view, so the nesting below is unconfirmed.
122 def import_nodes(self, node_ids, nodes_by_id, slabdriver) :
124 for node_id in node_ids:
126 node = nodes_by_id[node_id]
# NOTE(review): the warning below reads like the handler for a failed
# nodes_by_id lookup; the enclosing try/except lines are not visible here.
128 self.logger.warning ("SlabImporter: cannot find node_id %s \
129 - ignored" %(node_id))
# NOTE(review): `escaped_hrn` and `hrn` are used further down but their
# assignments are not visible; presumably `escaped_hrn` receives the result
# of the hostname_to_hrn_escaped call below - confirm.
132 self.hostname_to_hrn_escaped(slabdriver.slab_api.root_auth, \
134 print>>sys.stderr, "\r\n \r\n SLABIMPORTER node %s " %(node)
# NOTE(review): the line above is debug output written with the Python 2
# `print >>` statement form; `sys` is not among the imports visible here.
138 # xxx this sounds suspicious
139 if len(hrn) > 64: hrn = hrn[:64]
140 node_record = self.find_record_by_type_hrn( 'node', hrn )
# NOTE(review): lines are missing here (numbering jumps from 140 to 142);
# presumably the creation code below only runs when no record was found.
142 pkey = Keypair(create=True)
143 urn = hrn_to_urn(escaped_hrn, 'node')
# NOTE(review): the result of create_gid is presumably bound to `node_gid`
# (passed to RegNode below); the assignment target is not visible here.
145 self.auth_hierarchy.create_gid(urn, \
148 def slab_get_authority(hrn):
149 return hrn.split(".")[0]
# slab_get_authority keeps only the first dot-separated component of the hrn.
# NOTE(review): it appears to be defined inside the loop body - confirm.
151 node_record = RegNode(hrn=hrn, gid=node_gid,
153 authority=slab_get_authority(hrn))
156 node_record.just_created()
157 dbsession.add(node_record)
159 self.logger.info("SlabImporter: imported node: %s" \
161 self.update_just_added_records_dict(node_record)
162 except SQLAlchemyError:
163 self.logger.log_exc("SlabImporter: \
164 failed to import node")
166 # xxx update the record ...
# Mark the node as in use; run() reads record.stale in its stale-record pass.
168 node_record.stale=False
170 # return a tuple pubkey (a plc key object) and pkey (a Keypair object)
# Build the key material for a person and return it as (pubkey, pkey).
#   person   -- person entry; person['hrn'] is used in the log messages.
#   slab_key -- key passed by the caller (person['pkey'] at both call sites
#               in import_persons_and_slices).
# `pkey` is either the result of convert_public_key(pubkey) or a freshly
# generated Keypair when conversion fails or the person has no key.
# NOTE(review): the branch conditions and the assignment of `pubkey` are not
# visible in this view; callers test `pubkey is not None`, so it is presumably
# None when the person has no key - confirm.
172 def init_person_key (self, person, slab_key):
175 # randomly pick first key in set
179 pkey = convert_public_key(pubkey)
181 #key not good. create another pkey
182 self.logger.warn('SlabImporter: \
183 unable to convert public \
184 key for %s' %person['hrn'])
185 pkey = Keypair(create=True)
188 # the user has no keys.
189 #Creating a random keypair for the user's gid
190 self.logger.warn("SlabImporter: person %s does not have a \
191 public key" %(person['hrn']))
192 pkey = Keypair(create=True)
193 return (pubkey, pkey)
# Import the persons returned by slab_api.GetPersons() as 'user' records and
# import one slice per person through import_slice.
#   slabdriver -- driver whose slab_api provides GetPersons().
# NOTE(review): many lines (try / if / else and some assignment targets) are
# not visible in this view, so the nesting of the statements is unconfirmed.
196 def import_persons_and_slices(self, slabdriver):
197 ldap_person_listdict = slabdriver.slab_api.GetPersons()
198 print>>sys.stderr,"\r\n SLABIMPORT \t ldap_person_listdict %s \r\n" \
199 %(ldap_person_listdict)
# NOTE(review): debug output above uses the Python 2 `print >>` statement
# form; `sys` is not among the imports visible in this view.
202 for person in ldap_person_listdict :
# NOTE(review): the format string below has no conversion specifier, so
# `person` is not interpolated into the message - looks like a missing %s.
204 self.logger.info("SlabImporter: person :" %(person))
# NOTE(review): the body of the 'ssh-rsa' test is not visible; presumably
# such persons are skipped - confirm.
205 if 'ssh-rsa' not in person['pkey']:
206 #people with invalid ssh key (ssh-dss, empty, bullshit keys...)
209 person_hrn = person['hrn']
210 slice_hrn = self.slicename_to_hrn(person['hrn'])
212 # xxx suspicious again
213 if len(person_hrn) > 64: person_hrn = person_hrn[:64]
214 person_urn = hrn_to_urn(person_hrn, 'user')
217 self.logger.info("SlabImporter: users_rec_by_email %s " \
218 %(self.users_rec_by_email))
220 #Check if user using person['email'] form LDAP is already registered
221 #in SFA. One email = one person. In this case, do not create another
222 #record for this person
223 #person_hrn returned by GetPerson based on senslab root auth +
225 user_record = self.find_record_by_type_hrn('user', person_hrn)
# Fall back to the e-mail index: reuse the record already registered with
# this address and adopt its hrn/urn. slice_hrn was computed above from
# person['hrn'] and is not recomputed here.
227 if not user_record and person['email'] in self.users_rec_by_email:
228 user_record = self.users_rec_by_email[person['email']]
229 person_hrn = user_record.hrn
230 person_urn = hrn_to_urn(person_hrn, 'user')
233 slice_record = self.find_record_by_type_hrn ('slice', slice_hrn)
235 slab_key = person['pkey']
# NOTE(review): lines are missing here (numbering jumps from 235 to 238);
# presumably the creation code below only runs when no user record exists.
238 (pubkey,pkey) = self.init_person_key(person, slab_key)
239 if pubkey is not None and pkey is not None :
# NOTE(review): the result of create_gid is presumably bound to `person_gid`
# (used below); the assignment target is not visible in this view.
241 self.auth_hierarchy.create_gid(person_urn, \
244 print>>sys.stderr, "\r\n \r\n SLAB IMPORTER \
245 PERSON EMAIL OK email %s " %(person['email'])
246 person_gid.set_email(person['email'])
247 user_record = RegUser(hrn=person_hrn, \
250 authority=get_authority(person_hrn),
251 email=person['email'])
253 user_record = RegUser(hrn=person_hrn, \
256 authority=get_authority(person_hrn))
259 user_record.reg_keys = [RegKey(pubkey)]
261 self.logger.warning("No key found for user %s" \
265 user_record.just_created()
266 dbsession.add (user_record)
268 self.logger.info("SlabImporter: imported person: %s"\
# Make the new record visible to later find_record_by_type_hrn lookups.
270 self.update_just_added_records_dict( user_record )
272 except SQLAlchemyError:
273 self.logger.log_exc("SlabImporter: \
274 failed to import person %s"%(person))
276 # update the record ?
277 # if user's primary key has changed then we need to update
278 # the users gid by forcing an update here
# NOTE(review): `slab_key` is person['pkey'] and `sfa_keys` is
# user_record.reg_keys (assigned as a list elsewhere in this method), so the
# identity test `is not` below looks like it is always true and the update
# path would run on every import - confirm the intended comparison.
279 sfa_keys = user_record.reg_keys
282 if slab_key is not sfa_keys :
285 print>>sys.stderr,"SlabImporter: \t \t USER UPDATE \
286 person: %s" %(person['hrn'])
287 (pubkey,pkey) = self.init_person_key (person, slab_key)
289 self.auth_hierarchy.create_gid(person_urn, \
292 user_record.reg_keys = []
294 user_record.reg_keys = [RegKey(pubkey)]
295 self.logger.info("SlabImporter: updated person: %s" \
299 user_record.email = person['email']
# Mark the user as in use; run() reads record.stale in its stale-record pass.
303 user_record.stale = False
304 except SQLAlchemyError:
305 self.logger.log_exc("SlabImporter: \
306 failed to update person %s"%(person))
# Create or refresh the person's slice and attach the user record to it.
308 self.import_slice(slice_hrn, slice_record,user_record)
# Create the slice record when none exists yet, then attach `user_record`
# to it and mark it as in use.
#   slice_hrn    -- HRN of the slice.
#   slice_record -- existing record for that HRN, or a false value when the
#                   slice still has to be created.
#   user_record  -- user record stored as the slice's only researcher.
# NOTE(review): try / else lines and the `slice_gid` assignment are not
# visible in this view, so the nesting of the statements is unconfirmed.
311 def import_slice(self, slice_hrn, slice_record, user_record):
313 if not slice_record :
315 pkey = Keypair(create=True)
316 urn = hrn_to_urn(slice_hrn, 'slice')
# NOTE(review): the result of create_gid is presumably bound to `slice_gid`
# (passed to RegSlice below); the assignment target is not visible here.
318 self.auth_hierarchy.create_gid(urn, \
320 slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid,
322 authority=get_authority(slice_hrn))
324 slice_record.just_created()
325 dbsession.add(slice_record)
328 #Serial id created after commit
330 #sl_rec = dbsession.query(RegSlice).filter(RegSlice.hrn.match(slice_hrn)).all()
# Make the new record visible to later find_record_by_type_hrn lookups.
333 self.update_just_added_records_dict ( slice_record )
335 except SQLAlchemyError:
336 self.logger.log_exc("SlabImporter: failed to import slice")
338 #No slice update upon import in senslab
340 # xxx update the record ...
341 self.logger.warning ("Slice update not yet implemented")
343 # record current users affiliated with the slice
# The researcher list is replaced by a one-element list, not appended to.
346 slice_record.reg_researchers = [user_record]
349 slice_record.stale = False
350 except SQLAlchemyError:
351 self.logger.log_exc("SlabImporter: failed to update slice")
# Entry point of the importer: create the driver, make sure the 'slab_xp'
# table exists, import sites/nodes and persons/slices, then go through
# self.all_records to handle the records that are still marked stale.
#   options -- not referenced by any line visible in this view.
# NOTE(review): `config` is passed to SlabDriver but its assignment is not
# visible here; presumably built from the imported Config class - confirm.
354 def run (self, options):
357 slabdriver = SlabDriver(config)
359 #Create special slice table for senslab
361 if not slabdriver.db.exists('slab_xp'):
362 slabdriver.db.createtable()
363 self.logger.info ("SlabImporter.run: slab_xp table created ")
366 self.import_sites_and_nodes(slabdriver)
368 self.import_persons_and_slices(slabdriver)
369 #slices_listdict = slabdriver.slab_api.GetSlices()
371 #slices_by_userid = \
372 #dict([(one_slice['reg_researchers']['record_id'], one_slice ) \
373 #for one_slice in slices_listdict ])
375 #self.logger.log_exc("SlabImporter: failed to create list \
376 #of slices by user id.")
381 # import node records in site
388 ### remove stale records
389 # special records must be preserved
390 system_hrns = [slabdriver.hrn, slabdriver.slab_api.root_auth, \
391 slabdriver.hrn+ '.slicemanager']
# NOTE(review): the bodies of the two tests below are not visible; given the
# "must be preserved" comment they presumably reset record.stale to False for
# system records and for records that have a peer authority - confirm.
392 for record in self.all_records:
393 if record.hrn in system_hrns:
395 if record.peer_authority:
# Second pass: report and delete the records that are still stale.
# NOTE(review): the lines that read record.stale and guard the delete are
# not visible in this view.
399 for record in self.all_records:
400 if record.type == 'user':
401 print>>sys.stderr,"SlabImporter: stale records: hrn %s %s" \
402 %(record.hrn,record.stale)
407 self.logger.warning("stale not found with %s"%record)
409 self.logger.info("SlabImporter: deleting stale record: %s" \
413 dbsession.delete(record)
415 except SQLAlchemyError:
416 self.logger.log_exc("SlabImporter: failed to delete stale \
417 record %s" %(record) )