Merge branch 'master' of git://git.onelab.eu/sfa into upmc
[sfa.git] / sfa / generic / importer.py
1 import os
2
3 from sfa.util.config import Config
4 from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn
5
6 from sfa.trust.gid import create_uuid    
7 from sfa.trust.certificate import convert_public_key, Keypair
8
9 from sfa.storage.alchemy import dbsession
10 from sfa.storage.model import RegRecord, RegAuthority, RegSlice, RegNode, RegUser, RegKey
11
12 class Importer:
13
14     def __init__ (self, auth_hierarchy, logger):
15         self.auth_hierarchy = auth_hierarchy
16         self.logger=logger
17
18     def add_options (self, parser):
19         # We don't have any options for now
20         pass
21
22     # hrn hash is initialized from current db
23     # remember just-created records as we go
24     # xxx might make sense to add a UNIQUE constraint in the db itself
25     def remember_record_by_hrn (self, record):
26         tuple = (record.type, record.hrn)
27         if tuple in self.records_by_type_hrn:
28             self.logger.warning ("Importer.remember_record_by_hrn: duplicate (%s,%s)"%tuple)
29             return
30         self.records_by_type_hrn [ tuple ] = record
31
32     # ditto for pointer hash
33     def remember_record_by_pointer (self, record):
34         if record.pointer == -1:
35             self.logger.warning ("Importer.remember_record_by_pointer: pointer is void")
36             return
37         tuple = (record.type, record.pointer)
38         if tuple in self.records_by_type_pointer:
39             self.logger.warning ("Importer.remember_record_by_pointer: duplicate (%s,%s)"%tuple)
40             return
41         self.records_by_type_pointer [ ( record.type, record.pointer,) ] = record
42
43     def remember_record (self, record):
44         self.remember_record_by_hrn (record)
45         self.remember_record_by_pointer (record)
46
47     def locate_by_type_hrn (self, type, hrn):
48         return self.records_by_type_hrn.get ( (type, hrn), None)
49
50     def locate_by_type_pointer (self, type, pointer):
51         return self.records_by_type_pointer.get ( (type, pointer), None)
52
53     ############################################################################
54     # Object import functions (authorities, resources, users, slices)
55     #
56
57     def import_auth(self, auth, parent_auth_hrn):
58         """
59         @return HRN of the newly created authority
60         """
61         auth_hrn = self.get_auth_naming(auth, parent_auth_hrn)
62         auth_urn = hrn_to_urn(auth_hrn, 'authority')
63
64         # import if hrn is not in list of existing hrns or if the hrn exists
65         # but its not a auth record
66         auth_record=self.locate_by_type_hrn ('authority', auth_hrn)
67         if not auth_record:
68             try:
69                 # We ensure the parent is created through the root
70                 #if not self.auth_hierarchy.auth_exists(auth_urn):
71                 #    self.auth_hierarchy.create_auth(auth_urn)
72                 auth_info = self.auth_hierarchy.get_auth_info(auth_urn)
73                 auth_record = RegAuthority(hrn = auth_hrn, gid = auth_info.get_gid_object(),
74                                            pointer = 0,
75                                            authority = get_authority(auth_hrn))
76                 auth_record.just_created()
77                 dbsession.add(auth_record)
78                 dbsession.commit()
79                 self.logger.info("Importer: imported authority (auth) : %s" % auth_record) 
80                 self.remember_record (auth_record)
81             except Exception, e:
82                 # if the auth import fails then there is no point in trying to import the
83                 # auth's child records (node, slices, persons), so skip them.
84                 raise Exception, "Importer: failed to import auth. Skipping child records : %s" % e
85         else:
86             # xxx update the record ...
87             pass
88         auth_record.stale=False
89
90         return auth_hrn
91
92     def import_resource(self, resource, parent_auth_hrn):
93         """
94         @return HRN of the newly created resource
95         """
96         resource_hrn = self.get_resource_naming(resource, parent_auth_hrn)
97         resource_urn = hrn_to_urn(resource_hrn, 'node')
98
99         resource_record = self.locate_by_type_hrn ('node', resource_hrn )
100         if not resource_record:
101             try:
102                 pkey = Keypair(create=True)
103                 resource_gid = self.auth_hierarchy.create_gid(resource_urn, create_uuid(), pkey)
104                 resource_record = RegNode (hrn = resource_hrn, gid = resource_gid, 
105                                        pointer = resource['id'],
106                                        authority = get_authority(resource_hrn))
107                 resource_record.just_created()
108                 dbsession.add(resource_record)
109                 dbsession.commit()
110                 self.logger.info("Importer: imported resource: %s" % resource_record)  
111                 self.remember_record (resource_record)
112             except:
113                    self.logger.log_exc("Importer: failed to import resource")
114         else:
115             # xxx update the record ...
116             pass
117         
118         resource_record.stale=False
119
120         return resource_hrn
121
122     def init_user_key(self, user):
123         pubkey = None
124         pkey = None
125         if user['keys']:
126             # pick first working key in set
127             for pubkey in user['keys']:
128                  try:
129                     pkey = convert_public_key(pubkey)
130                     break
131                  except:
132                     continue
133             if not pkey:
134                 self.logger.warn('Importer: unable to convert public key for %s' % user_hrn)
135                 pkey = Keypair(create=True)
136         else:
137             # the user has no keys. Creating a random keypair for the user's gid
138             self.logger.warn("Importer: user %s does not have a public key on the testbed"%user_hrn)
139             pkey = Keypair(create=True)
140         return (pubkey, pkey)
141
142     def import_user(self, user, parent_auth_hrn):
143         """
144         @return HRN of the newly created user
145         """
146         user_hrn = self.get_user_naming(user, parent_auth_hrn)
147         user_urn = hrn_to_urn(user_hrn, 'user')
148
149         # return a tuple pubkey (a public key) and pkey (a Keypair object)
150
151         user_record = self.locate_by_type_hrn ( 'user', user_hrn)
152         try:
153             if not user_record:
154                 (pubkey,pkey) = self.init_user_key (user)
155                 user_gid = self.auth_hierarchy.create_gid(user_urn, create_uuid(), pkey)
156                 user_gid.set_email(user['email'])
157                 user_record = RegUser(hrn = user_hrn, gid = user_gid, 
158                                          pointer = user['id'], 
159                                          authority = get_authority(user_hrn),
160                                          email = user['email'])
161                 if pubkey: 
162                     user_record.reg_keys=[RegKey(pubkey)]
163                 else:
164                     self.logger.warning("No key found for user %s" % user_record)
165                 user_record.just_created()
166                 dbsession.add (user_record)
167                 dbsession.commit()
168                 self.logger.info("Importer: imported user: %s" % user_record)
169                 self.remember_record ( user_record )
170             else:
171                 # update the record ?
172                 # if user's primary key has changed then we need to update the 
173                 # users gid by forcing an update here
174                 sfa_keys = user_record.reg_keys
175                 def key_in_list (key,sfa_keys):
176                     for reg_key in sfa_keys:
177                         if reg_key.key==key: return True
178                     return False
179                 # is there a new key ? XXX understand ?
180                 new_keys=False
181                 for key in user['keys']:
182                     if not key_in_list (key,sfa_keys):
183                         new_keys = True
184                 if new_keys:
185                     (pubkey,pkey) = init_user_key (user)
186                     user_gid = self.auth_hierarchy.create_gid(user_urn, create_uuid(), pkey)
187                     if not pubkey:
188                         user_record.reg_keys=[]
189                     else:
190                         user_record.reg_keys=[ RegKey (pubkey)]
191                     self.logger.info("Importer: updated user: %s" % user_record)
192             user_record.email = user['email']
193             dbsession.commit()
194             user_record.stale=False
195         except:
196             self.logger.log_exc("Importer: failed to import user %s %s"%(user['id'],user['email']))
197
198         return user_hrn
199
200     def import_slice(self, slice, parent_auth_hrn):
201         """
202         @return HRN of the newly created slice
203         """
204         slice_hrn = self.get_slice_naming(slice, parent_auth_hrn)
205         slice_urn = hrn_to_urn(slice_hrn, 'slice')
206
207         slice_record = self.locate_by_type_hrn ('slice', slice_hrn)
208         if not slice_record:
209             try:
210                 pkey = Keypair(create=True)
211                 slice_gid = self.auth_hierarchy.create_gid(slice_urn, create_uuid(), pkey)
212                 slice_record = RegSlice (hrn=slice_hrn, gid=slice_gid, 
213                                          pointer=slice['id'],
214                                          authority=get_authority(slice_hrn))
215                 slice_record.just_created()
216                 dbsession.add(slice_record)
217                 dbsession.commit()
218                 self.logger.info("Importer: imported slice: %s" % slice_record)  
219                 self.remember_record ( slice_record )
220             except:
221                 self.logger.log_exc("Importer: failed to import slice")
222         else:
223             # xxx update the record ...
224             self.logger.warning ("Slice update not yet implemented")
225             pass
226         # record current users affiliated with the slice
227         slice_record.reg_researchers = \
228               [ self.locate_by_type_pointer ('user',int(id)) for id in slice['user_ids'] ]
229         dbsession.commit()
230         slice_record.stale=False
231
232         return slice_hrn
233
234     ############################################################################
235     # Recursive import
236     #
237     def import_auth_rec(self, auth, parent=None):
238         """
239         Import authority and related objects (resources, users, slices), then
240         recurse through all subauthorities.
241
242         @param auth authority to be processed.
243         @return 1 if successful, exception otherwise
244         """
245
246         # Create entry for current authority
247         try:
248             auth_hrn = self.import_auth(auth, parent)
249
250             # Import objects related to current authority
251             if auth['resource_ids']:
252                 for resource_id in auth['resource_ids']:
253                     self.import_resource(self.resources_by_id[resource_id], auth_hrn)
254             if auth['user_ids']:
255                 for user_id in auth['user_ids']:
256                     self.import_user(self.users_by_id[user_id], auth_hrn)
257             if auth['slice_ids']:
258                 for slice_id in auth['slice_ids']:
259                     self.import_slice(self.slices_by_id[slice_id], auth_hrn)
260
261             # Recursive import of subauthorities
262             if auth['auth_ids']:
263                 for auth_id in auth['auth_ids']:
264                     self.import_auth_rec(self.authorities_by_id[auth_id], auth_hrn)
265         except Exception, e:
266             self.logger.log_exc(e)
267             pass
268
269     def locate_by_type_hrn (self, type, hrn):
270         return self.records_by_type_hrn.get ( (type, hrn), None)
271
272     ############################################################################
273     # Main processing function
274     #
275     def run (self, options):
276         config = Config ()
277         interface_hrn = config.SFA_INTERFACE_HRN
278         root_auth = config.SFA_REGISTRY_ROOT_AUTH
279         # <mytestbed> shell = NitosShell (config)
280
281         ######## retrieve all existing SFA objects
282         all_records = dbsession.query(RegRecord).all()
283
284         # create hash by (type,hrn) 
285         # we essentially use this to know if a given record is already known to SFA 
286         self.records_by_type_hrn = \
287             dict ( [ ( (record.type, record.hrn) , record ) for record in all_records ] )
288         # create hash by (type,pointer) 
289         self.records_by_type_pointer = \
290             dict ( [ ( (record.type, record.pointer) , record ) for record in all_records 
291                      if record.pointer != -1] )
292
293         # initialize record.stale to True by default, then mark stale=False on the ones that are in use
294         for record in all_records: record.stale=True
295
296         ######## Data collection
297
298         # Here we make the adaptation between the testbed API, and dictionaries with required fields
299
300         # AUTHORITIES
301         authorities = self.get_authorities()
302         self.authorities_by_id = {}
303         if authorities:
304             self.authorities_by_id = dict([(auth['id'], auth) for auth in authorities])
305
306         # USERS & KEYS
307         users = self.get_users()
308         self.users_by_id = {}
309         self.keys_by_id = {}
310         if users:
311             self.users_by_id = dict ( [ ( user['id'], user) for user in users ] )
312             self.keys_by_id = dict ( [ ( user['id'], user['keys']) for user in users ] ) 
313
314         # RESOURCES
315         resources = self.get_resources()
316         self.resources_by_id = {}
317         if resources:
318             self.resources_by_id = dict ( [ (resource['id'], resource) for resource in resources ] )
319
320         # SLICES
321         slices = self.get_slices()
322         self.slices_by_id = {}
323         if slices:
324             self.slices_by_id = dict ( [ (slice['id'], slice) for slice in slices ] )
325
326         ######## Import process
327
328         if authorities:
329             # Everybody belongs to sub-authorities, and we rely on the different
330             # subauthorities to give appropriate pointers to objects.
331             root = {
332                 'id': 0,
333                 'name': interface_hrn,
334                 'auth_ids': self.authorities_by_id.keys(),
335                 'user_ids': None,
336                 'resource_ids': None,
337                 'slice_ids': None
338             }
339         else:
340             # We create a root authority with all objects linked to it.
341             root = {
342                 'id': 0,
343                 'name': interface_hrn,
344                 'auth_ids': self.authorities_by_id.keys(),
345                 'user_ids': self.users_by_id.keys(),
346                 'resource_ids': self.resources_by_id.keys(),
347                 'slice_ids': self.slices_by_id.keys()
348             }
349
350         # Recurse through authorities and import the different objects
351         self.import_auth_rec(root)
352
353         ######## Remove stale records
354
355         # special records must be preserved
356         system_hrns = [interface_hrn, root_auth, interface_hrn + '.slicemanager']
357         for record in all_records: 
358             if record.hrn in system_hrns: 
359                 record.stale=False
360             if record.peer_authority:
361                 record.stale=False
362
363         for record in all_records:
364             try:
365                 stale = record.stale
366             except:     
367                 stale = True
368                 self.logger.warning("stale not found with %s"%record)
369             if stale:
370                 self.logger.info("Importer: deleting stale record: %s" % record)
371                 dbsession.delete(record)
372                 dbsession.commit()
373
374     ############################################################################ 
375     # Testbed specific functions
376
377     # OBJECTS
378
379     def get_authorities(self):
380         raise Exception, "Not implemented"
381
382     def get_resources(self):
383         raise Exception, "Not implemented"
384
385     def get_users(self):
386         raise Exception, "Not implemented"
387
388     def get_slices(self):
389         raise Exception, "Not implemented"
390
391     # NAMING
392     
393     def get_auth_naming(self, site, interface_hrn):
394         raise Exception, "Not implemented"
395
396     def get_resource_naming(self, site, node):
397         raise Exception, "Not implemented"
398
399     def get_user_naming(self, site, user):
400         raise Exception, "Not implemented"
401
402     def get_slice_naming(self, site, slice):
403         raise Exception, "Not implemented"
404
405 if __name__ == "__main__":
406        from sfa.util.sfalogging import logger
407        importer = Importer("mytestbed", logger)
408        importer.run(None)