2 # Thierry Parmentelat - INRIA
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
# NOTE(review): this listing carries the original file's line numbers as a prefix on
# each line and the numbering is non-contiguous, so several structural lines
# (docstring delimiters, the `accepts = [...]` header, try/except scaffolding,
# `timers`/`start` initialization, the trailing `return`) are not visible here.
# Code is Python 2 throughout (`print >> log`, `dict.iteritems`, `except Exc, err`).
26 class RefreshPeer(Method):
28     Fetches site, node, slice, person and key data from the specified peer
29     and caches it locally; also deletes stale entries.
30     Upon successful completion, returns a dict reporting various timers.
# Accepts either the numeric peer_id or the peername string to identify the peer.
38     Mixed(Peer.fields['peer_id'],
39           Peer.fields['peername']),
# NOTE(review): the class docstring above says a dict of timers is returned,
# but `returns` declares an int ("1 if successful") — confirm which is accurate.
42     returns = Parameter(int, "1 if successful")
# Entry point: look up the peer, pull its tables via GetPeerData(), then cache
# sites, keys, persons, nodes and slices locally, timing each phase.
44     def call(self, auth, peer_id_or_peername):
46         peers = Peers(self.api, [peer_id_or_peername])
# Raised when the Peers() lookup above matched nothing (guard line not visible here).
48             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
50         peer_id = peer['peer_id']
59         print >>log, 'Issuing GetPeerData'
60         peer_tables = peer.GetPeerData()
# Transport time = total elapsed minus the time the remote spent in its own DB
# (`db_time` is reported inside the GetPeerData() payload).
61         timers['transport'] = time.time() - start - peer_tables['db_time']
62         timers['peer_db'] = peer_tables['db_time']
# NOTE(review): timers hold floats (time.time() deltas); '%d' truncates to seconds.
64         print >>log, 'GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport'])
# Nested helper; closes over `self`, `peer` and `log` from the enclosing method.
66         def sync(objects, peer_objects, classobj):
68             Synchronizes two dictionaries of objects. objects should
69             be a dictionary of local objects keyed on their foreign
70             identifiers. peer_objects should be a dictionary of
71             foreign objects keyed on their local (i.e., foreign to us)
72             identifiers. Returns a final dictionary of local objects
73             keyed on their foreign identifiers.
77             print >>log, 'Entering sync on',classobj(self.api).__class__.__name__
81             # Delete stale objects
# NOTE(review): `object` shadows the Python builtin; kept as-is to preserve code.
82             for peer_object_id, object in objects.iteritems():
83                 if peer_object_id not in peer_objects:
84                     object.delete(commit = False)
85                     print >> log, peer['peername'],classobj(self.api).__class__.__name__, object[object.primary_key],"deleted"
87             # Add/update new/existing objects
88             for peer_object_id, peer_object in peer_objects.iteritems():
89                 if peer_object_id in objects:
90                     # Update existing object
91                     object = objects[peer_object_id]
93                     # Replace foreign identifier with existing local
94                     # identifier temporarily for the purposes of
# (the comparison below — the foreign row must carry our local id for __eq__)
96                     peer_object[object.primary_key] = object[object.primary_key]
98                     # Must use __eq__() instead of == since
99                     # peer_object may be a raw dict instead of a Peer
101                     if not object.__eq__(peer_object):
102                         # Only update intrinsic fields
103                         object.update(object.db_fields(peer_object))
110                     # Restore foreign identifier
111                     peer_object[object.primary_key] = peer_object_id
# New object branch: build a fresh local row from the peer's data
# (the `else:` header for this branch is not visible in this listing).
114                     object = classobj(self.api, peer_object)
115                     # Replace foreign identifier with new local identifier
116                     del object[object.primary_key]
# Flush to DB without committing; commit happens once at the very end (peer.sync).
122                     object.sync(commit = False)
123                 except PLCInvalidArgument, err:
124                     # Skip if validation fails
125                     # XXX Log an event instead of printing to logfile
126                     print >> log, "Warning: Skipping invalid", \
127                         peer['peername'], object.__class__.__name__, \
128                         ":", peer_object, ":", err
# Accumulate successfully synced objects, still keyed on the foreign id.
131                 synced[peer_object_id] = object
# NOTE(review): `dbg` is assigned on lines not visible here — presumably
# "added"/"updated" per branch; confirm against the full source.
134                 print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
137             print >>log, 'Exiting sync on',classobj(self.api).__class__.__name__
142         # Synchronize foreign sites
147         print >>log, 'Dealing with Sites'
149         # Compare only the columns returned by the GetPeerData() call
150         if peer_tables['Sites']:
151             columns = peer_tables['Sites'][0].keys()
155         # Keyed on foreign site_id
156         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
157         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
159         # Synchronize new set (still keyed on foreign site_id)
160         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
162         for peer_site_id, site in peer_sites.iteritems():
163             # Bind any newly cached sites to peer
164             if peer_site_id not in old_peer_sites:
165                 peer.add_site(site, peer_site_id, commit = False)
166                 site['peer_id'] = peer_id
167                 site['peer_site_id'] = peer_site_id
# NOTE(review): key is singular 'site' while siblings use plural ('keys',
# 'persons', 'nodes', 'slices') — inconsistent but callers may depend on it.
169         timers['site'] = time.time() - start
172         # XXX Synchronize foreign key types
175         print >>log, 'Dealing with Keys'
# Local key types, used below to validate each foreign key's key_type.
177         key_types = KeyTypes(self.api).dict()
180         # Synchronize foreign keys
185         # Compare only the columns returned by the GetPeerData() call
186         if peer_tables['Keys']:
187             columns = peer_tables['Keys'][0].keys()
191         # Keyed on foreign key_id
192         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
193         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
195         # Fix up key_type references
# .items() returns a list in Python 2, so deleting from the dict mid-loop is safe.
196         for peer_key_id, key in keys_at_peer.items():
197             if key['key_type'] not in key_types:
198                 # XXX Log an event instead of printing to logfile
199                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
200                     key, ": invalid key type", key['key_type']
201                 del keys_at_peer[peer_key_id]
204         # Synchronize new set (still keyed on foreign key_id)
205         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
206         for peer_key_id, key in peer_keys.iteritems():
207             # Bind any newly cached keys to peer
208             if peer_key_id not in old_peer_keys:
209                 peer.add_key(key, peer_key_id, commit = False)
210                 key['peer_id'] = peer_id
211                 key['peer_key_id'] = peer_key_id
213         timers['keys'] = time.time() - start
216         # Synchronize foreign users
221         print >>log, 'Dealing with Persons'
223         # Compare only the columns returned by the GetPeerData() call
224         if peer_tables['Persons']:
225             columns = peer_tables['Persons'][0].keys()
229         # Keyed on foreign person_id
230         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
231         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
232                                 for peer_person in peer_tables['Persons']])
234         # XXX Do we care about membership in foreign site(s)?
236         # Synchronize new set (still keyed on foreign person_id)
237         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
239         # transcoder : retrieve a local key_id from a peer_key_id
# Maps local key_id -> foreign peer_key_id (inverse of the peer_keys dict).
240         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
241                                   for peer_key_id,key in peer_keys.iteritems()])
243         for peer_person_id, person in peer_persons.iteritems():
244             # Bind any newly cached users to peer
245             if peer_person_id not in old_peer_persons:
246                 peer.add_person(person, peer_person_id, commit = False)
247                 person['peer_id'] = peer_id
248                 person['peer_person_id'] = peer_person_id
249                 person['key_ids'] = []
251             # User as viewed by peer
252             peer_person = persons_at_peer[peer_person_id]
254             # Foreign keys currently belonging to the user
255             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
256                                   if key_transcoder[key_id] in peer_keys]
258             # Foreign keys that should belong to the user
259             # this is basically peer_person['key_ids'], we just check it makes sense
260             # (e.g. we might have failed importing it)
261             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
263             # Remove stale keys from user
264             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
265                 person.remove_key(peer_keys[key_id], commit = False)
266                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
268             # Add new keys to user
269             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
270                 person.add_key(peer_keys[key_id], commit = False)
271                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
273         timers['persons'] = time.time() - start
276         # XXX Synchronize foreign boot states
# Local boot states, used below to validate each foreign node's boot_state.
279         boot_states = BootStates(self.api).dict()
282         # Synchronize foreign nodes
287         print >>log, 'Dealing with Nodes'
289         # Compare only the columns returned by the GetPeerData() call
290         if peer_tables['Nodes']:
291             columns = peer_tables['Nodes'][0].keys()
295         # Keyed on foreign node_id
296         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
297         nodes_at_peer = dict([(node['node_id'], node) \
298                               for node in peer_tables['Nodes']])
300         # Fix up site_id and boot_states references
# .items() snapshot again makes the in-loop `del` safe under Python 2.
301         for peer_node_id, node in nodes_at_peer.items():
# NOTE(review): `errors = []` initialization is on a line not visible here.
303             if node['site_id'] not in peer_sites:
304                 errors.append("invalid site %d" % node['site_id'])
305             if node['boot_state'] not in boot_states:
306                 errors.append("invalid boot state %s" % node['boot_state'])
# Skip branch taken when `errors` is non-empty (the `if errors:` header is not visible).
308                 # XXX Log an event instead of printing to logfile
309                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
310                     node, ":", ", ".join(errors)
311                 del nodes_at_peer[peer_node_id]
# Transcode the node's foreign site_id into our cached local site_id.
314                 node['site_id'] = peer_sites[node['site_id']]['site_id']
316         # Synchronize new set
317         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
319         for peer_node_id, node in peer_nodes.iteritems():
320             # Bind any newly cached foreign nodes to peer
321             if peer_node_id not in old_peer_nodes:
322                 peer.add_node(node, peer_node_id, commit = False)
323                 node['peer_id'] = peer_id
324                 node['peer_node_id'] = peer_node_id
326         timers['nodes'] = time.time() - start
329         # Synchronize local nodes
334         # Keyed on local node_id
335         local_nodes = Nodes(self.api).dict()
# 'PeerNodes' lists OUR nodes as cached by the peer; merge the still-valid ones
# into peer_nodes (keyed on the peer's id for them) so later transcoding covers them.
337         for node in peer_tables['PeerNodes']:
338             # Foreign identifier for our node as maintained by peer
339             peer_node_id = node['node_id']
340             # Local identifier for our node as cached by peer
341             node_id = node['peer_node_id']
342             if node_id in local_nodes:
343                 # Still a valid local node, add it to the synchronized
344                 # set of local node objects keyed on foreign node_id.
345                 peer_nodes[peer_node_id] = local_nodes[node_id]
347         timers['local_nodes'] = time.time() - start
350         # XXX Synchronize foreign slice instantiation states
# Local instantiation states, used to validate each foreign slice below.
353         slice_instantiations = SliceInstantiations(self.api).dict()
356         # Synchronize foreign slices
361         print >>log, 'Dealing with Slices'
363         # Compare only the columns returned by the GetPeerData() call
364         if peer_tables['Slices']:
365             columns = peer_tables['Slices'][0].keys()
369         # Keyed on foreign slice_id
370         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
371         slices_at_peer = dict([(slice['slice_id'], slice) \
372                                for slice in peer_tables['Slices']])
374         # Fix up site_id, instantiation, and creator_person_id references
# NOTE(review): `slice` shadows the builtin; kept as-is to preserve code.
375         for peer_slice_id, slice in slices_at_peer.items():
377             if slice['site_id'] not in peer_sites:
378                 errors.append("invalid site %d" % slice['site_id'])
379             if slice['instantiation'] not in slice_instantiations:
380                 errors.append("invalid instantiation %s" % slice['instantiation'])
# Unknown creator is tolerated: the reference is nulled rather than the slice skipped.
381             if slice['creator_person_id'] not in peer_persons:
383                 slice['creator_person_id'] = None
# Otherwise transcode the foreign creator id into our cached local person_id.
385                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
# Skip branch taken when `errors` is non-empty (header line not visible).
387                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
388                     slice, ":", ", ".join(errors)
389                 del slices_at_peer[peer_slice_id]
# Transcode the slice's foreign site_id into our cached local site_id.
392                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
394         # Synchronize new set
395         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
397         # transcoder : retrieve a local node_id from a peer_node_id
398         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
399                                    for peer_node_id,node in peer_nodes.iteritems()])
400         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
401                                      for peer_person_id,person in peer_persons.iteritems()])
403         for peer_slice_id, slice in peer_slices.iteritems():
404             # Bind any newly cached foreign slices to peer
405             if peer_slice_id not in old_peer_slices:
406                 peer.add_slice(slice, peer_slice_id, commit = False)
407                 slice['peer_id'] = peer_id
408                 slice['peer_slice_id'] = peer_slice_id
409                 slice['node_ids'] = []
410                 slice['person_ids'] = []
412             # Slice as viewed by peer
413             peer_slice = slices_at_peer[peer_slice_id]
415             # Nodes that are currently part of the slice
416             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
417                                    if node_transcoder[node_id] in peer_nodes]
419             # Nodes that should be part of the slice
420             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
422             # Remove stale nodes from slice
423             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
424                 slice.remove_node(peer_nodes[node_id], commit = False)
425                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
427             # Add new nodes to slice
428             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
429                 slice.add_node(peer_nodes[node_id], commit = False)
430                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
432             # N.B.: Local nodes that may have been added to the slice
433             # by hand, are removed. In other words, don't do this.
435             # Foreign users that are currently part of the slice
436             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
437             #                         if person_transcoder[person_id] in peer_persons]
438             # An issue occurred with a user who registered on both sites (same email)
439             # So the remote person could not get cached locally
440             # The one-line map/filter style is nicer but ineffective here
441             old_slice_person_ids = []
442             for person_id in slice['person_ids']:
# NOTE(review): dict.has_key is deprecated Python 2 API; `person_id in person_transcoder`
# is the equivalent — left unchanged here.
443                 if not person_transcoder.has_key(person_id):
444                     print >> log, 'WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name'])
445                 elif person_transcoder[person_id] not in peer_persons:
446                     print >> log, 'WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name'])
# Transcodable path (its `else:` header is not visible in this listing).
448                     old_slice_person_ids += [person_transcoder[person_id]]
450             # Foreign users that should be part of the slice
451             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
453             # Remove stale users from slice
454             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
455                 slice.remove_person(peer_persons[person_id], commit = False)
456                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
458             # Add new users to slice
459             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
460                 slice.add_person(peer_persons[person_id], commit = False)
461                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
463             # N.B.: Local users that may have been added to the slice
464             # by hand, are not touched.
466         timers['slices'] = time.time() - start
468         # Update peer itself and commit
# Single commit point: every earlier sync/add/remove used commit = False.
469         peer.sync(commit = True)