2 # Thierry Parmentelat - INRIA
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
class RefreshPeer(Method):
    """
    Fetches node and slice data from the specified peer and caches it
    locally; also deletes stale entries. Returns 1 if successful,
    """

    # Argument spec: the peer may be designated by numeric id or by name.
    # NOTE(review): the enclosing "accepts = [Auth(), ...]" list is elided
    # from this excerpt.
    Mixed(Peer.fields['peer_id'],
          Peer.fields['peername']),

    returns = Parameter(int, "1 if successful")

    def call(self, auth, peer_id_or_peername):
        """
        Refresh the local cache of the given peer's sites, keys, users,
        nodes and slices: pull the peer's data set in one GetPeerData()
        call, delete stale cached entries, add/update the rest, re-link
        cross-references (user keys, slice nodes/users) and record
        per-phase wall-clock timings in the timers dict.

        NOTE(review): several guard/else/try lines are elided from this
        excerpt; indentation below reflects the apparent structure.
        """

        # Look up the peer, by numeric id or by name.
        peers = Peers(self.api, [peer_id_or_peername])
        # Reached when the lookup above comes back empty
        # (the "if not peers:" guard is elided here).
        raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
        peer_id = peer['peer_id']

        # Fetch the peer's whole data set in one call; the peer reports
        # its own DB time so pure transport overhead can be isolated.
        peer_tables = peer.GetPeerData()
        timers['transport'] = time.time() - start - peer_tables['db_time']
        timers['peer_db'] = peer_tables['db_time']

        def sync(objects, peer_objects, classobj):
            """
            Synchronizes two dictionaries of objects. objects should
            be a dictionary of local objects keyed on their foreign
            identifiers. peer_objects should be a dictionary of
            foreign objects keyed on their local (i.e., foreign to us)
            identifiers. Returns a final dictionary of local objects
            keyed on their foreign identifiers.
            """

            # Delete stale objects
            for peer_object_id, object in objects.iteritems():
                if peer_object_id not in peer_objects:
                    object.delete(commit = False)
                    print classobj(self.api).__class__.__name__, "object %s deleted" % object[object.class_key]

            # Add/update new/existing objects
            for peer_object_id, peer_object in peer_objects.iteritems():
                if peer_object_id in objects:
                    # Update existing object
                    object = objects[peer_object_id]

                    # Replace foreign identifier with existing local
                    # identifier temporarily for the purposes of
                    # the equality comparison below.
                    peer_object[object.primary_key] = object[object.primary_key]

                    # Must use __eq__() instead of == since
                    # peer_object may be a raw dict instead of a Peer
                    # instance.
                    if not object.__eq__(peer_object):
                        # Only update intrinsic fields
                        object.update(object.db_fields(peer_object))

                    # Restore foreign identifier
                    peer_object[object.primary_key] = peer_object_id

                    # Build a brand-new local cache object
                    # (the "else:" introducing this branch is elided here).
                    object = classobj(self.api, peer_object)
                    # Replace foreign identifier with new local identifier
                    del object[object.primary_key]

                        # Persist the object (inside an elided "try:").
                        object.sync(commit = False)
                    except PLCInvalidArgument, err:
                        # Skip if validation fails
                        # XXX Log an event instead of printing to logfile
                        print >> log, "Warning: Skipping invalid", \
                            peer['peername'], object.__class__.__name__, \
                            ":", peer_object, ":", err

                synced[peer_object_id] = object

                    # Trace what happened (guarded by an elided "if dbg:").
                    print >> log, peer['peername'], classobj(self.api).__class__.__name__, \
                        object[object.class_key], object[object.primary_key], dbg

        # Synchronize foreign sites

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Sites']:
            columns = peer_tables['Sites'][0].keys()

        # Keyed on foreign site_id
        old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
        sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])

        # Synchronize new set (still keyed on foreign site_id)
        peer_sites = sync(old_peer_sites, sites_at_peer, Site)

        for peer_site_id, site in peer_sites.iteritems():
            # Bind any newly cached sites to peer
            if peer_site_id not in old_peer_sites:
                peer.add_site(site, peer_site_id, commit = False)
                site['peer_id'] = peer_id
                site['peer_site_id'] = peer_site_id

        timers['site'] = time.time() - start

        # XXX Synchronize foreign key types
        key_types = KeyTypes(self.api).dict()

        # Synchronize foreign keys

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Keys']:
            columns = peer_tables['Keys'][0].keys()

        # Keyed on foreign key_id
        old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
        keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])

        # Fix up key_type references
        for peer_key_id, key in keys_at_peer.items():
            if key['key_type'] not in key_types:
                # XXX Log an event instead of printing to logfile
                print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
                    key, ": invalid key type", key['key_type']
                del keys_at_peer[peer_key_id]

        # Synchronize new set (still keyed on foreign key_id)
        peer_keys = sync(old_peer_keys, keys_at_peer, Key)
        for peer_key_id, key in peer_keys.iteritems():
            # Bind any newly cached keys to peer
            if peer_key_id not in old_peer_keys:
                peer.add_key(key, peer_key_id, commit = False)
                key['peer_id'] = peer_id
                key['peer_key_id'] = peer_key_id

        timers['keys'] = time.time() - start

        # Synchronize foreign users

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Persons']:
            columns = peer_tables['Persons'][0].keys()

        # Keyed on foreign person_id
        old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
        persons_at_peer = dict([(peer_person['person_id'], peer_person) \
                                for peer_person in peer_tables['Persons']])

        # XXX Do we care about membership in foreign site(s)?

        # Synchronize new set (still keyed on foreign person_id)
        peer_persons = sync(old_peer_persons, persons_at_peer, Person)

        # transcoder : retrieve a peer_key_id from a local key_id
        # (maps local ids to foreign ids, as the usage below shows)
        key_transcoder = dict ( [ (key['key_id'],key['peer_key_id']) for key in peer_keys.values()])

        for peer_person_id, person in peer_persons.iteritems():
            # Bind any newly cached users to peer
            if peer_person_id not in old_peer_persons:
                peer.add_person(person, peer_person_id, commit = False)
                person['peer_id'] = peer_id
                person['peer_person_id'] = peer_person_id
                person['key_ids'] = []

            # User as viewed by peer
            peer_person = persons_at_peer[peer_person_id]

            # Foreign keys currently belonging to the user
            old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
                                  if key_transcoder[key_id] in peer_keys]

            # Foreign keys that should belong to the user
            # this is basically peer_person['key_ids'], we just check it makes sense
            # (e.g. we might have failed importing it)
            person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]

            # Remove stale keys from user
            for key_id in (set(old_person_key_ids) - set(person_key_ids)):
                person.remove_key(peer_keys[key_id], commit = False)
                print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']

            # Add new keys to user
            for key_id in (set(person_key_ids) - set(old_person_key_ids)):
                person.add_key(peer_keys[key_id], commit = False)
                print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']

        timers['persons'] = time.time() - start

        # XXX Synchronize foreign boot states
        boot_states = BootStates(self.api).dict()

        # Synchronize foreign nodes

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Nodes']:
            columns = peer_tables['Nodes'][0].keys()

        # Keyed on foreign node_id
        old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
        nodes_at_peer = dict([(node['node_id'], node) \
                              for node in peer_tables['Nodes']])

        # Fix up site_id and boot_states references
        for peer_node_id, node in nodes_at_peer.items():
            # NOTE(review): an "errors = []" accumulator init is elided here.
            if node['site_id'] not in peer_sites:
                errors.append("invalid site %d" % node['site_id'])
            if node['boot_state'] not in boot_states:
                errors.append("invalid boot state %s" % node['boot_state'])
                # XXX Log an event instead of printing to logfile
                # (this print/del pair is guarded by an elided "if errors:")
                print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
                    node, ":", ", ".join(errors)
                del nodes_at_peer[peer_node_id]
                # Map the foreign site_id to the locally cached site
                # (reached via an elided "else:")
                node['site_id'] = peer_sites[node['site_id']]['site_id']

        # Synchronize new set
        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)

        for peer_node_id, node in peer_nodes.iteritems():
            # Bind any newly cached foreign nodes to peer
            if peer_node_id not in old_peer_nodes:
                peer.add_node(node, peer_node_id, commit = False)
                node['peer_id'] = peer_id
                node['peer_node_id'] = peer_node_id

        timers['nodes'] = time.time() - start

        # Synchronize local nodes

        # Keyed on local node_id
        local_nodes = Nodes(self.api).dict()

        for node in peer_tables['PeerNodes']:
            # Foreign identifier for our node as maintained by peer
            peer_node_id = node['node_id']
            # Local identifier for our node as cached by peer
            node_id = node['peer_node_id']
            if node_id in local_nodes:
                # Still a valid local node, add it to the synchronized
                # set of local node objects keyed on foreign node_id.
                peer_nodes[peer_node_id] = local_nodes[node_id]

        timers['local_nodes'] = time.time() - start

        # XXX Synchronize foreign slice instantiation states
        slice_instantiations = SliceInstantiations(self.api).dict()

        # Synchronize foreign slices

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Slices']:
            columns = peer_tables['Slices'][0].keys()

        # Keyed on foreign slice_id
        old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
        slices_at_peer = dict([(slice['slice_id'], slice) \
                               for slice in peer_tables['Slices']])

        # Fix up site_id, instantiation, and creator_person_id references
        for peer_slice_id, slice in slices_at_peer.items():
            if slice['site_id'] not in peer_sites:
                errors.append("invalid site %d" % slice['site_id'])
            if slice['instantiation'] not in slice_instantiations:
                errors.append("invalid instantiation %s" % slice['instantiation'])
            if slice['creator_person_id'] not in peer_persons:
                # Unknown creator: keep the slice but drop the reference
                slice['creator_person_id'] = None
                # Map the foreign creator to the locally cached person_id
                # (reached via an elided "else:")
                slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
                # (this print/del pair is guarded by an elided "if errors:")
                print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
                    slice, ":", ", ".join(errors)
                del slices_at_peer[peer_slice_id]
                # Map the foreign site_id to the locally cached site
                slice['site_id'] = peer_sites[slice['site_id']]['site_id']

        # Synchronize new set
        peer_slices = sync(old_peer_slices, slices_at_peer, Slice)

        # transcoder : retrieve a peer_node_id from a local node_id
        # (maps local ids to foreign ids, as the usage below shows)
        node_transcoder = dict ( [ (node['node_id'],node['peer_node_id']) for node in peer_nodes.values()])
        # same mapping for users: local person_id -> peer_person_id
        person_transcoder = dict ( [ (person['person_id'],person['peer_person_id']) for person in peer_persons.values()])

        for peer_slice_id, slice in peer_slices.iteritems():
            # Bind any newly cached foreign slices to peer
            if peer_slice_id not in old_peer_slices:
                peer.add_slice(slice, peer_slice_id, commit = False)
                slice['peer_id'] = peer_id
                slice['peer_slice_id'] = peer_slice_id
                slice['node_ids'] = []
                slice['person_ids'] = []

            # Slice as viewed by peer
            peer_slice = slices_at_peer[peer_slice_id]

            # Nodes that are currently part of the slice
            old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
                                   if node_transcoder[node_id] in peer_nodes]

            # Nodes that should be part of the slice
            slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]

            # Remove stale nodes from slice
            for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
                slice.remove_node(peer_nodes[node_id], commit = False)
                print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']

            # Add new nodes to slice
            for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
                slice.add_node(peer_nodes[node_id], commit = False)
                print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']

            # N.B.: Local nodes that may have been added to the slice
            # by hand, are removed. In other words, don't do this.

            # Foreign users that are currently part of the slice
            old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
                                     if person_transcoder[person_id] in peer_persons]

            # Foreign users that should be part of the slice
            slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]

            # Remove stale users from slice
            for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
                slice.remove_person(peer_persons[person_id], commit = False)
                print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']

            # Add new users to slice
            for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
                slice.add_person(peer_persons[person_id], commit = False)
                print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']

            # N.B.: Local users that may have been added to the slice
            # by hand, are not touched.

        timers['slices'] = time.time() - start

        # Update peer itself and commit
        peer.sync(commit = True)