2 # Thierry Parmentelat - INRIA
4 # $Id: RefreshPeer.py 5574 2007-10-25 20:33:17Z thierry $
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
26 class RefreshPeer(Method):
# NOTE(review): this extract is garbled -- original file line numbers are
# fused into each line and intermediate lines are elided (e.g. the docstring
# quotes and the 'accepts = [' opener are not visible here).  Comments below
# annotate only the visible logic.
# Class docstring body (opening/closing triple quotes elided from this view):
28 Fetches site, node, slice, person and key data from the specified peer
29 and caches it locally; also deletes stale entries.
30 Upon successful completion, returns a dict reporting various timers.
# Accepted argument: the target peer, identified either by numeric peer_id
# or by peername.
38 Mixed(Peer.fields['peer_id'],
39 Peer.fields['peername']),
# NOTE(review): declared return type (int, "1 if successful") disagrees with
# the docstring above, which says a dict of timers is returned; the actual
# return statement is not visible in this extract -- confirm against the
# full file.
42 returns = Parameter(int, "1 if successful")
# Entry point: refresh the local cache of the given peer's objects.
# auth: API authentication structure; peer_id_or_peername: int id or name.
44 def call(self, auth, peer_id_or_peername):
# Look up the Peer row; presumably an 'if not peers:' guard and
# 'peer = peers[0]' assignment sit between these lines (elided from this
# view) -- 'peer' is used below without a visible assignment.
46 peers = Peers(self.api, [peer_id_or_peername])
# Python 2 raise-with-arguments syntax; unicode() guards against non-ASCII
# peer names in the error message.
48 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
50 peer_id = peer['peer_id']
# Single bulk RPC fetching all of the peer's tables (Sites, Keys, Persons,
# Nodes, Slices, ...) plus 'db_time', the time the peer spent in its own
# database.  The 'timers' dict and 'start' timestamp are initialized in
# elided lines above -- TODO confirm against the full file.
59 print >>log, 'Issuing GetPeerData'
60 peer_tables = peer.GetPeerData()
# transport = total elapsed time minus the peer-side database time.
61 timers['transport'] = time.time() - start - peer_tables['db_time']
62 timers['peer_db'] = peer_tables['db_time']
64 print >>log, 'GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport'])
# Nested reconciliation helper.  Closure over self, peer and log from the
# enclosing call(); note that it deliberately shadows the builtin 'object'
# as a loop variable (pre-existing style, kept as-is).
66 def sync(objects, peer_objects, classobj):
# Docstring body (triple quotes elided from this view):
68 Synchronizes two dictionaries of objects. objects should
69 be a dictionary of local objects keyed on their foreign
70 identifiers. peer_objects should be a dictionary of
71 foreign objects keyed on their local (i.e., foreign to us)
72 identifiers. Returns a final dictionary of local objects
73 keyed on their foreign identifiers.
77 print >>log, 'Entering sync on',classobj(self.api).__class__.__name__
# Phase 1: any locally cached object no longer present at the peer is
# deleted (commit deferred until the final peer.sync()).
81 # Delete stale objects
82 for peer_object_id, object in objects.iteritems():
83 if peer_object_id not in peer_objects:
84 object.delete(commit = False)
85 print >> log, peer['peername'],classobj(self.api).__class__.__name__, object[object.primary_key],"deleted"
# Phase 2: walk the peer's objects; update the ones we already cache,
# instantiate the ones we don't.
87 # Add/update new/existing objects
88 for peer_object_id, peer_object in peer_objects.iteritems():
89 if peer_object_id in objects:
90 # Update existing object
91 object = objects[peer_object_id]
93 # Replace foreign identifier with existing local
94 # identifier temporarily for the purposes of
# (comparison -- the primary key is swapped so __eq__ compares like
# with like, then restored below.)
96 peer_object[object.primary_key] = object[object.primary_key]
98 # Must use __eq__() instead of == since
99 # peer_object may be a raw dict instead of a Peer
101 if not object.__eq__(peer_object):
102 # Only update intrinsic fields
103 object.update(object.db_fields(peer_object))
# NOTE(review): lines setting a sync/dbg flag pair in the changed and
# unchanged branches are elided from this view -- 'dbg' is read below.
110 # Restore foreign identifier
111 peer_object[object.primary_key] = peer_object_id
# else-branch (elided 'else:'): object is new to us -- build a fresh
# local instance from the peer's raw dict.
114 object = classobj(self.api, peer_object)
115 # Replace foreign identifier with new local identifier
116 del object[object.primary_key]
# The 'try:' opener for this sync call is elided from this view.
122 object.sync(commit = False)
# Python 2 'except Class, name' syntax.
123 except PLCInvalidArgument, err:
124 # Skip if validation fails
125 # XXX Log an event instead of printing to logfile
126 print >> log, "Warning: Skipping invalid", \
127 peer['peername'], object.__class__.__name__, \
128 ":", peer_object, ":", err
# Record the synced object keyed on its foreign id ('synced' is
# initialized in an elided line above; presumably returned at the end --
# TODO confirm against the full file).
131 synced[peer_object_id] = object
134 print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
137 print >>log, 'Exiting sync on',classobj(self.api).__class__.__name__
142 # Synchronize foreign sites
147 print >>log, 'Dealing with Sites'
# Restrict local queries to the columns the peer actually sent, so the
# __eq__ comparison in sync() only sees comparable fields.  (The else
# branch for an empty 'Sites' table is elided from this view.)
149 # Compare only the columns returned by the GetPeerData() call
150 if peer_tables['Sites']:
151 columns = peer_tables['Sites'][0].keys()
# Both dicts are keyed on the *foreign* site_id so sync() can match them.
155 # Keyed on foreign site_id
156 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
157 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
159 # Synchronize new set (still keyed on foreign site_id)
160 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
162 for peer_site_id, site in peer_sites.iteritems():
163 # Bind any newly cached sites to peer
164 if peer_site_id not in old_peer_sites:
165 peer.add_site(site, peer_site_id, commit = False)
166 site['peer_id'] = peer_id
167 site['peer_site_id'] = peer_site_id
# Wall-clock checkpoint for this section (relative to 'start').
169 timers['site'] = time.time() - start
172 # XXX Synchronize foreign key types
175 print >>log, 'Dealing with Keys'
# Local key-type table, used below to validate incoming keys.
177 key_types = KeyTypes(self.api).dict()
180 # Synchronize foreign keys
185 # Compare only the columns returned by the GetPeerData() call
186 if peer_tables['Keys']:
187 columns = peer_tables['Keys'][0].keys()
191 # Keyed on foreign key_id
192 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
193 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
# Drop keys whose key_type we do not know locally.  Iterating over
# .items() (a list copy in Python 2) makes the del inside the loop safe.
195 # Fix up key_type references
196 for peer_key_id, key in keys_at_peer.items():
197 if key['key_type'] not in key_types:
198 # XXX Log an event instead of printing to logfile
199 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
200 key, ": invalid key type", key['key_type']
201 del keys_at_peer[peer_key_id]
204 # Synchronize new set (still keyed on foreign key_id)
205 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
206 for peer_key_id, key in peer_keys.iteritems():
207 # Bind any newly cached keys to peer
208 if peer_key_id not in old_peer_keys:
209 peer.add_key(key, peer_key_id, commit = False)
210 key['peer_id'] = peer_id
211 key['peer_key_id'] = peer_key_id
213 timers['keys'] = time.time() - start
216 # Synchronize foreign users
221 print >>log, 'Dealing with Persons'
223 # Compare only the columns returned by the GetPeerData() call
224 if peer_tables['Persons']:
225 columns = peer_tables['Persons'][0].keys()
229 # Keyed on foreign person_id
230 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
232 # artificially attach the persons returned by GetPeerData to the new peer
233 # this is because validate_email needs peer_id to be correct when checking for duplicates
234 for person in peer_tables['Persons']:
235 person['peer_id']=peer_id
236 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
237 for peer_person in peer_tables['Persons']])
239 # XXX Do we care about membership in foreign site(s)?
241 # Synchronize new set (still keyed on foreign person_id)
242 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
# Reverse map: local key_id -> foreign (peer) key_id, built from the
# peer_keys dict produced in the keys section above.
244 # transcoder : retrieve a local key_id from a peer_key_id
245 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
246 for peer_key_id,key in peer_keys.iteritems()])
248 for peer_person_id, person in peer_persons.iteritems():
249 # Bind any newly cached users to peer
250 if peer_person_id not in old_peer_persons:
251 peer.add_person(person, peer_person_id, commit = False)
252 person['peer_id'] = peer_id
253 person['peer_person_id'] = peer_person_id
# New cached users start with no keys; bindings are added below.
254 person['key_ids'] = []
256 # User as viewed by peer
257 peer_person = persons_at_peer[peer_person_id]
# Translate the user's current local key ids into foreign key ids,
# keeping only those that were successfully cached above.
259 # Foreign keys currently belonging to the user
260 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
261 if key_transcoder[key_id] in peer_keys]
263 # Foreign keys that should belong to the user
264 # this is basically peer_person['key_ids'], we just check it makes sense
265 # (e.g. we might have failed importing it)
266 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
# Reconcile key bindings via set difference: drop stale, add missing.
268 # Remove stale keys from user
269 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
270 person.remove_key(peer_keys[key_id], commit = False)
271 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
273 # Add new keys to user
274 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
275 person.add_key(peer_keys[key_id], commit = False)
276 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
278 timers['persons'] = time.time() - start
281 # XXX Synchronize foreign boot states
# Local boot-state table, used below to validate incoming nodes.
284 boot_states = BootStates(self.api).dict()
287 # Synchronize foreign nodes
292 print >>log, 'Dealing with Nodes'
294 # Compare only the columns returned by the GetPeerData() call
295 if peer_tables['Nodes']:
296 columns = peer_tables['Nodes'][0].keys()
300 # Keyed on foreign node_id
301 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
302 nodes_at_peer = dict([(node['node_id'], node) \
303 for node in peer_tables['Nodes']])
# Validate each incoming node: its site must have been cached above and
# its boot_state must exist locally.  NOTE(review): the 'errors = []'
# initializer and the 'if errors:' guard around the warning/del appear to
# be elided from this view -- confirm against the full file.
305 # Fix up site_id and boot_states references
306 for peer_node_id, node in nodes_at_peer.items():
308 if node['site_id'] not in peer_sites:
309 errors.append("invalid site %d" % node['site_id'])
310 if node['boot_state'] not in boot_states:
311 errors.append("invalid boot state %s" % node['boot_state'])
313 # XXX Log an event instead of printing to logfile
314 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
315 node, ":", ", ".join(errors)
316 del nodes_at_peer[peer_node_id]
# Translate the peer's site_id into our local site_id before caching.
319 node['site_id'] = peer_sites[node['site_id']]['site_id']
321 # Synchronize new set
322 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
324 for peer_node_id, node in peer_nodes.iteritems():
325 # Bind any newly cached foreign nodes to peer
326 if peer_node_id not in old_peer_nodes:
327 peer.add_node(node, peer_node_id, commit = False)
328 node['peer_id'] = peer_id
329 node['peer_node_id'] = peer_node_id
331 timers['nodes'] = time.time() - start
334 # Synchronize local nodes
# The peer also reports which of *our* nodes it caches ('PeerNodes');
# fold those into peer_nodes so slices below can reference them too.
339 # Keyed on local node_id
340 local_nodes = Nodes(self.api).dict()
342 for node in peer_tables['PeerNodes']:
343 # Foreign identifier for our node as maintained by peer
344 peer_node_id = node['node_id']
345 # Local identifier for our node as cached by peer
346 node_id = node['peer_node_id']
347 if node_id in local_nodes:
348 # Still a valid local node, add it to the synchronized
349 # set of local node objects keyed on foreign node_id.
350 peer_nodes[peer_node_id] = local_nodes[node_id]
352 timers['local_nodes'] = time.time() - start
355 # XXX Synchronize foreign slice instantiation states
# Local instantiation-state table, used below to validate incoming slices.
358 slice_instantiations = SliceInstantiations(self.api).dict()
361 # Synchronize foreign slices
366 print >>log, 'Dealing with Slices'
368 # Compare only the columns returned by the GetPeerData() call
369 if peer_tables['Slices']:
370 columns = peer_tables['Slices'][0].keys()
374 # Keyed on foreign slice_id
375 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
376 slices_at_peer = dict([(slice['slice_id'], slice) \
377 for slice in peer_tables['Slices']])
# Validate each incoming slice against the already-cached sites/persons
# and the local instantiation states.  NOTE(review): as in the nodes
# section, the 'errors = []' initializer, the else branch pairing with
# the creator check, and the 'if errors:' guard appear to be elided from
# this view -- confirm against the full file.
379 # Fix up site_id, instantiation, and creator_person_id references
380 for peer_slice_id, slice in slices_at_peer.items():
382 if slice['site_id'] not in peer_sites:
383 errors.append("invalid site %d" % slice['site_id'])
384 if slice['instantiation'] not in slice_instantiations:
385 errors.append("invalid instantiation %s" % slice['instantiation'])
386 if slice['creator_person_id'] not in peer_persons:
# Unknown creator is tolerated: fall back to None rather than skipping.
388 slice['creator_person_id'] = None
# Otherwise translate the foreign creator id into our local person_id.
390 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
392 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
393 slice, ":", ", ".join(errors)
394 del slices_at_peer[peer_slice_id]
# Translate the peer's site_id into our local site_id before caching.
397 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
399 # Synchronize new set
400 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
# Reverse maps: local node_id/person_id -> foreign id, for translating
# the membership lists stored on our cached slice objects.
402 # transcoder : retrieve a local node_id from a peer_node_id
403 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
404 for peer_node_id,node in peer_nodes.iteritems()])
405 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
406 for peer_person_id,person in peer_persons.iteritems()])
408 for peer_slice_id, slice in peer_slices.iteritems():
409 # Bind any newly cached foreign slices to peer
410 if peer_slice_id not in old_peer_slices:
411 peer.add_slice(slice, peer_slice_id, commit = False)
412 slice['peer_id'] = peer_id
413 slice['peer_slice_id'] = peer_slice_id
# New cached slices start empty; memberships are reconciled below.
414 slice['node_ids'] = []
415 slice['person_ids'] = []
417 # Slice as viewed by peer
418 peer_slice = slices_at_peer[peer_slice_id]
420 # Nodes that are currently part of the slice
421 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
422 if node_transcoder[node_id] in peer_nodes]
424 # Nodes that should be part of the slice
425 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
# Reconcile node membership via set difference: drop stale, add missing.
427 # Remove stale nodes from slice
428 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
429 slice.remove_node(peer_nodes[node_id], commit = False)
430 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
432 # Add new nodes to slice
433 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
434 slice.add_node(peer_nodes[node_id], commit = False)
435 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
437 # N.B.: Local nodes that may have been added to the slice
438 # by hand, are removed. In other words, don't do this.
440 # Foreign users that are currently part of the slice
441 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
442 # if person_transcoder[person_id] in peer_persons]
443 # An issue occurred with a user who registered on both sites (same email)
444 # So the remote person could not get cached locally
445 # The one-line map/filter style is nicer but ineffective here
# Explicit loop so each untranslatable person_id can be logged and
# skipped individually instead of silently dropped.
446 old_slice_person_ids = []
447 for person_id in slice['person_ids']:
448 if not person_transcoder.has_key(person_id):
449 print >> log, 'WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name'])
450 elif person_transcoder[person_id] not in peer_persons:
451 print >> log, 'WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name'])
# (The 'else:' line pairing with this append appears to be elided
# from this view.)
453 old_slice_person_ids += [person_transcoder[person_id]]
455 # Foreign users that should be part of the slice
456 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
# Reconcile user membership via set difference: drop stale, add missing.
458 # Remove stale users from slice
459 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
460 slice.remove_person(peer_persons[person_id], commit = False)
461 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
463 # Add new users to slice
464 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
465 slice.add_person(peer_persons[person_id], commit = False)
466 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
468 # N.B.: Local users that may have been added to the slice
469 # by hand, are not touched.
471 timers['slices'] = time.time() - start
# All earlier object syncs used commit=False; this single commit makes the
# whole refresh take effect atomically.
473 # Update peer itself and commit
474 peer.sync(commit = True)
# NOTE(review): the method presumably returns after this point (the docstring
# mentions a dict of timers), but the return statement is outside this
# extract -- confirm against the full file.