2 # Thierry Parmentelat - INRIA
import time

from PLC.Debug import log
from PLC.Faults import *
from PLC.Method import Method
from PLC.Parameter import Parameter, Mixed
from PLC.Auth import Auth

from PLC.Peers import Peer, Peers
from PLC.Sites import Site, Sites
from PLC.Persons import Person, Persons
from PLC.KeyTypes import KeyType, KeyTypes
from PLC.Keys import Key, Keys
from PLC.BootStates import BootState, BootStates
from PLC.Nodes import Node, Nodes
from PLC.SliceInstantiations import SliceInstantiations
from PLC.Slices import Slice, Slices
# initial version was doing only one final commit
# * set commit_mode to False to get that behaviour
# * set commit_mode to True to get everything synced at once
def message(to_print=None, verbose_only=False):
    """Write *to_print* to the shared log, prefixed with a timestamp.

    to_print     : text to log; when None only the timestamp prefix is emitted.
    verbose_only : when True, the message is emitted only if the module-level
                   `verbose` flag is set.
    """
    # Bug fix: suppress verbose-only messages with an early return.  As the
    # code stood, the guard condition directly gated the print statement,
    # so verbose-only messages were printed exactly when verbose was OFF.
    if verbose_only and not verbose:
        return
    # Trailing comma keeps the timestamp and the message on one log line
    # (Python 2 print-statement chevron syntax).
    print >> log, time.strftime("%m-%d-%H-%M-%S:"),
    if to_print:
        print >> log, to_print
def message_verbose(to_print=None):
    """Log *to_print* only when verbose mode is enabled.

    Thin convenience wrapper around message() with verbose_only forced on.
    """
    message(to_print, verbose_only=True)
41 class RefreshPeer(Method):
43 Fetches site, node, slice, person and key data from the specified peer
44 and caches it locally; also deletes stale entries.
45 Upon successful completion, returns a dict reporting various timers.
53 Mixed(Peer.fields['peer_id'],
54 Peer.fields['peername']),
57 returns = Parameter(int, "1 if successful")
59 def call(self, auth, peer_id_or_peername):
61 peers = Peers(self.api, [peer_id_or_peername])
63 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
65 peer_id = peer['peer_id']
74 message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
75 message('Issuing GetPeerData')
76 peer_tables = peer.GetPeerData()
77 # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
78 boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
79 'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
80 for node in peer_tables['Nodes']:
81 for key in ['nodenetwork_ids','dummybox_id']:
84 if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
85 for slice in peer_tables['Slices']:
86 for key in ['slice_attribute_ids']:
89 timers['transport'] = time.time() - start - peer_tables['db_time']
90 timers['peer_db'] = peer_tables['db_time']
91 message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
93 def sync(objects, peer_objects, classobj):
95 Synchronizes two dictionaries of objects. objects should
96 be a dictionary of local objects keyed on their foreign
97 identifiers. peer_objects should be a dictionary of
98 foreign objects keyed on their local (i.e., foreign to us)
99 identifiers. Returns a final dictionary of local objects
100 keyed on their foreign identifiers.
103 classname=classobj(self.api).__class__.__name__
104 message_verbose('Entering sync on %s'%classname)
108 # Delete stale objects
109 for peer_object_id, object in objects.iteritems():
110 if peer_object_id not in peer_objects:
111 object.delete(commit = commit_mode)
112 message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
114 total = len(peer_objects)
116 # set this to something realistic to trace down a given object(s)
120 if classname == trace_type and peer_object_id in trace_ids:
121 message_verbose('TRACE>>'+message)
123 # Add/update new/existing objects
124 for peer_object_id, peer_object in peer_objects.iteritems():
125 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
127 if peer_object_id in synced:
128 message("Warning: %s Skipping already added %s: %r"%(
129 peer['peername'], classname, peer_object))
131 if classname == 'Node':
132 message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
133 elif classname == "Slice":
134 message_verbose ('DBG>> slicename=%s'%peer_object['name'])
135 if peer_object_id in objects:
136 # Update existing object
137 object = objects[peer_object_id]
139 # Replace foreign identifier with existing local
140 # identifier temporarily for the purposes of
142 peer_object[object.primary_key] = object[object.primary_key]
144 # Must use __eq__() instead of == since
145 # peer_object may be a raw dict instead of a Peer
147 trace ("in objects : comparing")
148 if not object.__eq__(peer_object):
149 # Only update intrinsic fields
151 object.update(object.db_fields(peer_object))
160 # Restore foreign identifier
161 peer_object[object.primary_key] = peer_object_id
163 trace ("not in objects -- creating")
165 object = classobj(self.api, peer_object)
167 # Replace foreign identifier with new local identifier
168 del object[object.primary_key]
169 trace ("forced clean id")
174 message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
176 object.sync(commit = commit_mode)
177 except PLCInvalidArgument, err:
178 # Skip if validation fails
179 # XXX Log an event instead of printing to logfile
180 message("Warning: %s Skipping invalid %s %r : %r"%(\
181 peer['peername'], classname, peer_object, err))
184 synced[peer_object_id] = object
187 message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
189 message_verbose("Exiting sync on %s"%classname)
194 # Synchronize foreign sites
199 message('Dealing with Sites')
201 # Compare only the columns returned by the GetPeerData() call
202 if peer_tables['Sites']:
203 columns = peer_tables['Sites'][0].keys()
207 # Keyed on foreign site_id
208 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
209 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
211 # Synchronize new set (still keyed on foreign site_id)
212 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
214 for peer_site_id, site in peer_sites.iteritems():
215 # Bind any newly cached sites to peer
216 if peer_site_id not in old_peer_sites:
217 peer.add_site(site, peer_site_id, commit = commit_mode)
218 site['peer_id'] = peer_id
219 site['peer_site_id'] = peer_site_id
221 timers['site'] = time.time() - start
224 # XXX Synchronize foreign key types
227 message('Dealing with Keys')
229 key_types = KeyTypes(self.api).dict()
232 # Synchronize foreign keys
237 # Compare only the columns returned by the GetPeerData() call
238 if peer_tables['Keys']:
239 columns = peer_tables['Keys'][0].keys()
243 # Keyed on foreign key_id
244 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
245 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
247 # Fix up key_type references
248 for peer_key_id, key in keys_at_peer.items():
249 if key['key_type'] not in key_types:
250 # XXX Log an event instead of printing to logfile
251 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
252 del keys_at_peer[peer_key_id]
255 # Synchronize new set (still keyed on foreign key_id)
256 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
257 for peer_key_id, key in peer_keys.iteritems():
258 # Bind any newly cached keys to peer
259 if peer_key_id not in old_peer_keys:
260 peer.add_key(key, peer_key_id, commit = commit_mode)
261 key['peer_id'] = peer_id
262 key['peer_key_id'] = peer_key_id
264 timers['keys'] = time.time() - start
267 # Synchronize foreign users
272 message('Dealing with Persons')
274 # Compare only the columns returned by the GetPeerData() call
275 if peer_tables['Persons']:
276 columns = peer_tables['Persons'][0].keys()
280 # Keyed on foreign person_id
281 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
283 # artificially attach the persons returned by GetPeerData to the new peer
284 # this is because validate_email needs peer_id to be correct when checking for duplicates
285 for person in peer_tables['Persons']:
286 person['peer_id']=peer_id
287 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
288 for peer_person in peer_tables['Persons']])
290 # XXX Do we care about membership in foreign site(s)?
292 # Synchronize new set (still keyed on foreign person_id)
293 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
295 # transcoder : retrieve a local key_id from a peer_key_id
296 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
297 for peer_key_id,key in peer_keys.iteritems()])
299 for peer_person_id, person in peer_persons.iteritems():
300 # Bind any newly cached users to peer
301 if peer_person_id not in old_peer_persons:
302 peer.add_person(person, peer_person_id, commit = commit_mode)
303 person['peer_id'] = peer_id
304 person['peer_person_id'] = peer_person_id
305 person['key_ids'] = []
307 # User as viewed by peer
308 peer_person = persons_at_peer[peer_person_id]
310 # Foreign keys currently belonging to the user
311 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
312 if key_transcoder[key_id] in peer_keys]
314 # Foreign keys that should belong to the user
315 # this is basically peer_person['key_ids'], we just check it makes sense
316 # (e.g. we might have failed importing it)
317 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
319 # Remove stale keys from user
320 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
321 person.remove_key(peer_keys[key_id], commit = commit_mode)
322 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
324 # Add new keys to user
325 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
326 person.add_key(peer_keys[key_id], commit = commit_mode)
327 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
329 timers['persons'] = time.time() - start
332 # XXX Synchronize foreign boot states
335 boot_states = BootStates(self.api).dict()
338 # Synchronize foreign nodes
343 message('Dealing with Nodes (1)')
345 # Compare only the columns returned by the GetPeerData() call
346 if peer_tables['Nodes']:
347 columns = peer_tables['Nodes'][0].keys()
349 # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
350 columns = Node.fields
351 if 'interface_ids' in columns: columns.remove('interface_ids')
352 if 'dummybox_id' in columns: columns.remove('dummybox_id')
354 # Keyed on foreign node_id
355 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
356 nodes_at_peer = dict([(node['node_id'], node) \
357 for node in peer_tables['Nodes']])
359 # Fix up site_id and boot_states references
360 for peer_node_id, node in nodes_at_peer.items():
362 if node['site_id'] not in peer_sites:
363 errors.append("invalid site %d" % node['site_id'])
364 if node['boot_state'] not in boot_states:
365 errors.append("invalid boot state %s" % node['boot_state'])
367 # XXX Log an event instead of printing to logfile
368 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
370 del nodes_at_peer[peer_node_id]
373 node['site_id'] = peer_sites[node['site_id']]['site_id']
375 # Synchronize new set
376 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
378 for peer_node_id, node in peer_nodes.iteritems():
379 # Bind any newly cached foreign nodes to peer
380 if peer_node_id not in old_peer_nodes:
381 peer.add_node(node, peer_node_id, commit = commit_mode)
382 node['peer_id'] = peer_id
383 node['peer_node_id'] = peer_node_id
385 timers['nodes'] = time.time() - start
388 # Synchronize local nodes
392 message('Dealing with Nodes (2)')
394 # Keyed on local node_id
395 local_nodes = Nodes(self.api).dict()
397 for node in peer_tables['PeerNodes']:
398 # Foreign identifier for our node as maintained by peer
399 peer_node_id = node['node_id']
400 # Local identifier for our node as cached by peer
401 node_id = node['peer_node_id']
402 if node_id in local_nodes:
403 # Still a valid local node, add it to the synchronized
404 # set of local node objects keyed on foreign node_id.
405 peer_nodes[peer_node_id] = local_nodes[node_id]
407 timers['local_nodes'] = time.time() - start
410 # XXX Synchronize foreign slice instantiation states
413 slice_instantiations = SliceInstantiations(self.api).dict()
416 # Synchronize foreign slices
421 message('Dealing with Slices (1)')
423 # Compare only the columns returned by the GetPeerData() call
424 if peer_tables['Slices']:
425 columns = peer_tables['Slices'][0].keys()
429 # Keyed on foreign slice_id
430 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
431 slices_at_peer = dict([(slice['slice_id'], slice) \
432 for slice in peer_tables['Slices']])
434 # Fix up site_id, instantiation, and creator_person_id references
435 for peer_slice_id, slice in slices_at_peer.items():
437 if slice['site_id'] not in peer_sites:
438 errors.append("invalid site %d" % slice['site_id'])
439 if slice['instantiation'] not in slice_instantiations:
440 errors.append("invalid instantiation %s" % slice['instantiation'])
441 if slice['creator_person_id'] not in peer_persons:
443 slice['creator_person_id'] = None
445 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
447 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
449 del slices_at_peer[peer_slice_id]
452 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
454 # Synchronize new set
455 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
457 message('Dealing with Slices (2)')
458 # transcoder : retrieve a local node_id from a peer_node_id
459 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
460 for peer_node_id,node in peer_nodes.iteritems()])
461 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
462 for peer_person_id,person in peer_persons.iteritems()])
464 for peer_slice_id, slice in peer_slices.iteritems():
465 # Bind any newly cached foreign slices to peer
466 if peer_slice_id not in old_peer_slices:
467 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
468 slice['peer_id'] = peer_id
469 slice['peer_slice_id'] = peer_slice_id
470 slice['node_ids'] = []
471 slice['person_ids'] = []
473 # Slice as viewed by peer
474 peer_slice = slices_at_peer[peer_slice_id]
476 # Nodes that are currently part of the slice
477 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
478 if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
480 # Nodes that should be part of the slice
481 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
483 # Remove stale nodes from slice
484 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
485 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
486 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
488 # Add new nodes to slice
489 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
490 slice.add_node(peer_nodes[node_id], commit = commit_mode)
491 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
493 # N.B.: Local nodes that may have been added to the slice
494 # by hand, are removed. In other words, don't do this.
496 # Foreign users that are currently part of the slice
497 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
498 # if person_transcoder[person_id] in peer_persons]
499 # An issue occurred with a user who registered on both sites (same email)
500 # So the remote person could not get cached locally
501 # The one-line map/filter style is nicer but ineffective here
502 old_slice_person_ids = []
503 for person_id in slice['person_ids']:
504 if not person_transcoder.has_key(person_id):
505 message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
506 elif person_transcoder[person_id] not in peer_persons:
507 message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
509 old_slice_person_ids += [person_transcoder[person_id]]
511 # Foreign users that should be part of the slice
512 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
514 # Remove stale users from slice
515 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
516 slice.remove_person(peer_persons[person_id], commit = commit_mode)
517 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
519 # Add new users to slice
520 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
521 slice.add_person(peer_persons[person_id], commit = commit_mode)
522 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
524 # N.B.: Local users that may have been added to the slice
525 # by hand, are not touched.
527 timers['slices'] = time.time() - start
529 # Update peer itself and commit
530 peer.sync(commit = True)