changes to the DB are committed at once, rather than once at the end of the job
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import time
7
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
13
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
23
24 verbose=False
25
26 # initial version was doing only one final commit
27 # * set commit_mode to False to get that behaviour
28 # * set comit_mode to True to get everything synced at once
29 commit_mode=True
30
31 def message (to_print=None,verbose_only=False):
32     if verbose_only and not verbose:
33         return
34     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
35     if to_print:
36         print >>log, to_print
37
38 def message_verbose(to_print=None):
39     message(to_print,verbose_only=True)
40
41 class RefreshPeer(Method):
42     """
43     Fetches site, node, slice, person and key data from the specified peer
44     and caches it locally; also deletes stale entries.
45     Upon successful completion, returns a dict reporting various timers.
46     Faults otherwise.
47     """
48
49     roles = ['admin']
50
51     accepts = [
52         Auth(),
53         Mixed(Peer.fields['peer_id'],
54               Peer.fields['peername']),
55         ]
56
57     returns = Parameter(int, "1 if successful")
58
59     def call(self, auth, peer_id_or_peername):
60         # Get peer
61         peers = Peers(self.api, [peer_id_or_peername])
62         if not peers:
63             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
64         peer = peers[0]
65         peer_id = peer['peer_id']
66
67         # Connect to peer API
68         peer.connect()
69
70         timers = {}
71
72         # Get peer data
73         start = time.time()
74         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
75         message('Issuing GetPeerData')
76         peer_tables = peer.GetPeerData()
77         timers['transport'] = time.time() - start - peer_tables['db_time']
78         timers['peer_db'] = peer_tables['db_time']
79         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
80
81         def sync(objects, peer_objects, classobj):
82             """
83             Synchronizes two dictionaries of objects. objects should
84             be a dictionary of local objects keyed on their foreign
85             identifiers. peer_objects should be a dictionary of
86             foreign objects keyed on their local (i.e., foreign to us)
87             identifiers. Returns a final dictionary of local objects
88             keyed on their foreign identifiers.
89             """
90
91             classname=classobj(self.api).__class__.__name__
92             message_verbose('Entering sync on %s'%classname)
93
94             synced = {}
95
96             # Delete stale objects
97             for peer_object_id, object in objects.iteritems():
98                 if peer_object_id not in peer_objects:
99                     object.delete(commit = commit_mode)
100                     message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
101
102             total = len(peer_objects)
103             count=1
104             # set this to something realistic to trace down a given object(s)
105             trace_type="Node"
106             trace_ids=[]
107             def trace (message):
108                 if classname == trace_type and peer_object_id in trace_ids:
109                     message_verbose('TRACE>>'+message)
110                 
111             # Add/update new/existing objects
112             for peer_object_id, peer_object in peer_objects.iteritems():
113                 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
114                 count += 1
115                 if classname == 'Node':
116                     message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
117                 elif classname == "Slice":
118                     message_verbose ('DBG>> slicename=%s'%peer_object['name'])
119                 if peer_object_id in objects:
120                     # Update existing object
121                     object = objects[peer_object_id]
122
123                     # Replace foreign identifier with existing local
124                     # identifier temporarily for the purposes of
125                     # comparison.
126                     peer_object[object.primary_key] = object[object.primary_key]
127
128                     # Must use __eq__() instead of == since
129                     # peer_object may be a raw dict instead of a Peer
130                     # object.
131                     trace ("in objects : comparing")
132                     if not object.__eq__(peer_object):
133                         # Only update intrinsic fields
134                         trace ("updating")
135                         object.update(object.db_fields(peer_object))
136                         trace ("updated")
137                         sync = True
138                         dbg = "changed"
139                     else:
140                         trace ("intact")
141                         sync = False
142                         dbg = None
143
144                     # Restore foreign identifier
145                     peer_object[object.primary_key] = peer_object_id
146                 else:
147                     trace ("not in objects -- creating")
148                     # Add new object
149                     object = classobj(self.api, peer_object)
150                     trace ("created")
151                     # Replace foreign identifier with new local identifier
152                     del object[object.primary_key]
153                     trace ("forced clean id")
154                     sync = True
155                     dbg = "added"
156
157                 if sync:
158                     message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
159                     try:
160                         object.sync(commit = commit_mode)
161                     except PLCInvalidArgument, err:
162                         # Skip if validation fails
163                         # XXX Log an event instead of printing to logfile
164                         message("Warning: %s Skipping invalid %s %r : %r"%(\
165                                 peer['peername'], classname, peer_object, err))
166                         continue
167
168                 synced[peer_object_id] = object
169
170                 if dbg:
171                     message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
172
173             message_verbose("Exiting sync on %s"%classname)
174
175             return synced
176
177         #
178         # Synchronize foreign sites
179         #
180
181         start = time.time()
182
183         message('Dealing with Sites')
184
185         # Compare only the columns returned by the GetPeerData() call
186         if peer_tables['Sites']:
187             columns = peer_tables['Sites'][0].keys()
188         else:
189             columns = None
190
191         # Keyed on foreign site_id
192         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
193         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
194
195         # Synchronize new set (still keyed on foreign site_id)
196         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
197
198         for peer_site_id, site in peer_sites.iteritems():
199             # Bind any newly cached sites to peer
200             if peer_site_id not in old_peer_sites:
201                 peer.add_site(site, peer_site_id, commit = commit_mode)
202                 site['peer_id'] = peer_id
203                 site['peer_site_id'] = peer_site_id
204
205         timers['site'] = time.time() - start
206
207         #
208         # XXX Synchronize foreign key types
209         #
210
211         message('Dealing with Keys')
212
213         key_types = KeyTypes(self.api).dict()
214
215         #
216         # Synchronize foreign keys
217         #
218
219         start = time.time()
220
221         # Compare only the columns returned by the GetPeerData() call
222         if peer_tables['Keys']:
223             columns = peer_tables['Keys'][0].keys()
224         else:
225             columns = None
226
227         # Keyed on foreign key_id
228         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
229         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
230
231         # Fix up key_type references
232         for peer_key_id, key in keys_at_peer.items():
233             if key['key_type'] not in key_types:
234                 # XXX Log an event instead of printing to logfile
235                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
236                 del keys_at_peer[peer_key_id]
237                 continue
238
239         # Synchronize new set (still keyed on foreign key_id)
240         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
241         for peer_key_id, key in peer_keys.iteritems():
242             # Bind any newly cached keys to peer
243             if peer_key_id not in old_peer_keys:
244                 peer.add_key(key, peer_key_id, commit = commit_mode)
245                 key['peer_id'] = peer_id
246                 key['peer_key_id'] = peer_key_id
247
248         timers['keys'] = time.time() - start
249
250         #
251         # Synchronize foreign users
252         #
253
254         start = time.time()
255
256         message('Dealing with Persons')
257
258         # Compare only the columns returned by the GetPeerData() call
259         if peer_tables['Persons']:
260             columns = peer_tables['Persons'][0].keys()
261         else:
262             columns = None
263
264         # Keyed on foreign person_id
265         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
266
267         # artificially attach the persons returned by GetPeerData to the new peer 
268         # this is because validate_email needs peer_id to be correct when checking for duplicates 
269         for person in peer_tables['Persons']: 
270             person['peer_id']=peer_id
271         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
272                                 for peer_person in peer_tables['Persons']])
273
274         # XXX Do we care about membership in foreign site(s)?
275
276         # Synchronize new set (still keyed on foreign person_id)
277         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
278
279         # transcoder : retrieve a local key_id from a peer_key_id
280         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
281                                   for peer_key_id,key in peer_keys.iteritems()])
282
283         for peer_person_id, person in peer_persons.iteritems():
284             # Bind any newly cached users to peer
285             if peer_person_id not in old_peer_persons:
286                 peer.add_person(person, peer_person_id, commit = commit_mode)
287                 person['peer_id'] = peer_id
288                 person['peer_person_id'] = peer_person_id
289                 person['key_ids'] = []
290
291             # User as viewed by peer
292             peer_person = persons_at_peer[peer_person_id]
293             
294             # Foreign keys currently belonging to the user
295             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
296                                   if key_transcoder[key_id] in peer_keys]
297
298             # Foreign keys that should belong to the user
299             # this is basically peer_person['key_ids'], we just check it makes sense 
300             # (e.g. we might have failed importing it)
301             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
302
303             # Remove stale keys from user
304             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
305                 person.remove_key(peer_keys[key_id], commit = commit_mode)
306                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
307
308             # Add new keys to user
309             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
310                 person.add_key(peer_keys[key_id], commit = commit_mode)
311                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
312
313         timers['persons'] = time.time() - start
314
315         #
316         # XXX Synchronize foreign boot states
317         #
318
319         boot_states = BootStates(self.api).dict()
320
321         #
322         # Synchronize foreign nodes
323         #
324
325         start = time.time()
326
327         message('Dealing with Nodes (1)')
328
329         # Compare only the columns returned by the GetPeerData() call
330         if peer_tables['Nodes']:
331             columns = peer_tables['Nodes'][0].keys()
332         else:
333             columns = None
334
335         # Keyed on foreign node_id
336         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
337         nodes_at_peer = dict([(node['node_id'], node) \
338                               for node in peer_tables['Nodes']])
339
340         # Fix up site_id and boot_states references
341         for peer_node_id, node in nodes_at_peer.items():
342             errors = []
343             if node['site_id'] not in peer_sites:
344                 errors.append("invalid site %d" % node['site_id'])
345             if node['boot_state'] not in boot_states:
346                 errors.append("invalid boot state %s" % node['boot_state'])
347             if errors:
348                 # XXX Log an event instead of printing to logfile
349                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
350                              + ", ".join(errors))
351                 del nodes_at_peer[peer_node_id]
352                 continue
353             else:
354                 node['site_id'] = peer_sites[node['site_id']]['site_id']
355
356         # Synchronize new set
357         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
358
359         for peer_node_id, node in peer_nodes.iteritems():
360             # Bind any newly cached foreign nodes to peer
361             if peer_node_id not in old_peer_nodes:
362                 peer.add_node(node, peer_node_id, commit = commit_mode)
363                 node['peer_id'] = peer_id
364                 node['peer_node_id'] = peer_node_id
365
366         timers['nodes'] = time.time() - start
367
368         #
369         # Synchronize local nodes
370         #
371
372         start = time.time()
373         message('Dealing with Nodes (2)')
374
375         # Keyed on local node_id
376         local_nodes = Nodes(self.api).dict()
377
378         for node in peer_tables['PeerNodes']:
379             # Foreign identifier for our node as maintained by peer
380             peer_node_id = node['node_id']
381             # Local identifier for our node as cached by peer
382             node_id = node['peer_node_id']
383             if node_id in local_nodes:
384                 # Still a valid local node, add it to the synchronized
385                 # set of local node objects keyed on foreign node_id.
386                 peer_nodes[peer_node_id] = local_nodes[node_id]
387
388         timers['local_nodes'] = time.time() - start
389
390         #
391         # XXX Synchronize foreign slice instantiation states
392         #
393
394         slice_instantiations = SliceInstantiations(self.api).dict()
395
396         #
397         # Synchronize foreign slices
398         #
399
400         start = time.time()
401
402         message('Dealing with Slices (1)')
403
404         # Compare only the columns returned by the GetPeerData() call
405         if peer_tables['Slices']:
406             columns = peer_tables['Slices'][0].keys()
407         else:
408             columns = None
409
410         # Keyed on foreign slice_id
411         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
412         slices_at_peer = dict([(slice['slice_id'], slice) \
413                                for slice in peer_tables['Slices']])
414
415         # Fix up site_id, instantiation, and creator_person_id references
416         for peer_slice_id, slice in slices_at_peer.items():
417             errors = []
418             if slice['site_id'] not in peer_sites:
419                 errors.append("invalid site %d" % slice['site_id'])
420             if slice['instantiation'] not in slice_instantiations:
421                 errors.append("invalid instantiation %s" % slice['instantiation'])
422             if slice['creator_person_id'] not in peer_persons:
423                 # Just NULL it out
424                 slice['creator_person_id'] = None
425             else:
426                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
427             if errors:
428                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
429                             + ", ".join(errors))
430                 del slices_at_peer[peer_slice_id]
431                 continue
432             else:
433                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
434
435         # Synchronize new set
436         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
437
438         message('Dealing with Slices (2)')
439         # transcoder : retrieve a local node_id from a peer_node_id
440         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
441                                    for peer_node_id,node in peer_nodes.iteritems()])
442         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
443                                      for peer_person_id,person in peer_persons.iteritems()])
444
445         for peer_slice_id, slice in peer_slices.iteritems():
446             # Bind any newly cached foreign slices to peer
447             if peer_slice_id not in old_peer_slices:
448                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
449                 slice['peer_id'] = peer_id
450                 slice['peer_slice_id'] = peer_slice_id
451                 slice['node_ids'] = []
452                 slice['person_ids'] = []
453
454             # Slice as viewed by peer
455             peer_slice = slices_at_peer[peer_slice_id]
456
457             # Nodes that are currently part of the slice
458             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
459                                    if node_transcoder[node_id] in peer_nodes]
460
461             # Nodes that should be part of the slice
462             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
463
464             # Remove stale nodes from slice
465             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
466                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
467                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
468
469             # Add new nodes to slice
470             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
471                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
472                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
473
474             # N.B.: Local nodes that may have been added to the slice
475             # by hand, are removed. In other words, don't do this.
476
477             # Foreign users that are currently part of the slice
478             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
479             #                if person_transcoder[person_id] in peer_persons]
480             # An issue occurred with a user who registered on both sites (same email)
481             # So the remote person could not get cached locally
482             # The one-line map/filter style is nicer but ineffective here
483             old_slice_person_ids = []
484             for person_id in slice['person_ids']:
485                 if not person_transcoder.has_key(person_id):
486                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
487                 elif person_transcoder[person_id] not in peer_persons:
488                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
489                 else:
490                     old_slice_person_ids += [person_transcoder[person_id]]
491
492             # Foreign users that should be part of the slice
493             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
494
495             # Remove stale users from slice
496             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
497                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
498                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
499
500             # Add new users to slice
501             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
502                 slice.add_person(peer_persons[person_id], commit = commit_mode)
503                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
504
505             # N.B.: Local users that may have been added to the slice
506             # by hand, are not touched.
507
508         timers['slices'] = time.time() - start
509
510         # Update peer itself and commit
511         peer.sync(commit = True)
512
513         return timers