2 # Thierry Parmentelat - INRIA
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
# RefreshPeer: PLCAPI method that pulls a remote peer's data (sites, keys,
# persons, nodes, slices) via GetPeerData() and mirrors it into the local
# database, deleting stale cached entries as it goes.
# NOTE(review): this excerpt keeps the original file's line numbers as a
# literal prefix on every line and omits many lines (visible as gaps in that
# numbering); all code tokens below are byte-identical to the original.
23 class RefreshPeer(Method):
25 Fetches node and slice data from the specified peer and caches it
26 locally; also deletes stale entries. Returns 1 if successful,
# accepts: the peer may be designated either by numeric id or by name.
34 Mixed(Peer.fields['peer_id'],
35 Peer.fields['peername']),
38 returns = Parameter(int, "1 if successful")
# Entry point.  Resolves the peer, fetches its tables in one call, then
# synchronizes each table kind in dependency order (sites -> keys ->
# persons -> nodes -> slices), recording wall-clock timings in the
# `timers` dict.  (`timers`, `start`, `now`, `peer`, etc. are initialized
# on lines elided from this excerpt.)
40 def call(self, auth, peer_id_or_peername):
45 peers = Peers(self.api, [peer_id_or_peername])
# Raised when the Peers() lookup above came back empty (guard elided).
47 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
49 peer_id = peer['peer_id']
# One round trip: the peer ships all its tables plus the time it spent in
# its own database, so transport time can be computed separately below.
57 peer_tables = peer.GetPeerData()
58 timers['transport'] = time.time() - start - peer_tables['db_time']
59 timers['peer_db'] = peer_tables['db_time']
62 timers['prepare'] = now-start-timers['peer_db']-timers['transport']
# Generic one-table synchronizer, closed over `self` and `peer`.
# WARNING(review): `debug_dict={}` is a mutable default argument -- calls
# that omit it (Sites/Keys/Nodes/Slices below) share ONE dict across
# invocations; presumably harmless since it only accumulates debug
# counters, but confirm that is intentional.
65 def sync(objects, peer_objects, classobj,debug_dict={}):
67 Synchronizes two dictionaries of objects. objects should
68 be a dictionary of local objects keyed on their foreign
69 identifiers. peer_objects should be a dictionary of
70 foreign objects keyed on their local (i.e., foreign to us)
71 identifiers. Returns a final dictionary of local objects
72 keyed on their foreign identifiers.
# Zero all timing/counter slots so the `+=` updates below are safe.
75 for key in ['delete','sync','process','focus','added','deleted','updated','unchanged','synced','screwed']:
81 # Delete stale objects
# Anything we cached that the peer no longer reports is deleted locally.
82 for peer_object_id, object in objects.iteritems():
83 if peer_object_id not in peer_objects:
84 object.delete(commit = False)
85 print classobj, "object %d deleted" % object[object.primary_key]
86 debug_dict['deleted'] += 1
89 debug_dict['delete']=xnow-xstart
92 # Add/update new/existing objects
93 for peer_object_id, peer_object in peer_objects.iteritems():
96 debug_dict['sync'] += (xnow-xstart)
99 #if peer_object_id in objects:
# Python 2 idiom; equivalent to the commented-out `in` test above.
100 if objects.has_key(peer_object_id):
101 # Update existing object
102 object = objects[peer_object_id]
104 # Replace foreign identifier with existing local
105 # identifier temporarily for the purposes of
107 peer_object[object.primary_key] = object[object.primary_key]
109 # Must use __eq__() instead of == since
110 # peer_object may be a raw dict instead of a Peer
112 if not object.__eq__(peer_object):
113 # Only update intrinsic fields
114 object.update(object.db_fields(peer_object))
117 debug_dict['updated'] += 1
121 debug_dict['unchanged'] += 1
123 # Restore foreign identifier
124 peer_object[object.primary_key] = peer_object_id
# New object: instantiate locally; a fresh local primary key will be
# assigned once the foreign one is removed on the next line.
127 object = classobj(self.api, peer_object)
128 # Replace foreign identifier with new local identifier
129 del object[object.primary_key]
132 debug_dict['added'] += 1
135 debug_dict['process'] += (xnow-xstart)
# Write-through (inside a try whose opening line is elided): validation
# failures are logged and skipped rather than aborting the whole refresh.
140 object.sync(commit = False)
141 debug_dict['synced'] += 1
142 except PLCInvalidArgument, err:
143 # Skip if validation fails
144 # XXX Log an event instead of printing to logfile
145 print >> log, "Warning: Skipping invalid", \
146 peer['peername'], object.__class__.__name__, \
147 ":", peer_object, ":", err
148 debug_dict['screwed'] += 1
# Result dict stays keyed on the FOREIGN id, per the docstring contract.
151 synced[peer_object_id] = object
154 print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
157 debug_dict['sync'] += (xnow-xstart)
163 # Synchronize foreign sites
166 # Compare only the columns returned by the GetPeerData() call
167 if peer_tables['Sites']:
168 columns = peer_tables['Sites'][0].keys()
172 # Keyed on foreign site_id
173 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
174 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
176 # Synchronize new set (still keyed on foreign site_id)
177 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
179 for peer_site_id, site in peer_sites.iteritems():
180 # Bind any newly cached sites to peer
181 if peer_site_id not in old_peer_sites:
182 peer.add_site(site, peer_site_id, commit = False)
183 site['peer_id'] = peer_id
184 site['peer_site_id'] = peer_site_id
187 timers['site'] = now - start
191 # XXX Synchronize foreign key types
# Key types are NOT mirrored; foreign keys must use a locally known type.
194 key_types = KeyTypes(self.api).dict()
197 # Synchronize foreign keys
201 # Compare only the columns returned by the GetPeerData() call
202 if peer_tables['Keys']:
203 columns = peer_tables['Keys'][0].keys()
207 # Keyed on foreign key_id
208 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
209 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
211 # Fix up key_type references
# Deleting while looping is safe here: Python 2 .items() snapshots a list.
212 for peer_key_id, key in keys_at_peer.items():
213 if key['key_type'] not in key_types:
214 # XXX Log an event instead of printing to logfile
215 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
216 key, ": invalid key type", key['key_type']
217 del keys_at_peer[peer_key_id]
220 # Synchronize new set (still keyed on foreign key_id)
221 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
222 for peer_key_id, key in peer_keys.iteritems():
223 # Bind any newly cached keys to peer
224 if peer_key_id not in old_peer_keys:
225 peer.add_key(key, peer_key_id, commit = False)
226 key['peer_id'] = peer_id
227 key['peer_key_id'] = peer_key_id
229 timers['keys'] = time.time() - start
232 # Synchronize foreign users
238 # Compare only the columns returned by the GetPeerData() call
239 if peer_tables['Persons']:
240 columns = peer_tables['Persons'][0].keys()
244 # Keyed on foreign person_id
245 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
246 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
247 for peer_person in peer_tables['Persons']])
250 timers [ 'persons-1' ] = now - substart
253 # XXX Do we care about membership in foreign site(s)?
255 # Synchronize new set (still keyed on foreign person_id)
# `yyy` is a debug-counter dict (initialized on an elided line) passed so
# sync()'s per-phase timings can be folded into `timers` just below.
257 peer_persons = sync(old_peer_persons, persons_at_peer, Person,yyy)
259 timers[ 'persons-'+key ] = yyy[key]
262 timers [ 'persons-2' ] = now - substart
266 for key in ['persons-31','persons-32','persons-33','persons-34','persons-35','persons-36','person3-added']:
269 # allows to retrieve local_key_id from a peer_key_id, if any
270 peer_key_id_from_local_key_id = dict( \
271 [ (key['key_id'],peer_key_id) for (peer_key_id,key) in peer_keys.items()])
273 for peer_person_id, person in peer_persons.iteritems():
276 timers [ 'persons-36' ] += (now - subsubstart)
279 # Bind any newly cached users to peer
280 if peer_person_id not in old_peer_persons:
281 peer.add_person(person, peer_person_id, commit = False)
282 person['peer_id'] = peer_id
283 person['peer_person_id'] = peer_person_id
284 person['key_ids'] = []
285 timers['person3-added'] += 1
288 timers [ 'persons-31' ] += (now - subsubstart)
291 # User as viewed by peer
292 peer_person = persons_at_peer[peer_person_id]
294 # Foreign keys currently belonging to the user
295 old_person_keys = dict(filter(lambda (peer_key_id, key): \
296 key['key_id'] in person['key_ids'],
298 print 'old_person_keys',old_person_keys.keys()
# Alternate set-based computation kept alongside the filter() version for
# comparison; see the commented-out uses in the two loops further down.
300 old_person_key_ids_set = set(\
301 [ peer_key_id_from_local_key_id[local_key_id] for local_key_id in person['key_ids']])
302 print 'old_person_keys_set',old_person_key_ids_set
306 timers [ 'persons-33' ] += (now - subsubstart)
309 # Foreign keys that should belong to the user
310 person_keys = dict(filter(lambda (peer_key_id, key): \
311 peer_key_id in peer_person['key_ids'],
313 print 'person_keys',person_keys.keys()
315 person_keys_new = dict( [ (peer_key_id,peer_keys[peer_key_id]) \
316 for peer_key_id in peer_person['key_ids'] ])
317 print 'person_keys_new',person_keys_new.keys()
321 timers [ 'persons-34' ] += (now - subsubstart)
324 # Remove stale keys from user
325 for peer_key_id in (set(old_person_keys.keys()) - set(person_keys.keys())):
326 # for peer_key_id in (old_person_key_ids_set - set(person_keys.keys())):
327 person.remove_key(old_person_keys[peer_key_id], commit = False)
330 timers [ 'persons-35' ] += (now - subsubstart)
333 # Add new keys to user
334 for peer_key_id in (set(person_keys.keys()) - set(old_person_keys.keys())):
335 # for peer_key_id in (set(person_keys.keys()) - old_person_key_ids_set):
336 person.add_key(person_keys[peer_key_id], commit = False)
339 timers [ 'persons-36' ] += (now - subsubstart)
343 timers [ 'persons-3' ] = now - substart
346 timers['persons'] = time.time() - start
349 # XXX Synchronize foreign boot states
# Boot states are NOT mirrored; foreign nodes must use a known state.
354 boot_states = BootStates(self.api).dict()
357 # Synchronize foreign nodes
361 # Compare only the columns returned by the GetPeerData() call
362 if peer_tables['Nodes']:
363 columns = peer_tables['Nodes'][0].keys()
367 # Keyed on foreign node_id
368 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
369 nodes_at_peer = dict([(node['node_id'], node) \
370 for node in peer_tables['Nodes']])
372 # Fix up site_id and boot_states references
373 for peer_node_id, node in nodes_at_peer.items():
375 if node['site_id'] not in peer_sites:
376 errors.append("invalid site %d" % node['site_id'])
377 if node['boot_state'] not in boot_states:
378 errors.append("invalid boot state %s" % node['boot_state'])
380 # XXX Log an event instead of printing to logfile
381 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
382 node, ":", ", ".join(errors)
383 del nodes_at_peer[peer_node_id]
# Translate the peer's site_id into our locally cached site's id.
386 node['site_id'] = peer_sites[node['site_id']]['site_id']
388 # Synchronize new set
389 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
391 for peer_node_id, node in peer_nodes.iteritems():
392 # Bind any newly cached foreign nodes to peer
393 if peer_node_id not in old_peer_nodes:
394 peer.add_node(node, peer_node_id, commit = False)
395 node['peer_id'] = peer_id
396 node['peer_node_id'] = peer_node_id
398 timers['nodes'] = time.time() - start
401 # Synchronize local nodes
# Merge OUR nodes (as cached by the peer) into peer_nodes, so the slice
# synchronization below can resolve the peer's identifiers for them.
406 # Keyed on local node_id
407 local_nodes = Nodes(self.api).dict()
409 for node in peer_tables['PeerNodes']:
410 # Foreign identifier for our node as maintained by peer
411 peer_node_id = node['node_id']
412 # Local identifier for our node as cached by peer
413 node_id = node['peer_node_id']
414 if node_id in local_nodes:
415 # Still a valid local node, add it to the synchronized
416 # set of local node objects keyed on foreign node_id.
417 peer_nodes[peer_node_id] = local_nodes[node_id]
419 timers['local_nodes'] = time.time() - start
422 # XXX Synchronize foreign slice instantiation states
# Instantiation states are NOT mirrored; slices must use a known one.
427 slice_instantiations = SliceInstantiations(self.api).dict()
430 # Synchronize foreign slices
433 # Compare only the columns returned by the GetPeerData() call
434 if peer_tables['Slices']:
435 columns = peer_tables['Slices'][0].keys()
439 # Keyed on foreign slice_id
440 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
441 slices_at_peer = dict([(slice['slice_id'], slice) \
442 for slice in peer_tables['Slices']])
444 # Fix up site_id, instantiation, and creator_person_id references
445 for peer_slice_id, slice in slices_at_peer.items():
447 if slice['site_id'] not in peer_sites:
448 errors.append("invalid site %d" % slice['site_id'])
449 if slice['instantiation'] not in slice_instantiations:
450 errors.append("invalid instantiation %s" % slice['instantiation'])
451 if slice['creator_person_id'] not in peer_persons:
# An unknown creator is tolerated (cleared to None), unlike the site /
# instantiation errors above which cause the slice to be skipped.
453 slice['creator_person_id'] = None
455 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
457 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
458 slice, ":", ", ".join(errors)
459 del slices_at_peer[peer_slice_id]
462 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
464 # Synchronize new set
465 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
467 for peer_slice_id, slice in peer_slices.iteritems():
468 # Bind any newly cached foreign slices to peer
469 if peer_slice_id not in old_peer_slices:
470 peer.add_slice(slice, peer_slice_id, commit = False)
471 slice['peer_id'] = peer_id
472 slice['peer_slice_id'] = peer_slice_id
473 slice['node_ids'] = []
474 slice['person_ids'] = []
476 # Slice as viewed by peer
477 peer_slice = slices_at_peer[peer_slice_id]
479 # Nodes that are currently part of the slice
480 old_slice_nodes = dict(filter(lambda (peer_node_id, node): \
481 node['node_id'] in slice['node_ids'],
484 # Nodes that should be part of the slice
485 slice_nodes = dict(filter(lambda (peer_node_id, node): \
486 peer_node_id in peer_slice['node_ids'],
489 # Remove stale nodes from slice
490 for node_id in (set(old_slice_nodes.keys()) - set(slice_nodes.keys())):
491 slice.remove_node(old_slice_nodes[node_id], commit = False)
493 # Add new nodes to slice
494 for node_id in (set(slice_nodes.keys()) - set(old_slice_nodes.keys())):
495 slice.add_node(slice_nodes[node_id], commit = False)
497 # N.B.: Local nodes that may have been added to the slice
498 # by hand, are removed. In other words, don't do this.
500 # Foreign users that are currently part of the slice
501 old_slice_persons = dict(filter(lambda (peer_person_id, person): \
502 person['person_id'] in slice['person_ids'],
503 peer_persons.items()))
505 # Foreign users that should be part of the slice
506 slice_persons = dict(filter(lambda (peer_person_id, person): \
507 peer_person_id in peer_slice['person_ids'],
508 peer_persons.items()))
510 # Remove stale users from slice
511 for peer_person_id in (set(old_slice_persons.keys()) - set(slice_persons.keys())):
512 slice.remove_person(old_slice_persons[peer_person_id], commit = False)
514 # Add new users to slice
515 for peer_person_id in (set(slice_persons.keys()) - set(old_slice_persons.keys())):
516 slice.add_person(slice_persons[peer_person_id], commit = False)
518 # N.B.: Local users that may have been added to the slice
519 # by hand, are not touched.
521 timers['slices'] = time.time() - start
524 # Update peer itself and commit
# Single commit point: everything above used commit = False, so the whole
# refresh lands (or fails) atomically with this peer.sync().
525 peer.sync(commit = True)
527 timers['sync'] = time.time() - start