2 # Thierry Parmentelat - INRIA
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
26 # initial version was doing only one final commit
27 # * set commit_mode to False to get that behaviour
28 # * set commit_mode to True to get everything synced at once
31 def message (to_print=None,verbose_only=False):
# Logging helper: writes a timestamped line to PLC.Debug.log.
# When verbose_only is True the message is suppressed unless the
# module-level 'verbose' flag (not visible in this listing) is set.
32 if verbose_only and not verbose:
# NOTE(review): the statement guarded by the 'if' above (presumably an
# early return, original line 33) is missing from this numbered listing,
# as is the line that prints 'to_print' after the timestamp below.
34 print >> log, time.strftime("%m-%d-%H-%M-%S:"),
38 def message_verbose(to_print=None):
# Convenience wrapper around message(): emit 'to_print' only when
# verbose mode is enabled (verbose_only=True).
39 message(to_print,verbose_only=True)
41 class RefreshPeer(Method):
# NOTE(review): this source is a line-numbered listing with gaps -- many
# original lines (flow control, variable initialisations, string/docstring
# delimiters) are missing between the numbered lines below.  Comments added
# here annotate only what the visible lines establish; anything else is
# hedged as an assumption to confirm against the full file.
43 Fetches site, node, slice, person and key data from the specified peer
44 and caches it locally; also deletes stale entries.
45 Upon successful completion, returns a dict reporting various timers.
53 Mixed(Peer.fields['peer_id'],
54 Peer.fields['peername']),
# NOTE(review): the docstring above says a dict of timers is returned,
# while 'returns' below advertises an int ("1 if successful") -- one of
# the two is stale; confirm against callers.
57 returns = Parameter(int, "1 if successful")
59 def call(self, auth, peer_id_or_peername):
# Resolve the peer by numeric id or by name; a failed lookup raises
# PLCInvalidArgument (Python 2 'raise X, "msg"' syntax).
61 peers = Peers(self.api, [peer_id_or_peername])
63 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
65 peer_id = peer['peer_id']
74 message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
# Single bulk RPC: fetch every table from the remote peer at once.
# 'db_time' is reported by the peer itself, so transport time is total
# elapsed time minus the peer's own database time.
75 message('Issuing GetPeerData')
76 peer_tables = peer.GetPeerData()
77 timers['transport'] = time.time() - start - peer_tables['db_time']
78 timers['peer_db'] = peer_tables['db_time']
79 message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
# Generic reconciliation helper shared by the Sites/Keys/Persons/Nodes/
# Slices sections below.
81 def sync(objects, peer_objects, classobj):
83 Synchronizes two dictionaries of objects. objects should
84 be a dictionary of local objects keyed on their foreign
85 identifiers. peer_objects should be a dictionary of
86 foreign objects keyed on their local (i.e., foreign to us)
87 identifiers. Returns a final dictionary of local objects
88 keyed on their foreign identifiers.
91 classname=classobj(self.api).__class__.__name__
92 message_verbose('Entering sync on %s'%classname)
96 # Delete stale objects
# Anything cached locally for this peer but no longer present at the
# peer is deleted.
97 for peer_object_id, object in objects.iteritems():
98 if peer_object_id not in peer_objects:
99 object.delete(commit = commit_mode)
100 message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
102 total = len(peer_objects)
104 # set this to something realistic to trace down a given object(s)
# NOTE(review): the two lines below look like the body of a local
# trace(message) helper (called further down); its 'def' line and the
# trace_type/trace_ids initialisations are missing from this listing.
108 if classname == trace_type and peer_object_id in trace_ids:
109 message_verbose('TRACE>>'+message)
111 # Add/update new/existing objects
112 for peer_object_id, peer_object in peer_objects.iteritems():
113 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
115 if classname == 'Node':
116 message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
117 elif classname == "Slice":
118 message_verbose ('DBG>> slicename=%s'%peer_object['name'])
119 if peer_object_id in objects:
120 # Update existing object
121 object = objects[peer_object_id]
123 # Replace foreign identifier with existing local
124 # identifier temporarily for the purposes of
126 peer_object[object.primary_key] = object[object.primary_key]
128 # Must use __eq__() instead of == since
129 # peer_object may be a raw dict instead of a Peer
131 trace ("in objects : comparing")
132 if not object.__eq__(peer_object):
133 # Only update intrinsic fields
135 object.update(object.db_fields(peer_object))
144 # Restore foreign identifier
145 peer_object[object.primary_key] = peer_object_id
147 trace ("not in objects -- creating")
# New at the peer: build a fresh local object from the peer's data,
# then drop its (foreign) primary key so a local id gets assigned.
149 object = classobj(self.api, peer_object)
151 # Replace foreign identifier with new local identifier
152 del object[object.primary_key]
153 trace ("forced clean id")
158 message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
160 object.sync(commit = commit_mode)
# Validation failures are logged and skipped rather than aborting the
# whole refresh (Python 2 'except X, err' syntax; the opening 'try:'
# line is missing from this listing).
161 except PLCInvalidArgument, err:
162 # Skip if validation fails
163 # XXX Log an event instead of printing to logfile
164 message("Warning: %s Skipping invalid %s %r : %r"%(\
165 peer['peername'], classname, peer_object, err))
168 synced[peer_object_id] = object
171 message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
# NOTE(review): the 'return synced' that presumably ends sync() is in a
# gap of this listing -- confirm against the full file.
173 message_verbose("Exiting sync on %s"%classname)
178 # Synchronize foreign sites
183 message('Dealing with Sites')
185 # Compare only the columns returned by the GetPeerData() call
186 if peer_tables['Sites']:
187 columns = peer_tables['Sites'][0].keys()
191 # Keyed on foreign site_id
192 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
193 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
195 # Synchronize new set (still keyed on foreign site_id)
196 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
198 for peer_site_id, site in peer_sites.iteritems():
199 # Bind any newly cached sites to peer
200 if peer_site_id not in old_peer_sites:
201 peer.add_site(site, peer_site_id, commit = commit_mode)
202 site['peer_id'] = peer_id
203 site['peer_site_id'] = peer_site_id
205 timers['site'] = time.time() - start
208 # XXX Synchronize foreign key types
211 message('Dealing with Keys')
# Key types themselves are not synchronized (see XXX above); remote keys
# whose type is unknown locally are dropped below.
213 key_types = KeyTypes(self.api).dict()
216 # Synchronize foreign keys
221 # Compare only the columns returned by the GetPeerData() call
222 if peer_tables['Keys']:
223 columns = peer_tables['Keys'][0].keys()
227 # Keyed on foreign key_id
228 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
229 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
231 # Fix up key_type references
# .items() (a copy in Python 2) is used here because entries are deleted
# from keys_at_peer while looping over it.
232 for peer_key_id, key in keys_at_peer.items():
233 if key['key_type'] not in key_types:
234 # XXX Log an event instead of printing to logfile
235 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
236 del keys_at_peer[peer_key_id]
239 # Synchronize new set (still keyed on foreign key_id)
240 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
241 for peer_key_id, key in peer_keys.iteritems():
242 # Bind any newly cached keys to peer
243 if peer_key_id not in old_peer_keys:
244 peer.add_key(key, peer_key_id, commit = commit_mode)
245 key['peer_id'] = peer_id
246 key['peer_key_id'] = peer_key_id
248 timers['keys'] = time.time() - start
251 # Synchronize foreign users
256 message('Dealing with Persons')
258 # Compare only the columns returned by the GetPeerData() call
259 if peer_tables['Persons']:
260 columns = peer_tables['Persons'][0].keys()
264 # Keyed on foreign person_id
265 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
267 # artificially attach the persons returned by GetPeerData to the new peer
268 # this is because validate_email needs peer_id to be correct when checking for duplicates
269 for person in peer_tables['Persons']:
270 person['peer_id']=peer_id
271 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
272 for peer_person in peer_tables['Persons']])
274 # XXX Do we care about membership in foreign site(s)?
276 # Synchronize new set (still keyed on foreign person_id)
277 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
279 # transcoder : retrieve a local key_id from a peer_key_id
280 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
281 for peer_key_id,key in peer_keys.iteritems()])
283 for peer_person_id, person in peer_persons.iteritems():
284 # Bind any newly cached users to peer
285 if peer_person_id not in old_peer_persons:
286 peer.add_person(person, peer_person_id, commit = commit_mode)
287 person['peer_id'] = peer_id
288 person['peer_person_id'] = peer_person_id
289 person['key_ids'] = []
291 # User as viewed by peer
292 peer_person = persons_at_peer[peer_person_id]
294 # Foreign keys currently belonging to the user
295 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
296 if key_transcoder[key_id] in peer_keys]
298 # Foreign keys that should belong to the user
299 # this is basically peer_person['key_ids'], we just check it makes sense
300 # (e.g. we might have failed importing it)
301 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
303 # Remove stale keys from user
304 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
305 person.remove_key(peer_keys[key_id], commit = commit_mode)
306 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
308 # Add new keys to user
309 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
310 person.add_key(peer_keys[key_id], commit = commit_mode)
311 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
313 timers['persons'] = time.time() - start
316 # XXX Synchronize foreign boot states
319 boot_states = BootStates(self.api).dict()
322 # Synchronize foreign nodes
327 message('Dealing with Nodes (1)')
329 # Compare only the columns returned by the GetPeerData() call
330 if peer_tables['Nodes']:
331 columns = peer_tables['Nodes'][0].keys()
335 # Keyed on foreign node_id
336 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
337 nodes_at_peer = dict([(node['node_id'], node) \
338 for node in peer_tables['Nodes']])
340 # Fix up site_id and boot_states references
341 for peer_node_id, node in nodes_at_peer.items():
343 if node['site_id'] not in peer_sites:
344 errors.append("invalid site %d" % node['site_id'])
345 if node['boot_state'] not in boot_states:
346 errors.append("invalid boot state %s" % node['boot_state'])
348 # XXX Log an event instead of printing to logfile
349 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
351 del nodes_at_peer[peer_node_id]
# Translate the peer's site_id into the corresponding local site_id.
354 node['site_id'] = peer_sites[node['site_id']]['site_id']
356 # Synchronize new set
357 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
359 for peer_node_id, node in peer_nodes.iteritems():
360 # Bind any newly cached foreign nodes to peer
361 if peer_node_id not in old_peer_nodes:
362 peer.add_node(node, peer_node_id, commit = commit_mode)
363 node['peer_id'] = peer_id
364 node['peer_node_id'] = peer_node_id
366 timers['nodes'] = time.time() - start
369 # Synchronize local nodes
373 message('Dealing with Nodes (2)')
375 # Keyed on local node_id
376 local_nodes = Nodes(self.api).dict()
# 'PeerNodes' describes OUR nodes as cached by the peer; fold the
# still-valid ones into peer_nodes so the slice section below can
# reference them by the peer's node_id.
378 for node in peer_tables['PeerNodes']:
379 # Foreign identifier for our node as maintained by peer
380 peer_node_id = node['node_id']
381 # Local identifier for our node as cached by peer
382 node_id = node['peer_node_id']
383 if node_id in local_nodes:
384 # Still a valid local node, add it to the synchronized
385 # set of local node objects keyed on foreign node_id.
386 peer_nodes[peer_node_id] = local_nodes[node_id]
388 timers['local_nodes'] = time.time() - start
391 # XXX Synchronize foreign slice instantiation states
394 slice_instantiations = SliceInstantiations(self.api).dict()
397 # Synchronize foreign slices
402 message('Dealing with Slices (1)')
404 # Compare only the columns returned by the GetPeerData() call
405 if peer_tables['Slices']:
406 columns = peer_tables['Slices'][0].keys()
410 # Keyed on foreign slice_id
411 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
412 slices_at_peer = dict([(slice['slice_id'], slice) \
413 for slice in peer_tables['Slices']])
415 # Fix up site_id, instantiation, and creator_person_id references
416 for peer_slice_id, slice in slices_at_peer.items():
418 if slice['site_id'] not in peer_sites:
419 errors.append("invalid site %d" % slice['site_id'])
420 if slice['instantiation'] not in slice_instantiations:
421 errors.append("invalid instantiation %s" % slice['instantiation'])
422 if slice['creator_person_id'] not in peer_persons:
# An unknown creator is tolerated: it is nulled out rather than
# appended to 'errors' (contrast with site/instantiation above).
424 slice['creator_person_id'] = None
426 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
428 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
430 del slices_at_peer[peer_slice_id]
433 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
435 # Synchronize new set
436 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
438 message('Dealing with Slices (2)')
439 # transcoder : retrieve a local node_id from a peer_node_id
440 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
441 for peer_node_id,node in peer_nodes.iteritems()])
442 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
443 for peer_person_id,person in peer_persons.iteritems()])
445 for peer_slice_id, slice in peer_slices.iteritems():
446 # Bind any newly cached foreign slices to peer
447 if peer_slice_id not in old_peer_slices:
448 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
449 slice['peer_id'] = peer_id
450 slice['peer_slice_id'] = peer_slice_id
451 slice['node_ids'] = []
452 slice['person_ids'] = []
454 # Slice as viewed by peer
455 peer_slice = slices_at_peer[peer_slice_id]
457 # Nodes that are currently part of the slice
458 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
459 if node_transcoder[node_id] in peer_nodes]
461 # Nodes that should be part of the slice
462 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
464 # Remove stale nodes from slice
465 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
466 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
467 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
469 # Add new nodes to slice
470 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
471 slice.add_node(peer_nodes[node_id], commit = commit_mode)
472 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
474 # N.B.: Local nodes that may have been added to the slice
475 # by hand, are removed. In other words, don't do this.
477 # Foreign users that are currently part of the slice
478 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
479 # if person_transcoder[person_id] in peer_persons]
480 # An issue occurred with a user who registered on both sites (same email)
481 # So the remote person could not get cached locally
482 # The one-line map/filter style is nicer but ineffective here
483 old_slice_person_ids = []
484 for person_id in slice['person_ids']:
485 if not person_transcoder.has_key(person_id):
486 message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
487 elif person_transcoder[person_id] not in peer_persons:
488 message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
490 old_slice_person_ids += [person_transcoder[person_id]]
492 # Foreign users that should be part of the slice
493 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
495 # Remove stale users from slice
496 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
497 slice.remove_person(peer_persons[person_id], commit = commit_mode)
498 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
500 # Add new users to slice
501 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
502 slice.add_person(peer_persons[person_id], commit = commit_mode)
503 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
505 # N.B.: Local users that may have been added to the slice
506 # by hand, are not touched.
508 timers['slices'] = time.time() - start
510 # Update peer itself and commit
# Final commit: the peer record itself is always synced with
# commit=True, regardless of commit_mode.
# NOTE(review): the method almost certainly continues past this listing
# (e.g. returning the 'timers' dict per the class docstring) -- confirm
# against the full file.
511 peer.sync(commit = True)