2 # Thierry Parmentelat - INRIA
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
def message (to_print=None,verbose_only=False):
    # Timestamped logging helper for this module.
    #   to_print     -- text to log (skipped when None, per the elided lines)
    #   verbose_only -- when True, only log if the module-level `verbose`
    #                   flag is set
    # NOTE(review): this excerpt is elided -- the early `return` under the
    # guard below and the lines that actually print `to_print` and flush
    # `log` are not visible here; the visible tokens are kept verbatim.
    if verbose_only and not verbose:
    # Prefix every message with a month-day-hour-minute-second timestamp
    # (trailing comma keeps the message on the same line -- Python 2
    # extended print statement).
    print >> log, time.strftime("%m-%d-%H-%M-%S:"),
def message_verbose(to_print=None):
    """Convenience wrapper: log *to_print* only when verbose mode is on."""
    message(to_print=to_print, verbose_only=True)
class RefreshPeer(Method):
    """
    Fetches site, node, slice, person and key data from the specified peer
    and caches it locally; also deletes stale entries.
    Upon successful completion, returns a dict reporting various timers.
    """

    # NOTE(review): this excerpt is heavily elided -- many original lines
    # between the ones shown (the `accepts = [Auth(), ...]` wrapper, timer
    # and counter initialisation, several `if`/`else:`/`try:` headers, and
    # the trailing `return timers`) are not visible; visible code tokens are
    # kept verbatim and gaps are flagged with `(elided ...)` comments.

    # Peer may be named by numeric id or by peername
    # (part of an elided `accepts = [...]` declaration)
        Mixed(Peer.fields['peer_id'],
              Peer.fields['peername']),

    returns = Parameter(int, "1 if successful")

    def call(self, auth, peer_id_or_peername):
        """
        Refresh the local cache of the peer named by *peer_id_or_peername*.

        Pulls the peer's full data set with one GetPeerData() call, then
        synchronizes sites, keys, persons, nodes and slices in dependency
        order, recording elapsed time per phase in `timers`.

        Raises PLCInvalidArgument if no such peer exists.
        """
        # Look the peer up by id or name
        peers = Peers(self.api, [peer_id_or_peername])
        # (under an elided `if not peers:` guard -- TODO confirm)
        raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
        peer_id = peer['peer_id']

        # Single bulk fetch of everything the peer exports
        message('Issuing GetPeerData')
        peer_tables = peer.GetPeerData()
        # transport time = total elapsed minus time the peer spent in its DB
        timers['transport'] = time.time() - start - peer_tables['db_time']
        timers['peer_db'] = peer_tables['db_time']
        message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))

        def sync(objects, peer_objects, classobj):
            """
            Synchronizes two dictionaries of objects. objects should
            be a dictionary of local objects keyed on their foreign
            identifiers. peer_objects should be a dictionary of
            foreign objects keyed on their local (i.e., foreign to us)
            identifiers. Returns a final dictionary of local objects
            keyed on their foreign identifiers.
            """

            # classobj is a class like Site/Key/Person/Node/Slice;
            # instantiate once just to recover a printable class name
            classname=classobj(self.api).__class__.__name__
            message_verbose('Entering sync on %s'%classname)

            # Delete stale objects
            for peer_object_id, object in objects.iteritems():
                if peer_object_id not in peer_objects:
                    object.delete(commit = False)
                    message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))

            total = len(peer_objects)

            # Add/update new/existing objects
            # NOTE(review): `count` is initialised/incremented on elided lines
            for peer_object_id, peer_object in peer_objects.iteritems():
                message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
                if classname == 'Node':
                    message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
                elif classname == "Slice":
                    message_verbose ('DBG>> slicename=%s'%peer_object['name'])
                if peer_object_id in objects:
                    # Update existing object
                    object = objects[peer_object_id]

                    # Replace foreign identifier with existing local
                    # identifier temporarily for the purposes of
                    # comparison/update (restored below)
                    peer_object[object.primary_key] = object[object.primary_key]

                    # Must use __eq__() instead of == since
                    # peer_object may be a raw dict instead of a Peer
                    if not object.__eq__(peer_object):
                        # Only update intrinsic fields
                        object.update(object.db_fields(peer_object))

                    # Restore foreign identifier
                    peer_object[object.primary_key] = peer_object_id
                # (elided `else:` branch -- create a brand-new local object)
                    object = classobj(self.api, peer_object)
                    # Replace foreign identifier with new local identifier
                    del object[object.primary_key]

                # (under an elided `try:` matching the except below)
                    object.sync(commit = False)
                except PLCInvalidArgument, err:
                    # Skip if validation fails
                    # XXX Log an event instead of printing to logfile
                    message("Warning: %s Skipping invalid %s %r : %r"%(\
                        peer['peername'], classname, peer_object, err))

                # Record the synchronized object, still keyed on the
                # peer's (foreign) identifier
                synced[peer_object_id] = object

                # NOTE(review): `dbg` is set on elided lines (added/updated tag)
                message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))

            message_verbose("Exiting sync on %s"%classname)

        # Synchronize foreign sites

        message('Dealing with Sites')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Sites']:
            columns = peer_tables['Sites'][0].keys()

        # Keyed on foreign site_id
        old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
        sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])

        # Synchronize new set (still keyed on foreign site_id)
        peer_sites = sync(old_peer_sites, sites_at_peer, Site)

        for peer_site_id, site in peer_sites.iteritems():
            # Bind any newly cached sites to peer
            if peer_site_id not in old_peer_sites:
                peer.add_site(site, peer_site_id, commit = False)
                site['peer_id'] = peer_id
                site['peer_site_id'] = peer_site_id

        timers['site'] = time.time() - start

        # XXX Synchronize foreign key types

        message('Dealing with Keys')

        key_types = KeyTypes(self.api).dict()

        # Synchronize foreign keys

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Keys']:
            columns = peer_tables['Keys'][0].keys()

        # Keyed on foreign key_id
        old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
        keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])

        # Fix up key_type references
        # (.items() snapshot so entries can be deleted while iterating)
        for peer_key_id, key in keys_at_peer.items():
            if key['key_type'] not in key_types:
                # XXX Log an event instead of printing to logfile
                message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
                del keys_at_peer[peer_key_id]

        # Synchronize new set (still keyed on foreign key_id)
        peer_keys = sync(old_peer_keys, keys_at_peer, Key)
        for peer_key_id, key in peer_keys.iteritems():
            # Bind any newly cached keys to peer
            if peer_key_id not in old_peer_keys:
                peer.add_key(key, peer_key_id, commit = False)
                key['peer_id'] = peer_id
                key['peer_key_id'] = peer_key_id

        timers['keys'] = time.time() - start

        # Synchronize foreign users

        message('Dealing with Persons')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Persons']:
            columns = peer_tables['Persons'][0].keys()

        # Keyed on foreign person_id
        old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')

        # artificially attach the persons returned by GetPeerData to the new peer
        # this is because validate_email needs peer_id to be correct when checking for duplicates
        for person in peer_tables['Persons']:
            person['peer_id']=peer_id
        persons_at_peer = dict([(peer_person['person_id'], peer_person) \
                                for peer_person in peer_tables['Persons']])

        # XXX Do we care about membership in foreign site(s)?

        # Synchronize new set (still keyed on foreign person_id)
        peer_persons = sync(old_peer_persons, persons_at_peer, Person)

        # transcoder : retrieve a local key_id from a peer_key_id
        key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
                                  for peer_key_id,key in peer_keys.iteritems()])

        for peer_person_id, person in peer_persons.iteritems():
            # Bind any newly cached users to peer
            if peer_person_id not in old_peer_persons:
                peer.add_person(person, peer_person_id, commit = False)
                person['peer_id'] = peer_id
                person['peer_person_id'] = peer_person_id
                person['key_ids'] = []

            # User as viewed by peer
            peer_person = persons_at_peer[peer_person_id]

            # Foreign keys currently belonging to the user
            old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
                                  if key_transcoder[key_id] in peer_keys]

            # Foreign keys that should belong to the user
            # this is basically peer_person['key_ids'], we just check it makes sense
            # (e.g. we might have failed importing it)
            person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]

            # Remove stale keys from user
            for key_id in (set(old_person_key_ids) - set(person_key_ids)):
                person.remove_key(peer_keys[key_id], commit = False)
                message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))

            # Add new keys to user
            for key_id in (set(person_key_ids) - set(old_person_key_ids)):
                person.add_key(peer_keys[key_id], commit = False)
                message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))

        timers['persons'] = time.time() - start

        # XXX Synchronize foreign boot states

        boot_states = BootStates(self.api).dict()

        # Synchronize foreign nodes

        message('Dealing with Nodes (1)')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Nodes']:
            columns = peer_tables['Nodes'][0].keys()

        # Keyed on foreign node_id
        old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
        nodes_at_peer = dict([(node['node_id'], node) \
                              for node in peer_tables['Nodes']])

        # Fix up site_id and boot_states references
        # NOTE(review): `errors` is (re)initialised on an elided line
        for peer_node_id, node in nodes_at_peer.items():
            if node['site_id'] not in peer_sites:
                errors.append("invalid site %d" % node['site_id'])
            if node['boot_state'] not in boot_states:
                errors.append("invalid boot state %s" % node['boot_state'])
            # (under an elided `if errors:` guard -- TODO confirm)
                # XXX Log an event instead of printing to logfile
                message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
                del nodes_at_peer[peer_node_id]
            # Transcode the peer's site_id into our local site_id
            node['site_id'] = peer_sites[node['site_id']]['site_id']

        # Synchronize new set
        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)

        for peer_node_id, node in peer_nodes.iteritems():
            # Bind any newly cached foreign nodes to peer
            if peer_node_id not in old_peer_nodes:
                peer.add_node(node, peer_node_id, commit = False)
                node['peer_id'] = peer_id
                node['peer_node_id'] = peer_node_id

        timers['nodes'] = time.time() - start

        # Synchronize local nodes

        message('Dealing with Nodes (2)')

        # Keyed on local node_id
        local_nodes = Nodes(self.api).dict()

        for node in peer_tables['PeerNodes']:
            # Foreign identifier for our node as maintained by peer
            peer_node_id = node['node_id']
            # Local identifier for our node as cached by peer
            node_id = node['peer_node_id']
            if node_id in local_nodes:
                # Still a valid local node, add it to the synchronized
                # set of local node objects keyed on foreign node_id.
                peer_nodes[peer_node_id] = local_nodes[node_id]

        timers['local_nodes'] = time.time() - start

        # XXX Synchronize foreign slice instantiation states

        slice_instantiations = SliceInstantiations(self.api).dict()

        # Synchronize foreign slices

        message('Dealing with Slices (1)')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Slices']:
            columns = peer_tables['Slices'][0].keys()

        # Keyed on foreign slice_id
        old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
        slices_at_peer = dict([(slice['slice_id'], slice) \
                               for slice in peer_tables['Slices']])

        # Fix up site_id, instantiation, and creator_person_id references
        for peer_slice_id, slice in slices_at_peer.items():
            if slice['site_id'] not in peer_sites:
                errors.append("invalid site %d" % slice['site_id'])
            if slice['instantiation'] not in slice_instantiations:
                errors.append("invalid instantiation %s" % slice['instantiation'])
            if slice['creator_person_id'] not in peer_persons:
                # Unknown creator: drop the reference rather than fail
                slice['creator_person_id'] = None
            # (elided `else:` -- transcode creator to our local person_id)
                slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
            # (under an elided `if errors:` guard -- TODO confirm)
                message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
                del slices_at_peer[peer_slice_id]
            # Transcode the peer's site_id into our local site_id
            slice['site_id'] = peer_sites[slice['site_id']]['site_id']

        # Synchronize new set
        peer_slices = sync(old_peer_slices, slices_at_peer, Slice)

        message('Dealing with Slices (2)')
        # transcoder : retrieve a local node_id from a peer_node_id
        node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
                                   for peer_node_id,node in peer_nodes.iteritems()])
        person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
                                     for peer_person_id,person in peer_persons.iteritems()])

        for peer_slice_id, slice in peer_slices.iteritems():
            # Bind any newly cached foreign slices to peer
            if peer_slice_id not in old_peer_slices:
                peer.add_slice(slice, peer_slice_id, commit = False)
                slice['peer_id'] = peer_id
                slice['peer_slice_id'] = peer_slice_id
                slice['node_ids'] = []
                slice['person_ids'] = []

            # Slice as viewed by peer
            peer_slice = slices_at_peer[peer_slice_id]

            # Nodes that are currently part of the slice
            old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
                                   if node_transcoder[node_id] in peer_nodes]

            # Nodes that should be part of the slice
            slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]

            # Remove stale nodes from slice
            for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
                slice.remove_node(peer_nodes[node_id], commit = False)
                message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))

            # Add new nodes to slice
            for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
                slice.add_node(peer_nodes[node_id], commit = False)
                message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))

            # N.B.: Local nodes that may have been added to the slice
            # by hand, are removed. In other words, don't do this.

            # Foreign users that are currently part of the slice
            #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
            #                         if person_transcoder[person_id] in peer_persons]
            # An issue occurred with a user who registered on both sites (same email)
            # So the remote person could not get cached locally
            # The one-line map/filter style is nicer but ineffective here
            old_slice_person_ids = []
            for person_id in slice['person_ids']:
                if not person_transcoder.has_key(person_id):
                    message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
                elif person_transcoder[person_id] not in peer_persons:
                    message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
                # (elided `else:` -- person transcodes cleanly, keep it)
                    old_slice_person_ids += [person_transcoder[person_id]]

            # Foreign users that should be part of the slice
            slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]

            # Remove stale users from slice
            for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
                slice.remove_person(peer_persons[person_id], commit = False)
                message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))

            # Add new users to slice
            for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
                slice.add_person(peer_persons[person_id], commit = False)
                message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))

            # N.B.: Local users that may have been added to the slice
            # by hand, are not touched.

        timers['slices'] = time.time() - start

        # Update peer itself and commit
        # NOTE(review): the method continues past this excerpt
        # (presumably `return timers` -- not visible here)
        peer.sync(commit = True)