guard RefreshPeer():sync() not to insert the same record twice.
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import time
7
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
13
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
23
24 verbose=False
25
26 # initial version was doing only one final commit
27 # * set commit_mode to False to get that behaviour
28 # * set comit_mode to True to get everything synced at once
29 commit_mode=True
30
31 def message (to_print=None,verbose_only=False):
32     if verbose_only and not verbose:
33         return
34     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
35     if to_print:
36         print >>log, to_print
37
38 def message_verbose(to_print=None):
39     message(to_print,verbose_only=True)
40
41 class RefreshPeer(Method):
42     """
43     Fetches site, node, slice, person and key data from the specified peer
44     and caches it locally; also deletes stale entries.
45     Upon successful completion, returns a dict reporting various timers.
46     Faults otherwise.
47     """
48
49     roles = ['admin']
50
51     accepts = [
52         Auth(),
53         Mixed(Peer.fields['peer_id'],
54               Peer.fields['peername']),
55         ]
56
57     returns = Parameter(int, "1 if successful")
58
59     def call(self, auth, peer_id_or_peername):
60         # Get peer
61         peers = Peers(self.api, [peer_id_or_peername])
62         if not peers:
63             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
64         peer = peers[0]
65         peer_id = peer['peer_id']
66
67         # Connect to peer API
68         peer.connect()
69
70         timers = {}
71
72         # Get peer data
73         start = time.time()
74         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
75         message('Issuing GetPeerData')
76         peer_tables = peer.GetPeerData()
77         # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
78         boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
79                             'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
80         for node in peer_tables['Nodes']:
81             for key in ['nodenetwork_ids','dummybox_id']:
82                 if key in node:
83                     del node[key]
84             if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
85         for slice in peer_tables['Slices']:
86             for key in ['slice_attribute_ids']:
87                 if key in slice:
88                     del slice[key]
89         timers['transport'] = time.time() - start - peer_tables['db_time']
90         timers['peer_db'] = peer_tables['db_time']
91         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
92
93         def sync(objects, peer_objects, classobj):
94             """
95             Synchronizes two dictionaries of objects. objects should
96             be a dictionary of local objects keyed on their foreign
97             identifiers. peer_objects should be a dictionary of
98             foreign objects keyed on their local (i.e., foreign to us)
99             identifiers. Returns a final dictionary of local objects
100             keyed on their foreign identifiers.
101             """
102
103             classname=classobj(self.api).__class__.__name__
104             message_verbose('Entering sync on %s'%classname)
105
106             synced = {}
107
108             # Delete stale objects
109             for peer_object_id, object in objects.iteritems():
110                 if peer_object_id not in peer_objects:
111                     object.delete(commit = commit_mode)
112                     message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
113
114             total = len(peer_objects)
115             count=1
116             # set this to something realistic to trace down a given object(s)
117             trace_type="Node"
118             trace_ids=[]
119             def trace (message):
120                 if classname == trace_type and peer_object_id in trace_ids:
121                     message_verbose('TRACE>>'+message)
122                 
123             # Add/update new/existing objects
124             for peer_object_id, peer_object in peer_objects.iteritems():
125                 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
126                 count += 1
127                 if peer_object_id in synced:
128                     message("Warning: %s Skipping already added %s: %r"%(
129                             peer['peername'], classname, peer_object))
130                     continue
131                 if classname == 'Node':
132                     message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
133                 elif classname == "Slice":
134                     message_verbose ('DBG>> slicename=%s'%peer_object['name'])
135                 if peer_object_id in objects:
136                     # Update existing object
137                     object = objects[peer_object_id]
138
139                     # Replace foreign identifier with existing local
140                     # identifier temporarily for the purposes of
141                     # comparison.
142                     peer_object[object.primary_key] = object[object.primary_key]
143
144                     # Must use __eq__() instead of == since
145                     # peer_object may be a raw dict instead of a Peer
146                     # object.
147                     trace ("in objects : comparing")
148                     if not object.__eq__(peer_object):
149                         # Only update intrinsic fields
150                         trace ("updating")
151                         object.update(object.db_fields(peer_object))
152                         trace ("updated")
153                         sync = True
154                         dbg = "changed"
155                     else:
156                         trace ("intact")
157                         sync = False
158                         dbg = None
159
160                     # Restore foreign identifier
161                     peer_object[object.primary_key] = peer_object_id
162                 else:
163                     trace ("not in objects -- creating")
164                     # Add new object
165                     object = classobj(self.api, peer_object)
166                     trace ("created")
167                     # Replace foreign identifier with new local identifier
168                     del object[object.primary_key]
169                     trace ("forced clean id")
170                     sync = True
171                     dbg = "added"
172
173                 if sync:
174                     message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
175                     try:
176                         object.sync(commit = commit_mode)
177                     except PLCInvalidArgument, err:
178                         # Skip if validation fails
179                         # XXX Log an event instead of printing to logfile
180                         message("Warning: %s Skipping invalid %s %r : %r"%(\
181                                 peer['peername'], classname, peer_object, err))
182                         continue
183
184                 synced[peer_object_id] = object
185
186                 if dbg:
187                     message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
188
189             message_verbose("Exiting sync on %s"%classname)
190
191             return synced
192
193         #
194         # Synchronize foreign sites
195         #
196
197         start = time.time()
198
199         message('Dealing with Sites')
200
201         # Compare only the columns returned by the GetPeerData() call
202         if peer_tables['Sites']:
203             columns = peer_tables['Sites'][0].keys()
204         else:
205             columns = None
206
207         # Keyed on foreign site_id
208         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
209         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
210
211         # Synchronize new set (still keyed on foreign site_id)
212         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
213
214         for peer_site_id, site in peer_sites.iteritems():
215             # Bind any newly cached sites to peer
216             if peer_site_id not in old_peer_sites:
217                 peer.add_site(site, peer_site_id, commit = commit_mode)
218                 site['peer_id'] = peer_id
219                 site['peer_site_id'] = peer_site_id
220
221         timers['site'] = time.time() - start
222
223         #
224         # XXX Synchronize foreign key types
225         #
226
227         message('Dealing with Keys')
228
229         key_types = KeyTypes(self.api).dict()
230
231         #
232         # Synchronize foreign keys
233         #
234
235         start = time.time()
236
237         # Compare only the columns returned by the GetPeerData() call
238         if peer_tables['Keys']:
239             columns = peer_tables['Keys'][0].keys()
240         else:
241             columns = None
242
243         # Keyed on foreign key_id
244         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
245         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
246
247         # Fix up key_type references
248         for peer_key_id, key in keys_at_peer.items():
249             if key['key_type'] not in key_types:
250                 # XXX Log an event instead of printing to logfile
251                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
252                 del keys_at_peer[peer_key_id]
253                 continue
254
255         # Synchronize new set (still keyed on foreign key_id)
256         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
257         for peer_key_id, key in peer_keys.iteritems():
258             # Bind any newly cached keys to peer
259             if peer_key_id not in old_peer_keys:
260                 peer.add_key(key, peer_key_id, commit = commit_mode)
261                 key['peer_id'] = peer_id
262                 key['peer_key_id'] = peer_key_id
263
264         timers['keys'] = time.time() - start
265
266         #
267         # Synchronize foreign users
268         #
269
270         start = time.time()
271
272         message('Dealing with Persons')
273
274         # Compare only the columns returned by the GetPeerData() call
275         if peer_tables['Persons']:
276             columns = peer_tables['Persons'][0].keys()
277         else:
278             columns = None
279
280         # Keyed on foreign person_id
281         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
282
283         # artificially attach the persons returned by GetPeerData to the new peer 
284         # this is because validate_email needs peer_id to be correct when checking for duplicates 
285         for person in peer_tables['Persons']: 
286             person['peer_id']=peer_id
287         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
288                                 for peer_person in peer_tables['Persons']])
289
290         # XXX Do we care about membership in foreign site(s)?
291
292         # Synchronize new set (still keyed on foreign person_id)
293         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
294
295         # transcoder : retrieve a local key_id from a peer_key_id
296         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
297                                   for peer_key_id,key in peer_keys.iteritems()])
298
299         for peer_person_id, person in peer_persons.iteritems():
300             # Bind any newly cached users to peer
301             if peer_person_id not in old_peer_persons:
302                 peer.add_person(person, peer_person_id, commit = commit_mode)
303                 person['peer_id'] = peer_id
304                 person['peer_person_id'] = peer_person_id
305                 person['key_ids'] = []
306
307             # User as viewed by peer
308             peer_person = persons_at_peer[peer_person_id]
309             
310             # Foreign keys currently belonging to the user
311             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
312                                   if key_transcoder[key_id] in peer_keys]
313
314             # Foreign keys that should belong to the user
315             # this is basically peer_person['key_ids'], we just check it makes sense 
316             # (e.g. we might have failed importing it)
317             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
318
319             # Remove stale keys from user
320             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
321                 person.remove_key(peer_keys[key_id], commit = commit_mode)
322                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
323
324             # Add new keys to user
325             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
326                 person.add_key(peer_keys[key_id], commit = commit_mode)
327                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
328
329         timers['persons'] = time.time() - start
330
331         #
332         # XXX Synchronize foreign boot states
333         #
334
335         boot_states = BootStates(self.api).dict()
336
337         #
338         # Synchronize foreign nodes
339         #
340
341         start = time.time()
342
343         message('Dealing with Nodes (1)')
344
345         # Compare only the columns returned by the GetPeerData() call
346         if peer_tables['Nodes']:
347             columns = peer_tables['Nodes'][0].keys()
348         else:
349             # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
350             columns = Node.fields
351             if 'interface_ids' in columns: columns.remove('interface_ids')
352             if 'dummybox_id' in columns: columns.remove('dummybox_id')
353
354         # Keyed on foreign node_id
355         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
356         nodes_at_peer = dict([(node['node_id'], node) \
357                               for node in peer_tables['Nodes']])
358
359         # Fix up site_id and boot_states references
360         for peer_node_id, node in nodes_at_peer.items():
361             errors = []
362             if node['site_id'] not in peer_sites:
363                 errors.append("invalid site %d" % node['site_id'])
364             if node['boot_state'] not in boot_states:
365                 errors.append("invalid boot state %s" % node['boot_state'])
366             if errors:
367                 # XXX Log an event instead of printing to logfile
368                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
369                              + ", ".join(errors))
370                 del nodes_at_peer[peer_node_id]
371                 continue
372             else:
373                 node['site_id'] = peer_sites[node['site_id']]['site_id']
374
375         # Synchronize new set
376         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
377
378         for peer_node_id, node in peer_nodes.iteritems():
379             # Bind any newly cached foreign nodes to peer
380             if peer_node_id not in old_peer_nodes:
381                 peer.add_node(node, peer_node_id, commit = commit_mode)
382                 node['peer_id'] = peer_id
383                 node['peer_node_id'] = peer_node_id
384
385         timers['nodes'] = time.time() - start
386
387         #
388         # Synchronize local nodes
389         #
390
391         start = time.time()
392         message('Dealing with Nodes (2)')
393
394         # Keyed on local node_id
395         local_nodes = Nodes(self.api).dict()
396
397         for node in peer_tables['PeerNodes']:
398             # Foreign identifier for our node as maintained by peer
399             peer_node_id = node['node_id']
400             # Local identifier for our node as cached by peer
401             node_id = node['peer_node_id']
402             if node_id in local_nodes:
403                 # Still a valid local node, add it to the synchronized
404                 # set of local node objects keyed on foreign node_id.
405                 peer_nodes[peer_node_id] = local_nodes[node_id]
406
407         timers['local_nodes'] = time.time() - start
408
409         #
410         # XXX Synchronize foreign slice instantiation states
411         #
412
413         slice_instantiations = SliceInstantiations(self.api).dict()
414
415         #
416         # Synchronize foreign slices
417         #
418
419         start = time.time()
420
421         message('Dealing with Slices (1)')
422
423         # Compare only the columns returned by the GetPeerData() call
424         if peer_tables['Slices']:
425             columns = peer_tables['Slices'][0].keys()
426         else:
427             columns = None
428
429         # Keyed on foreign slice_id
430         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
431         slices_at_peer = dict([(slice['slice_id'], slice) \
432                                for slice in peer_tables['Slices']])
433
434         # Fix up site_id, instantiation, and creator_person_id references
435         for peer_slice_id, slice in slices_at_peer.items():
436             errors = []
437             if slice['site_id'] not in peer_sites:
438                 errors.append("invalid site %d" % slice['site_id'])
439             if slice['instantiation'] not in slice_instantiations:
440                 errors.append("invalid instantiation %s" % slice['instantiation'])
441             if slice['creator_person_id'] not in peer_persons:
442                 # Just NULL it out
443                 slice['creator_person_id'] = None
444             else:
445                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
446             if errors:
447                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
448                             + ", ".join(errors))
449                 del slices_at_peer[peer_slice_id]
450                 continue
451             else:
452                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
453
454         # Synchronize new set
455         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
456
457         message('Dealing with Slices (2)')
458         # transcoder : retrieve a local node_id from a peer_node_id
459         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
460                                    for peer_node_id,node in peer_nodes.iteritems()])
461         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
462                                      for peer_person_id,person in peer_persons.iteritems()])
463
464         for peer_slice_id, slice in peer_slices.iteritems():
465             # Bind any newly cached foreign slices to peer
466             if peer_slice_id not in old_peer_slices:
467                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
468                 slice['peer_id'] = peer_id
469                 slice['peer_slice_id'] = peer_slice_id
470                 slice['node_ids'] = []
471                 slice['person_ids'] = []
472
473             # Slice as viewed by peer
474             peer_slice = slices_at_peer[peer_slice_id]
475
476             # Nodes that are currently part of the slice
477             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
478                                    if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
479
480             # Nodes that should be part of the slice
481             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
482
483             # Remove stale nodes from slice
484             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
485                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
486                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
487
488             # Add new nodes to slice
489             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
490                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
491                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
492
493             # N.B.: Local nodes that may have been added to the slice
494             # by hand, are removed. In other words, don't do this.
495
496             # Foreign users that are currently part of the slice
497             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
498             #                if person_transcoder[person_id] in peer_persons]
499             # An issue occurred with a user who registered on both sites (same email)
500             # So the remote person could not get cached locally
501             # The one-line map/filter style is nicer but ineffective here
502             old_slice_person_ids = []
503             for person_id in slice['person_ids']:
504                 if not person_transcoder.has_key(person_id):
505                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
506                 elif person_transcoder[person_id] not in peer_persons:
507                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
508                 else:
509                     old_slice_person_ids += [person_transcoder[person_id]]
510
511             # Foreign users that should be part of the slice
512             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
513
514             # Remove stale users from slice
515             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
516                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
517                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
518
519             # Add new users to slice
520             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
521                 slice.add_person(peer_persons[person_id], commit = commit_mode)
522                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
523
524             # N.B.: Local users that may have been added to the slice
525             # by hand, are not touched.
526
527         timers['slices'] = time.time() - start
528
529         # Update peer itself and commit
530         peer.sync(commit = True)
531
532         return timers