2 # Thierry Parmentelat - INRIA
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
# NOTE(review): this file is a numbered extraction — the leading integer on
# each line is the original source line number, and gaps in that numbering
# mark elided source lines. Code tokens are left byte-identical; only
# comments are added.
#
# RefreshPeer: PLCAPI Method that pulls a remote peer's data dump and
# reconciles the local cache of foreign sites/keys/persons/nodes/slices.
23 class RefreshPeer(Method):
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
# The Mixed(...) below is presumably part of the elided
# `accepts = [Auth(), ...]` declaration — the peer may be identified either
# by numeric peer_id or by peername. TODO confirm against full source.
34     Mixed(Peer.fields['peer_id'],
35           Peer.fields['peername']),
38     returns = Parameter(int, "1 if successful")
# call(): entry point. Looks up the target peer, fetches its full data dump
# in one RPC, then synchronizes each object class in turn below.
40 def call(self, auth, peer_id_or_peername):
# Peers() accepts either id or name in the filter list.
42 peers = Peers(self.api, [peer_id_or_peername])
# Raised when the lookup matched nothing; the guarding `if not peers:`
# (original line ~43, elided here) precedes this — TODO confirm.
44 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
# `peer` itself is bound on an elided line (presumably `peer = peers[0]`).
46 peer_id = peer['peer_id']
# One bulk RPC: the peer returns all of its tables plus 'db_time', the time
# it spent in its own database, used to split transport vs. remote-db cost.
55 peer_tables = peer.GetPeerData()
# `start` and `timers` are initialized on elided lines; `timers` accumulates
# per-phase wall-clock durations reported by this method.
56 timers['transport'] = time.time() - start - peer_tables['db_time']
57 timers['peer_db'] = peer_tables['db_time']
# sync(): generic reconciliation helper reused for every object class below
# (Site, Key, Person, Node, Slice). Closure over self/peer of call().
59 def sync(objects, peer_objects, classobj):
61     Synchronizes two dictionaries of objects. objects should
62     be a dictionary of local objects keyed on their foreign
63     identifiers. peer_objects should be a dictionary of
64     foreign objects keyed on their local (i.e., foreign to us)
65     identifiers. Returns a final dictionary of local objects
66     keyed on their foreign identifiers.
# Pass 1: drop locally cached objects whose foreign id no longer exists
# at the peer. Deletions are batched (commit deferred to end of call()).
71 # Delete stale objects
72 for peer_object_id, object in objects.iteritems():
73 if peer_object_id not in peer_objects:
74 object.delete(commit = False)
75 print classobj(self.api).__class__.__name__, "object %s deleted" % object[object.class_key]
# Pass 2: create or update the local cache entry for each peer object.
77 # Add/update new/existing objects
78 for peer_object_id, peer_object in peer_objects.iteritems():
79 if peer_object_id in objects:
80 # Update existing object
81 object = objects[peer_object_id]
83 # Replace foreign identifier with existing local
84 # identifier temporarily for the purposes of
# comparison (continuation of the comment above; original line elided).
86 peer_object[object.primary_key] = object[object.primary_key]
88 # Must use __eq__() instead of == since
89 # peer_object may be a raw dict instead of a Peer
91 if not object.__eq__(peer_object):
92 # Only update intrinsic fields
93 object.update(object.db_fields(peer_object))
# NOTE(review): elided lines 94-99 presumably set the `dbg` label printed
# below and handle the unchanged case — confirm against full source.
100 # Restore foreign identifier
101 peer_object[object.primary_key] = peer_object_id
# Creation branch: the introducing `else:` (elided lines 102-103) builds a
# brand-new local object from the peer's raw dict.
104 object = classobj(self.api, peer_object)
105 # Replace foreign identifier with new local identifier
106 del object[object.primary_key]
# A `try:` (elided lines ~107-111) guards this sync and pairs with the
# except clause below; validation failures skip the object instead of
# aborting the whole refresh.
112 object.sync(commit = False)
113 except PLCInvalidArgument, err:
114 # Skip if validation fails
115 # XXX Log an event instead of printing to logfile
116 print >> log, "Warning: Skipping invalid", \
117 peer['peername'], object.__class__.__name__, \
118 ":", peer_object, ":", err
# `synced` (initialized on an elided line) is the returned mapping of
# foreign id -> local cached object.
121 synced[peer_object_id] = object
124 print >> log, peer['peername'], classobj(self.api).__class__.__name__, \
125 object[object.class_key], object[object.primary_key], dbg
# --- Phase: foreign sites ---------------------------------------------------
130 # Synchronize foreign sites
135 # Compare only the columns returned by the GetPeerData() call
136 if peer_tables['Sites']:
137 columns = peer_tables['Sites'][0].keys()
# NOTE(review): the matching else branch (elided lines ~138-139) presumably
# leaves `columns` unrestricted — confirm against full source.
141 # Keyed on foreign site_id
142 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
143 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
145 # Synchronize new set (still keyed on foreign site_id)
146 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
148 for peer_site_id, site in peer_sites.iteritems():
149 # Bind any newly cached sites to peer
150 if peer_site_id not in old_peer_sites:
151 peer.add_site(site, peer_site_id, commit = False)
# Mirror the binding in the in-memory object so later phases can use it
# without re-reading from the database.
152 site['peer_id'] = peer_id
153 site['peer_site_id'] = peer_site_id
155 timers['site'] = time.time() - start
# --- Phase: key types and foreign keys --------------------------------------
158 # XXX Synchronize foreign key types
# Local key-type table, used below purely as a validation set.
161 key_types = KeyTypes(self.api).dict()
164 # Synchronize foreign keys
169 # Compare only the columns returned by the GetPeerData() call
170 if peer_tables['Keys']:
171 columns = peer_tables['Keys'][0].keys()
175 # Keyed on foreign key_id
176 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
177 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
179 # Fix up key_type references
# .items() (a copy in Python 2), not iteritems(), because the loop body
# deletes from keys_at_peer while iterating.
180 for peer_key_id, key in keys_at_peer.items():
181 if key['key_type'] not in key_types:
182 # XXX Log an event instead of printing to logfile
183 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
184 key, ": invalid key type", key['key_type']
185 del keys_at_peer[peer_key_id]
188 # Synchronize new set (still keyed on foreign key_id)
189 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
190 for peer_key_id, key in peer_keys.iteritems():
191 # Bind any newly cached keys to peer
192 if peer_key_id not in old_peer_keys:
193 peer.add_key(key, peer_key_id, commit = False)
194 key['peer_id'] = peer_id
195 key['peer_key_id'] = peer_key_id
197 timers['keys'] = time.time() - start
# --- Phase: foreign users ---------------------------------------------------
200 # Synchronize foreign users
205 # Compare only the columns returned by the GetPeerData() call
206 if peer_tables['Persons']:
207 columns = peer_tables['Persons'][0].keys()
211 # Keyed on foreign person_id
212 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
213 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
214 for peer_person in peer_tables['Persons']])
216 # XXX Do we care about membership in foreign site(s)?
218 # Synchronize new set (still keyed on foreign person_id)
219 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
# FIXED comment (original had the direction inverted): the dict is keyed on
# the LOCAL key_id of each cached Key object and maps to the peer's id.
221 # transcoder : retrieve a peer_key_id from a local key_id
222 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
223 for peer_key_id,key in peer_keys.iteritems()])
225 for peer_person_id, person in peer_persons.iteritems():
226 # Bind any newly cached users to peer
227 if peer_person_id not in old_peer_persons:
228 peer.add_person(person, peer_person_id, commit = False)
229 person['peer_id'] = peer_id
230 person['peer_person_id'] = peer_person_id
# A freshly cached user starts with no keys; they are attached below.
231 person['key_ids'] = []
233 # User as viewed by peer
234 peer_person = persons_at_peer[peer_person_id]
# Both id lists below are expressed in the PEER's key-id space so the set
# differences line up with peer_person['key_ids'].
236 # Foreign keys currently belonging to the user
237 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
238 if key_transcoder[key_id] in peer_keys]
240 # Foreign keys that should belong to the user
241 # this is basically peer_person['key_ids'], we just check it makes sense
242 # (e.g. we might have failed importing it)
243 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
245 # Remove stale keys from user
246 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
247 person.remove_key(peer_keys[key_id], commit = False)
248 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
250 # Add new keys to user
251 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
252 person.add_key(peer_keys[key_id], commit = False)
253 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
255 timers['persons'] = time.time() - start
# --- Phase: boot states and foreign nodes -----------------------------------
258 # XXX Synchronize foreign boot states
# Local boot-state table, used below purely as a validation set.
261 boot_states = BootStates(self.api).dict()
264 # Synchronize foreign nodes
269 # Compare only the columns returned by the GetPeerData() call
270 if peer_tables['Nodes']:
271 columns = peer_tables['Nodes'][0].keys()
275 # Keyed on foreign node_id
276 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
277 nodes_at_peer = dict([(node['node_id'], node) \
278 for node in peer_tables['Nodes']])
280 # Fix up site_id and boot_states references
# .items() copy: the body deletes from nodes_at_peer while iterating.
# `errors` is (re)initialized per node on an elided line (~282).
281 for peer_node_id, node in nodes_at_peer.items():
283 if node['site_id'] not in peer_sites:
284 errors.append("invalid site %d" % node['site_id'])
285 if node['boot_state'] not in boot_states:
286 errors.append("invalid boot state %s" % node['boot_state'])
# An `if errors:` guard (elided line ~287) selects between skipping the
# node below and the site_id translation at line 294 — TODO confirm.
288 # XXX Log an event instead of printing to logfile
289 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
290 node, ":", ", ".join(errors)
291 del nodes_at_peer[peer_node_id]
# Translate the peer's site_id into our local cached site's id.
294 node['site_id'] = peer_sites[node['site_id']]['site_id']
296 # Synchronize new set
297 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
299 for peer_node_id, node in peer_nodes.iteritems():
300 # Bind any newly cached foreign nodes to peer
301 if peer_node_id not in old_peer_nodes:
302 peer.add_node(node, peer_node_id, commit = False)
303 node['peer_id'] = peer_id
304 node['peer_node_id'] = peer_node_id
306 timers['nodes'] = time.time() - start
# --- Phase: our own nodes, as cached by the peer -----------------------------
# Extends peer_nodes with LOCAL nodes that the peer mirrors, so the slice
# phase below can transcode node ids in both directions.
309 # Synchronize local nodes
314 # Keyed on local node_id
315 local_nodes = Nodes(self.api).dict()
317 for node in peer_tables['PeerNodes']:
318 # Foreign identifier for our node as maintained by peer
319 peer_node_id = node['node_id']
320 # Local identifier for our node as cached by peer
321 node_id = node['peer_node_id']
322 if node_id in local_nodes:
323 # Still a valid local node, add it to the synchronized
324 # set of local node objects keyed on foreign node_id.
325 peer_nodes[peer_node_id] = local_nodes[node_id]
327 timers['local_nodes'] = time.time() - start
# --- Phase: slice instantiation states and foreign slices -------------------
330 # XXX Synchronize foreign slice instantiation states
# Local instantiation-state table, used below purely as a validation set.
333 slice_instantiations = SliceInstantiations(self.api).dict()
336 # Synchronize foreign slices
341 # Compare only the columns returned by the GetPeerData() call
342 if peer_tables['Slices']:
343 columns = peer_tables['Slices'][0].keys()
347 # Keyed on foreign slice_id
348 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
349 slices_at_peer = dict([(slice['slice_id'], slice) \
350 for slice in peer_tables['Slices']])
352 # Fix up site_id, instantiation, and creator_person_id references
# .items() copy: the body deletes from slices_at_peer while iterating.
# `errors` is (re)initialized per slice on an elided line (~354).
353 for peer_slice_id, slice in slices_at_peer.items():
355 if slice['site_id'] not in peer_sites:
356 errors.append("invalid site %d" % slice['site_id'])
357 if slice['instantiation'] not in slice_instantiations:
358 errors.append("invalid instantiation %s" % slice['instantiation'])
# Unknown creator is tolerated (nulled), unlike invalid site/instantiation.
359 if slice['creator_person_id'] not in peer_persons:
361 slice['creator_person_id'] = None
# The `else:` introducing this translation (elided line ~362) maps the
# peer's creator id to our local cached person's id — TODO confirm.
363 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
# An `if errors:` guard (elided line ~364) selects between skipping the
# slice below and the site_id translation at line 370 — TODO confirm.
365 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
366 slice, ":", ", ".join(errors)
367 del slices_at_peer[peer_slice_id]
370 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
372 # Synchronize new set
373 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
# FIXED comment (original had the direction inverted): both transcoders are
# keyed on the LOCAL id of the cached object and map to the peer's id.
375 # transcoder : retrieve a peer_node_id from a local node_id
376 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
377 for peer_node_id,node in peer_nodes.iteritems()])
# Same shape for users: local person_id -> peer_person_id.
378 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
379 for peer_person_id,person in peer_persons.iteritems()])
381 for peer_slice_id, slice in peer_slices.iteritems():
382 # Bind any newly cached foreign slices to peer
383 if peer_slice_id not in old_peer_slices:
384 peer.add_slice(slice, peer_slice_id, commit = False)
385 slice['peer_id'] = peer_id
386 slice['peer_slice_id'] = peer_slice_id
# A freshly cached slice starts empty; members are attached below.
387 slice['node_ids'] = []
388 slice['person_ids'] = []
390 # Slice as viewed by peer
391 peer_slice = slices_at_peer[peer_slice_id]
# Id lists below are expressed in the PEER's id space so the set
# differences line up with peer_slice['node_ids'] / ['person_ids'].
393 # Nodes that are currently part of the slice
394 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
395 if node_transcoder[node_id] in peer_nodes]
397 # Nodes that should be part of the slice
398 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
400 # Remove stale nodes from slice
401 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
402 slice.remove_node(peer_nodes[node_id], commit = False)
403 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
405 # Add new nodes to slice
406 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
407 slice.add_node(peer_nodes[node_id], commit = False)
408 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
410 # N.B.: Local nodes that may have been added to the slice
411 # by hand, are removed. In other words, don't do this.
413 # Foreign users that are currently part of the slice
414 old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
415 if person_transcoder[person_id] in peer_persons]
417 # Foreign users that should be part of the slice
418 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
420 # Remove stale users from slice
421 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
422 slice.remove_person(peer_persons[person_id], commit = False)
423 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
425 # Add new users to slice
426 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
427 slice.add_person(peer_persons[person_id], commit = False)
428 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
430 # N.B.: Local users that may have been added to the slice
431 # by hand, are not touched.
433 timers['slices'] = time.time() - start
# Single commit point: all the commit=False work in the phases above is
# flushed atomically here.
435 # Update peer itself and commit
436 peer.sync(commit = True)
# NOTE(review): the method's return (docstring says "Returns 1 if
# successful") lies past this excerpt's last line — confirm in full source.