2 # Thierry Parmentelat - INRIA
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
# NOTE(review): this is an elided excerpt of a numbered listing -- each line
# still carries its original line number and several intermediate lines
# (docstring quote fences, try/else blocks, initializations of `timers`,
# `start`, `peer`, `synced`, `errors`, `dbg`) are not visible here. The code
# is therefore left byte-identical; only comments were added. Python 2 only
# (print >> chevron syntax, iteritems(), `except X, err`).
25 class RefreshPeer(Method):
# Pulls the remote peer's Sites/Keys/Persons/Nodes/Slices tables via
# GetPeerData() and mirrors them into the local cache, deleting stale
# entries; `timers` (initialized on an elided line) records wall-clock
# seconds per phase.
27 Fetches site, node, slice, person and key data from the specified peer
28 and caches it locally; also deletes stale entries.
29 Upon successful completion, returns a dict reporting various timers.
# accepts: auth plus either a numeric peer_id or a peername string.
37 Mixed(Peer.fields['peer_id'],
38 Peer.fields['peername']),
41 returns = Parameter(int, "1 if successful")
# Entry point. Overall flow visible below: resolve peer -> GetPeerData()
# -> sync sites -> keys -> persons -> nodes -> local nodes -> slices ->
# final peer.sync(commit=True). The `return timers` presumably follows
# past the end of this excerpt -- TODO confirm against the full file.
43 def call(self, auth, peer_id_or_peername):
45 peers = Peers(self.api, [peer_id_or_peername])
# Raised when the lookup above matched nothing (the guarding
# `if not peers:` line is elided from this view).
47 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
49 peer_id = peer['peer_id']
# Fetch every table from the peer in one XML-RPC round trip;
# 'transport' = total elapsed minus the peer's own reported db_time.
58 print >>log, 'Issuing GetPeerData'
59 peer_tables = peer.GetPeerData()
60 timers['transport'] = time.time() - start - peer_tables['db_time']
61 timers['peer_db'] = peer_tables['db_time']
63 print >>log, 'GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport'])
# Generic three-way merge used by every phase below:
#   1. delete local cached objects no longer present at the peer,
#   2. update cached objects whose intrinsic fields changed,
#   3. create cached objects that are new at the peer.
# Closes over `peer` and `self` from the enclosing scope.
65 def sync(objects, peer_objects, classobj):
67 Synchronizes two dictionaries of objects. objects should
68 be a dictionary of local objects keyed on their foreign
69 identifiers. peer_objects should be a dictionary of
70 foreign objects keyed on their local (i.e., foreign to us)
71 identifiers. Returns a final dictionary of local objects
72 keyed on their foreign identifiers.
76 print >>log, 'Entering sync on',classobj(self.api).__class__.__name__
80 # Delete stale objects
81 for peer_object_id, object in objects.iteritems():
82 if peer_object_id not in peer_objects:
83 object.delete(commit = False)
84 print >> log, peer['peername'],classobj(self.api).__class__.__name__, object[object.primary_key],"deleted"
86 # Add/update new/existing objects
87 for peer_object_id, peer_object in peer_objects.iteritems():
88 if peer_object_id in objects:
89 # Update existing object
90 object = objects[peer_object_id]
92 # Replace foreign identifier with existing local
93 # identifier temporarily for the purposes of
# (purpose continues on an elided line -- presumably "comparison";
# the swap lets __eq__ below compare like with like)
95 peer_object[object.primary_key] = object[object.primary_key]
97 # Must use __eq__() instead of == since
98 # peer_object may be a raw dict instead of a Peer
100 if not object.__eq__(peer_object):
101 # Only update intrinsic fields
102 object.update(object.db_fields(peer_object))
109 # Restore foreign identifier
110 peer_object[object.primary_key] = peer_object_id
# New object path: build a fresh local row from the peer's dict
# (the introducing `else:` line is elided from this view).
113 object = classobj(self.api, peer_object)
114 # Replace foreign identifier with new local identifier
115 del object[object.primary_key]
# Flush to the DB without committing; whole refresh commits once
# at the very end. The enclosing `try:` line is elided here.
121 object.sync(commit = False)
122 except PLCInvalidArgument, err:
123 # Skip if validation fails
124 # XXX Log an event instead of printing to logfile
125 print >> log, "Warning: Skipping invalid", \
126 peer['peername'], object.__class__.__name__, \
127 ":", peer_object, ":", err
# `synced` and `dbg` are initialized/assigned on elided lines;
# `synced` accumulates the final foreign-id -> local-object map.
130 synced[peer_object_id] = object
133 print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
136 print >>log, 'Exiting sync on',classobj(self.api).__class__.__name__
# --- Phase 1: sites ---------------------------------------------------
141 # Synchronize foreign sites
146 print >>log, 'Dealing with Sites'
148 # Compare only the columns returned by the GetPeerData() call
149 if peer_tables['Sites']:
150 columns = peer_tables['Sites'][0].keys()
154 # Keyed on foreign site_id
155 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
156 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
158 # Synchronize new set (still keyed on foreign site_id)
159 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
161 for peer_site_id, site in peer_sites.iteritems():
162 # Bind any newly cached sites to peer
163 if peer_site_id not in old_peer_sites:
164 peer.add_site(site, peer_site_id, commit = False)
165 site['peer_id'] = peer_id
166 site['peer_site_id'] = peer_site_id
168 timers['site'] = time.time() - start
# --- Phase 2: keys ----------------------------------------------------
171 # XXX Synchronize foreign key types
174 print >>log, 'Dealing with Keys'
# Key types are NOT synchronized from the peer; only locally known
# types are accepted (unknown ones filtered out below).
176 key_types = KeyTypes(self.api).dict()
179 # Synchronize foreign keys
184 # Compare only the columns returned by the GetPeerData() call
185 if peer_tables['Keys']:
186 columns = peer_tables['Keys'][0].keys()
190 # Keyed on foreign key_id
191 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
192 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
194 # Fix up key_type references
# .items() (a list copy in Python 2) so entries can be deleted
# from keys_at_peer while iterating.
195 for peer_key_id, key in keys_at_peer.items():
196 if key['key_type'] not in key_types:
197 # XXX Log an event instead of printing to logfile
198 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
199 key, ": invalid key type", key['key_type']
200 del keys_at_peer[peer_key_id]
203 # Synchronize new set (still keyed on foreign key_id)
204 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
205 for peer_key_id, key in peer_keys.iteritems():
206 # Bind any newly cached keys to peer
207 if peer_key_id not in old_peer_keys:
208 peer.add_key(key, peer_key_id, commit = False)
209 key['peer_id'] = peer_id
210 key['peer_key_id'] = peer_key_id
212 timers['keys'] = time.time() - start
# --- Phase 3: persons -------------------------------------------------
215 # Synchronize foreign users
220 print >>log, 'Dealing with Persons'
222 # Compare only the columns returned by the GetPeerData() call
223 if peer_tables['Persons']:
224 columns = peer_tables['Persons'][0].keys()
228 # Keyed on foreign person_id
229 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
230 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
231 for peer_person in peer_tables['Persons']])
233 # XXX Do we care about membership in foreign site(s)?
235 # Synchronize new set (still keyed on foreign person_id)
236 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
238 # transcoder : retrieve a local key_id from a peer_key_id
# NOTE(review): inverting peer_keys assumes local key_id values are
# unique across this peer's cached keys -- presumably guaranteed by
# the cache schema, but confirm; duplicate ids would silently collide.
239 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
240 for peer_key_id,key in peer_keys.iteritems()])
242 for peer_person_id, person in peer_persons.iteritems():
243 # Bind any newly cached users to peer
244 if peer_person_id not in old_peer_persons:
245 peer.add_person(person, peer_person_id, commit = False)
246 person['peer_id'] = peer_id
247 person['peer_person_id'] = peer_person_id
248 person['key_ids'] = []
250 # User as viewed by peer
251 peer_person = persons_at_peer[peer_person_id]
253 # Foreign keys currently belonging to the user
254 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
255 if key_transcoder[key_id] in peer_keys]
257 # Foreign keys that should belong to the user
258 # this is basically peer_person['key_ids'], we just check it makes sense
259 # (e.g. we might have failed importing it)
260 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
262 # Remove stale keys from user
263 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
264 person.remove_key(peer_keys[key_id], commit = False)
265 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
267 # Add new keys to user
268 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
269 person.add_key(peer_keys[key_id], commit = False)
270 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
272 timers['persons'] = time.time() - start
# --- Phase 4: nodes ---------------------------------------------------
275 # XXX Synchronize foreign boot states
# Boot states, like key types, are only validated against local ones.
278 boot_states = BootStates(self.api).dict()
281 # Synchronize foreign nodes
286 print >>log, 'Dealing with Nodes'
288 # Compare only the columns returned by the GetPeerData() call
289 if peer_tables['Nodes']:
290 columns = peer_tables['Nodes'][0].keys()
294 # Keyed on foreign node_id
295 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
296 nodes_at_peer = dict([(node['node_id'], node) \
297 for node in peer_tables['Nodes']])
299 # Fix up site_id and boot_states references
# .items() copy again because invalid nodes are deleted mid-loop;
# `errors` is (re)initialized on an elided line inside the loop.
300 for peer_node_id, node in nodes_at_peer.items():
302 if node['site_id'] not in peer_sites:
303 errors.append("invalid site %d" % node['site_id'])
304 if node['boot_state'] not in boot_states:
305 errors.append("invalid boot state %s" % node['boot_state'])
# The `if errors:` guard line is elided from this view.
307 # XXX Log an event instead of printing to logfile
308 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
309 node, ":", ", ".join(errors)
310 del nodes_at_peer[peer_node_id]
# Translate the peer's site_id into our cached local site_id.
313 node['site_id'] = peer_sites[node['site_id']]['site_id']
315 # Synchronize new set
316 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
318 for peer_node_id, node in peer_nodes.iteritems():
319 # Bind any newly cached foreign nodes to peer
320 if peer_node_id not in old_peer_nodes:
321 peer.add_node(node, peer_node_id, commit = False)
322 node['peer_id'] = peer_id
323 node['peer_node_id'] = peer_node_id
325 timers['nodes'] = time.time() - start
# --- Phase 5: the peer's view of OUR nodes ----------------------------
328 # Synchronize local nodes
333 # Keyed on local node_id
334 local_nodes = Nodes(self.api).dict()
# PeerNodes lists our own nodes as cached by the peer; merge them
# into peer_nodes so slice membership below can resolve them too.
336 for node in peer_tables['PeerNodes']:
337 # Foreign identifier for our node as maintained by peer
338 peer_node_id = node['node_id']
339 # Local identifier for our node as cached by peer
340 node_id = node['peer_node_id']
341 if node_id in local_nodes:
342 # Still a valid local node, add it to the synchronized
343 # set of local node objects keyed on foreign node_id.
344 peer_nodes[peer_node_id] = local_nodes[node_id]
346 timers['local_nodes'] = time.time() - start
# --- Phase 6: slices --------------------------------------------------
349 # XXX Synchronize foreign slice instantiation states
352 slice_instantiations = SliceInstantiations(self.api).dict()
355 # Synchronize foreign slices
360 print >>log, 'Dealing with Slices'
362 # Compare only the columns returned by the GetPeerData() call
363 if peer_tables['Slices']:
364 columns = peer_tables['Slices'][0].keys()
368 # Keyed on foreign slice_id
369 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
370 slices_at_peer = dict([(slice['slice_id'], slice) \
371 for slice in peer_tables['Slices']])
373 # Fix up site_id, instantiation, and creator_person_id references
# .items() copy again: invalid slices are deleted during iteration.
374 for peer_slice_id, slice in slices_at_peer.items():
376 if slice['site_id'] not in peer_sites:
377 errors.append("invalid site %d" % slice['site_id'])
378 if slice['instantiation'] not in slice_instantiations:
379 errors.append("invalid instantiation %s" % slice['instantiation'])
380 if slice['creator_person_id'] not in peer_persons:
# Unknown creator is tolerated (set to None) rather than
# treated as an error; known creators are transcoded to the
# local person_id (the `else:` line is elided from this view).
382 slice['creator_person_id'] = None
384 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
386 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
387 slice, ":", ", ".join(errors)
388 del slices_at_peer[peer_slice_id]
391 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
393 # Synchronize new set
394 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
396 # transcoder : retrieve a local node_id from a peer_node_id
# Same uniqueness assumption as key_transcoder above -- TODO confirm.
397 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
398 for peer_node_id,node in peer_nodes.iteritems()])
399 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
400 for peer_person_id,person in peer_persons.iteritems()])
402 for peer_slice_id, slice in peer_slices.iteritems():
403 # Bind any newly cached foreign slices to peer
404 if peer_slice_id not in old_peer_slices:
405 peer.add_slice(slice, peer_slice_id, commit = False)
406 slice['peer_id'] = peer_id
407 slice['peer_slice_id'] = peer_slice_id
408 slice['node_ids'] = []
409 slice['person_ids'] = []
411 # Slice as viewed by peer
412 peer_slice = slices_at_peer[peer_slice_id]
414 # Nodes that are currently part of the slice
415 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
416 if node_transcoder[node_id] in peer_nodes]
418 # Nodes that should be part of the slice
419 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
421 # Remove stale nodes from slice
422 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
423 slice.remove_node(peer_nodes[node_id], commit = False)
424 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
426 # Add new nodes to slice
427 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
428 slice.add_node(peer_nodes[node_id], commit = False)
429 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
431 # N.B.: Local nodes that may have been added to the slice
432 # by hand, are removed. In other words, don't do this.
434 # Foreign users that are currently part of the slice
435 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
436 # if person_transcoder[person_id] in peer_persons]
437 # An issue occurred with a user who registered on both sites (same email)
438 # So the remote person could not get cached locally
439 # The one-line map/filter style is nicer but ineffective here
# Explicit loop (instead of a comprehension) so non-transcodable
# person_ids can be logged and skipped rather than raise KeyError.
440 old_slice_person_ids = []
441 for person_id in slice['person_ids']:
442 if not person_transcoder.has_key(person_id):
443 print >> log, 'WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name'])
444 elif person_transcoder[person_id] not in peer_persons:
445 print >> log, 'WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name'])
# The `else:` introducing this append is elided from this view.
447 old_slice_person_ids += [person_transcoder[person_id]]
449 # Foreign users that should be part of the slice
450 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
452 # Remove stale users from slice
453 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
454 slice.remove_person(peer_persons[person_id], commit = False)
455 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
457 # Add new users to slice
458 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
459 slice.add_person(peer_persons[person_id], commit = False)
460 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
462 # N.B.: Local users that may have been added to the slice
463 # by hand, are not touched.
465 timers['slices'] = time.time() - start
467 # Update peer itself and commit
# Single commit point: every earlier sync/add/remove used
# commit=False, so the whole refresh lands atomically here.
468 peer.sync(commit = True)