2 # Thierry Parmentelat - INRIA
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
26 # initial version was doing only one final commit
27 # * set commit_mode to False to get that behaviour (a single commit at the very end)
28 # * set commit_mode to True to have each change committed as soon as it is made
def message(to_print=None, verbose_only=False):
    """Write a timestamped line to the PLC debug log.

    to_print     -- text to log after the timestamp; when None, only the
                    timestamp is written.
    verbose_only -- when true, the line is dropped unless the module-level
                    `verbose` flag is set.

    NOTE(review): the listing this was rebuilt from skips source lines 33
    and 35-36, so the early return and the emission of `to_print` are
    reconstructed from the visible guard and signature -- confirm against
    the complete file.
    """
    # Verbose-only chatter is discarded unless verbose tracing is enabled.
    if verbose_only and not verbose:
        return
    stamp = time.strftime("%m-%d-%H-%M-%S:")
    if to_print is not None:
        print >> log, stamp, to_print
    else:
        # Still terminate the line so the next entry starts on its own line.
        print >> log, stamp
def message_verbose(to_print=None):
    """Log `to_print` through message(), flagged as verbose-only output."""
    message(to_print=to_print, verbose_only=True)
# API method (subclass of Method) that refreshes the local cache of a peer's data.
# NOTE(review): the embedded line numbers of this listing have gaps (41 -> 43,
# 45 -> 53, 54 -> 57), so the docstring delimiters and the opening of the
# argument declaration are not visible here.
41 class RefreshPeer(Method):
43 Fetches site, node, slice, person and key data from the specified peer
44 and caches it locally; also deletes stale entries.
45 Upon successful completion, returns a dict reporting various timers.
# The peer may be designated either by its peer_id or by its peername
# (presumably an entry of the `accepts` list, whose first line is not shown).
53 Mixed(Peer.fields['peer_id'],
54 Peer.fields['peername']),
# NOTE(review): declared as an int here, while the description above promises
# a dict of timers -- confirm which of the two is intended.
57 returns = Parameter(int, "1 if successful")
# Method entry point: look the peer up, fetch its tables with GetPeerData(),
# and normalise the node and slice records before the per-table sync below.
# NOTE(review): the embedded line numbers jump in several places (59 -> 61 -> 63,
# 65 -> 74, 81 -> 84, 86 -> 89), so some statements of this prologue are not
# visible in this listing.
59 def call(self, auth, peer_id_or_peername):
# Look the peer up by the id or name supplied by the caller.
61 peers = Peers(self.api, [peer_id_or_peername])
# Error raised for an unknown peer; the condition guarding it is on a line
# that this listing omits.
63 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
65 peer_id = peer['peer_id']
74 message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
75 message('Issuing GetPeerData')
# The code below reads the 'Nodes', 'Slices' and 'db_time' entries of the result
# (and later 'Sites', 'Keys', 'Persons' and 'PeerNodes').
76 peer_tables = peer.GetPeerData()
77 # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
78 boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
79 'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
80 for node in peer_tables['Nodes']:
# Fields to ignore on incoming nodes; what is done with each key is on lines
# omitted from this listing (presumably they are removed from the record).
81 for key in ['nodenetwork_ids','dummybox_id']:
# Translate boot states listed in boot_state_rewrite; any other value is kept.
84 if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
85 for slice in peer_tables['Slices']:
# Same treatment for slices (the per-key action is again not shown).
86 for key in ['slice_attribute_ids']:
# Transport time = time elapsed since `start` minus the database time reported
# by the peer. `start` and `timers` are set on lines not shown here.
89 timers['transport'] = time.time() - start - peer_tables['db_time']
90 timers['peer_db'] = peer_tables['db_time']
# %d formatting shows whole seconds only in this trace line.
91 message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
# Nested helper shared by every table below: deletes local objects the peer no
# longer reports, then creates or updates one local object per peer object.
# It reads `self`, `peer` and `commit_mode` from the enclosing scopes.
# NOTE(review): this listing omits several lines of the helper (docstring
# delimiters, the `synced`/`count`/`dbg` assignments, the `trace` definition,
# the `try:`/`else:` lines and the final return), so the comments below only
# describe what the visible statements do.
93 def sync(objects, peer_objects, classobj):
95 Synchronizes two dictionaries of objects. objects should
96 be a dictionary of local objects keyed on their foreign
97 identifiers. peer_objects should be a dictionary of
98 foreign objects keyed on their local (i.e., foreign to us)
99 identifiers. Returns a final dictionary of local objects
100 keyed on their foreign identifiers.
# Class name (e.g. 'Node', 'Slice') obtained from a throwaway instance; it is
# only used in log messages and for the tracing/debug tests below.
103 classname=classobj(self.api).__class__.__name__
104 message_verbose('Entering sync on %s'%classname)
108 # Delete stale objects
109 for peer_object_id, object in objects.iteritems():
110 if peer_object_id not in peer_objects:
111 object.delete(commit = commit_mode)
112 message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
114 total = len(peer_objects)
116 # set this to something realistic to trace down a given object(s)
# Body of a tracing helper whose `def` line is not in this listing (it is
# invoked as trace(...) further down). `message` on the next line is
# presumably that helper's parameter rather than the module-level function.
120 if classname == trace_type and peer_object_id in trace_ids:
121 message_verbose('TRACE>>'+message)
123 # Add/update new/existing objects
124 for peer_object_id, peer_object in peer_objects.iteritems():
# `count` is maintained on a line not shown here.
125 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
127 if classname == 'Node':
128 message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
129 elif classname == "Slice":
130 message_verbose ('DBG>> slicename=%s'%peer_object['name'])
131 if peer_object_id in objects:
132 # Update existing object
133 object = objects[peer_object_id]
135 # Replace foreign identifier with existing local
136 # identifier temporarily for the purposes of
138 peer_object[object.primary_key] = object[object.primary_key]
140 # Must use __eq__() instead of == since
141 # peer_object may be a raw dict instead of a Peer
143 trace ("in objects : comparing")
144 if not object.__eq__(peer_object):
145 # Only update intrinsic fields
147 object.update(object.db_fields(peer_object))
156 # Restore foreign identifier
157 peer_object[object.primary_key] = peer_object_id
# Creation path for peer objects that have no local copy yet (the `else:`
# introducing it is on a line omitted from this listing).
159 trace ("not in objects -- creating")
161 object = classobj(self.api, peer_object)
163 # Replace foreign identifier with new local identifier
164 del object[object.primary_key]
165 trace ("forced clean id")
170 message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
172 object.sync(commit = commit_mode)
173 except PLCInvalidArgument, err:
174 # Skip if validation fails
175 # XXX Log an event instead of printing to logfile
176 message("Warning: %s Skipping invalid %s %r : %r"%(\
177 peer['peername'], classname, peer_object, err))
# Result dictionary entry, keyed on the foreign identifier.
180 synced[peer_object_id] = object
# `dbg` is assigned on lines not shown in this listing.
183 message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
185 message_verbose("Exiting sync on %s"%classname)
# Sites: refresh the local copies of the peer's sites and bind new ones to the peer.
190 # Synchronize foreign sites
195 message('Dealing with Sites')
197 # Compare only the columns returned by the GetPeerData() call
198 if peer_tables['Sites']:
199 columns = peer_tables['Sites'][0].keys()
# NOTE(review): the fallback for an empty 'Sites' table is on lines omitted
# from this listing (the embedded numbering jumps from 199 to 203).
203 # Keyed on foreign site_id
# Local copies of this peer's sites, indexed by the site_id they have at the peer.
204 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
# Sites as just reported by the peer, indexed by their site_id there.
205 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
207 # Synchronize new set (still keyed on foreign site_id)
208 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
210 for peer_site_id, site in peer_sites.iteritems():
211 # Bind any newly cached sites to peer
212 if peer_site_id not in old_peer_sites:
213 peer.add_site(site, peer_site_id, commit = commit_mode)
# Record the binding on the in-memory site object as well.
214 site['peer_id'] = peer_id
215 site['peer_site_id'] = peer_site_id
# `start` is assigned on a line not shown here (presumably reset at the top
# of this section -- TODO confirm).
217 timers['site'] = time.time() - start
# Keys: refresh the local copies of the peer's keys and bind new ones to the peer.
220 # XXX Synchronize foreign key types
223 message('Dealing with Keys')
# Key types known locally; peer keys of any other type are skipped below.
225 key_types = KeyTypes(self.api).dict()
228 # Synchronize foreign keys
233 # Compare only the columns returned by the GetPeerData() call
234 if peer_tables['Keys']:
235 columns = peer_tables['Keys'][0].keys()
# NOTE(review): the fallback for an empty 'Keys' table is on lines omitted
# from this listing (the embedded numbering jumps from 235 to 239).
239 # Keyed on foreign key_id
240 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
241 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
243 # Fix up key_type references
# .items() returns a list in Python 2, so deleting entries from keys_at_peer
# inside this loop is safe.
244 for peer_key_id, key in keys_at_peer.items():
245 if key['key_type'] not in key_types:
246 # XXX Log an event instead of printing to logfile
247 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
248 del keys_at_peer[peer_key_id]
251 # Synchronize new set (still keyed on foreign key_id)
252 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
253 for peer_key_id, key in peer_keys.iteritems():
254 # Bind any newly cached keys to peer
255 if peer_key_id not in old_peer_keys:
256 peer.add_key(key, peer_key_id, commit = commit_mode)
# Record the binding on the in-memory key object as well.
257 key['peer_id'] = peer_id
258 key['peer_key_id'] = peer_key_id
# `start` is assigned on a line not shown here.
260 timers['keys'] = time.time() - start
# Persons: refresh the local copies of the peer's users, bind new ones to the
# peer, then bring each user's key list in line with what the peer reports.
263 # Synchronize foreign users
268 message('Dealing with Persons')
270 # Compare only the columns returned by the GetPeerData() call
271 if peer_tables['Persons']:
272 columns = peer_tables['Persons'][0].keys()
# NOTE(review): the fallback for an empty 'Persons' table is on lines omitted
# from this listing (the embedded numbering jumps from 272 to 276).
276 # Keyed on foreign person_id
277 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
279 # artificially attach the persons returned by GetPeerData to the new peer
280 # this is because validate_email needs peer_id to be correct when checking for duplicates
281 for person in peer_tables['Persons']:
282 person['peer_id']=peer_id
283 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
284 for peer_person in peer_tables['Persons']])
286 # XXX Do we care about membership in foreign site(s)?
288 # Synchronize new set (still keyed on foreign person_id)
289 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
291 # transcoder : maps a local key_id to the corresponding foreign peer_key_id
292 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
293 for peer_key_id,key in peer_keys.iteritems()])
295 for peer_person_id, person in peer_persons.iteritems():
296 # Bind any newly cached users to peer
297 if peer_person_id not in old_peer_persons:
298 peer.add_person(person, peer_person_id, commit = commit_mode)
299 person['peer_id'] = peer_id
300 person['peer_person_id'] = peer_person_id
# Presumably still part of the "newly cached" branch above, so that a new user
# starts with no keys -- indentation is not preserved in this listing.
301 person['key_ids'] = []
303 # User as viewed by peer
304 peer_person = persons_at_peer[peer_person_id]
306 # Foreign keys currently belonging to the user
# NOTE(review): key_transcoder[key_id] raises KeyError for a local key_id that
# is not among the synced peer keys; the equivalent lookup for slice users
# further down is guarded with has_key() -- consider the same guard here.
307 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
308 if key_transcoder[key_id] in peer_keys]
310 # Foreign keys that should belong to the user
311 # this is basically peer_person['key_ids'], we just check it makes sense
312 # (e.g. we might have failed importing it)
313 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
315 # Remove stale keys from user
316 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
317 person.remove_key(peer_keys[key_id], commit = commit_mode)
318 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
320 # Add new keys to user
321 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
322 person.add_key(peer_keys[key_id], commit = commit_mode)
323 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
# `start` is assigned on a line not shown here.
325 timers['persons'] = time.time() - start
# Foreign nodes: validate the peer's nodes against known sites and boot states,
# refresh the local copies and bind new ones to the peer.
328 # XXX Synchronize foreign boot states
# Boot states known locally; peer nodes in any other state are rejected below.
331 boot_states = BootStates(self.api).dict()
334 # Synchronize foreign nodes
339 message('Dealing with Nodes (1)')
341 # Compare only the columns returned by the GetPeerData() call
342 if peer_tables['Nodes']:
343 columns = peer_tables['Nodes'][0].keys()
345 # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
# NOTE(review): as listed, this assignment replaces the columns computed just
# above; the listing skips source line 344, which may hold an `else:` -- confirm.
# It also aliases Node.fields instead of copying it, so the remove() calls
# below would act on the class attribute itself (and would only work if
# Node.fields is a list; Peer.fields is indexed like a dict elsewhere in
# this file).
346 columns = Node.fields
347 if 'interface_ids' in columns: columns.remove('interface_ids')
348 if 'dummybox_id' in columns: columns.remove('dummybox_id')
350 # Keyed on foreign node_id
351 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
352 nodes_at_peer = dict([(node['node_id'], node) \
353 for node in peer_tables['Nodes']])
355 # Fix up site_id and boot_states references
# .items() returns a list in Python 2, so deleting entries from nodes_at_peer
# inside this loop is safe. `errors` is initialised on a line not shown here.
356 for peer_node_id, node in nodes_at_peer.items():
358 if node['site_id'] not in peer_sites:
359 errors.append("invalid site %d" % node['site_id'])
360 if node['boot_state'] not in boot_states:
361 errors.append("invalid boot state %s" % node['boot_state'])
363 # XXX Log an event instead of printing to logfile
# The continuation of the next statement (source line 365) is not in this listing.
364 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
366 del nodes_at_peer[peer_node_id]
# Replace the foreign site_id with the site_id of the local copy of that site.
369 node['site_id'] = peer_sites[node['site_id']]['site_id']
371 # Synchronize new set
372 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
374 for peer_node_id, node in peer_nodes.iteritems():
375 # Bind any newly cached foreign nodes to peer
376 if peer_node_id not in old_peer_nodes:
377 peer.add_node(node, peer_node_id, commit = commit_mode)
# Record the binding on the in-memory node object as well.
378 node['peer_id'] = peer_id
379 node['peer_node_id'] = peer_node_id
# `start` is assigned on a line not shown here.
381 timers['nodes'] = time.time() - start
# Local nodes: add our own nodes, as the peer knows them, to peer_nodes so that
# slice membership expressed in the peer's node ids can be resolved later.
384 # Synchronize local nodes
388 message('Dealing with Nodes (2)')
390 # Keyed on local node_id
391 local_nodes = Nodes(self.api).dict()
# 'PeerNodes' holds the peer's cached view of nodes; entries whose
# peer_node_id matches one of our node ids are handled below.
393 for node in peer_tables['PeerNodes']:
394 # Foreign identifier for our node as maintained by peer
395 peer_node_id = node['node_id']
396 # Local identifier for our node as cached by peer
397 node_id = node['peer_node_id']
398 if node_id in local_nodes:
399 # Still a valid local node, add it to the synchronized
400 # set of local node objects keyed on foreign node_id.
401 peer_nodes[peer_node_id] = local_nodes[node_id]
# `start` is assigned on a line not shown here.
403 timers['local_nodes'] = time.time() - start
# Slices (1): validate the peer's slices against known sites, instantiation
# states and users, then refresh the local copies.
406 # XXX Synchronize foreign slice instantiation states
# Instantiation states known locally; slices using any other value are rejected below.
409 slice_instantiations = SliceInstantiations(self.api).dict()
412 # Synchronize foreign slices
417 message('Dealing with Slices (1)')
419 # Compare only the columns returned by the GetPeerData() call
420 if peer_tables['Slices']:
421 columns = peer_tables['Slices'][0].keys()
# NOTE(review): the fallback for an empty 'Slices' table is on lines omitted
# from this listing (the embedded numbering jumps from 421 to 425).
425 # Keyed on foreign slice_id
426 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
427 slices_at_peer = dict([(slice['slice_id'], slice) \
428 for slice in peer_tables['Slices']])
430 # Fix up site_id, instantiation, and creator_person_id references
# .items() returns a list in Python 2, so deleting entries from slices_at_peer
# inside this loop is safe. `errors` is initialised on a line not shown here.
431 for peer_slice_id, slice in slices_at_peer.items():
433 if slice['site_id'] not in peer_sites:
434 errors.append("invalid site %d" % slice['site_id'])
435 if slice['instantiation'] not in slice_instantiations:
436 errors.append("invalid instantiation %s" % slice['instantiation'])
# An unknown creator is not treated as an error: the reference is cleared.
437 if slice['creator_person_id'] not in peer_persons:
439 slice['creator_person_id'] = None
# Otherwise (the `else:` line is not in this listing) the foreign person id is
# replaced with the person_id of the local copy of that user.
441 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
# The continuation of the next statement (source line 444) is not in this listing.
443 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
445 del slices_at_peer[peer_slice_id]
# Replace the foreign site_id with the site_id of the local copy of that site.
448 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
450 # Synchronize new set
451 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
# Slices (2): bind new slices to the peer, then bring each slice's node and
# user membership in line with what the peer reports; finally commit the peer.
453 message('Dealing with Slices (2)')
454 # transcoder : maps a local node_id to the corresponding foreign peer_node_id
455 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
456 for peer_node_id,node in peer_nodes.iteritems()])
# Same mapping for users: local person_id -> foreign peer_person_id.
457 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
458 for peer_person_id,person in peer_persons.iteritems()])
460 for peer_slice_id, slice in peer_slices.iteritems():
461 # Bind any newly cached foreign slices to peer
462 if peer_slice_id not in old_peer_slices:
463 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
464 slice['peer_id'] = peer_id
465 slice['peer_slice_id'] = peer_slice_id
# Presumably still part of the "newly cached" branch above, so that a new slice
# starts with no members -- indentation is not preserved in this listing.
466 slice['node_ids'] = []
467 slice['person_ids'] = []
469 # Slice as viewed by peer
470 peer_slice = slices_at_peer[peer_slice_id]
472 # Nodes that are currently part of the slice
# NOTE(review): node_transcoder[node_id] raises KeyError for a local node_id
# that is not in peer_nodes; the user lookup further down is guarded with
# has_key() -- consider the same guard here.
473 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
474 if node_transcoder[node_id] in peer_nodes]
476 # Nodes that should be part of the slice
477 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
479 # Remove stale nodes from slice
480 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
481 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
482 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
484 # Add new nodes to slice
485 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
486 slice.add_node(peer_nodes[node_id], commit = commit_mode)
487 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
489 # N.B.: Local nodes that may have been added to the slice
490 # by hand, are removed. In other words, don't do this.
492 # Foreign users that are currently part of the slice
493 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
494 # if person_transcoder[person_id] in peer_persons]
495 # An issue occurred with a user who registered on both sites (same email)
496 # So the remote person could not get cached locally
497 # The one-line map/filter style is nicer but ineffective here
498 old_slice_person_ids = []
499 for person_id in slice['person_ids']:
# Users that cannot be translated to a foreign id are reported and left out.
500 if not person_transcoder.has_key(person_id):
501 message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
502 elif person_transcoder[person_id] not in peer_persons:
503 message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
# Remaining case (the `else:` line is not in this listing): keep the foreign id.
505 old_slice_person_ids += [person_transcoder[person_id]]
507 # Foreign users that should be part of the slice
508 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
510 # Remove stale users from slice
511 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
512 slice.remove_person(peer_persons[person_id], commit = commit_mode)
513 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
515 # Add new users to slice
516 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
517 slice.add_person(peer_persons[person_id], commit = commit_mode)
518 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
520 # N.B.: Local users that may have been added to the slice
521 # by hand, are not touched.
# `start` is assigned on a line not shown here.
523 timers['slices'] = time.time() - start
525 # Update peer itself and commit
# This final sync always commits, whatever the value of commit_mode.
526 peer.sync(commit = True)