2 # Thierry Parmentelat - INRIA
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
class RefreshPeer(Method):
    """
    Fetches node and slice data from the specified peer and caches it
    locally; also deletes stale entries. Returns 1 if successful,
    """

    # The peer may be designated either by its numeric id or by its name.
    Mixed(Peer.fields['peer_id'],
          Peer.fields['peername']),

    returns = Parameter(int, "1 if successful")

    def call(self, auth, peer_id_or_peername):
        # Look up the peer record by id or name.
        peers = Peers(self.api, [peer_id_or_peername])
            raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)

        peer_id = peer['peer_id']

        # Fetch the peer's data in a single RPC. GetPeerData() reports
        # 'db_time' (time the peer spent in its own database), so the pure
        # transport time can be computed by subtracting it below.
        peer_tables = peer.GetPeerData()
        timers['transport'] = time.time() - start - peer_tables['db_time']
        timers['peer_db'] = peer_tables['db_time']

        def sync(objects, peer_objects, classobj):
            """
            Synchronizes two dictionaries of objects. objects should
            be a dictionary of local objects keyed on their foreign
            identifiers. peer_objects should be a dictionary of
            foreign objects keyed on their local (i.e., foreign to us)
            identifiers. Returns a final dictionary of local objects
            keyed on their foreign identifiers.
            """

            # Delete stale objects
            for peer_object_id, object in objects.iteritems():
                if peer_object_id not in peer_objects:
                    object.delete(commit = False)
                    # NOTE(review): this message goes to stdout, unlike the
                    # other messages in this method, which go to 'log'.
                    print classobj, "object %d deleted" % object[object.primary_key]

            # Add/update new/existing objects
            for peer_object_id, peer_object in peer_objects.iteritems():
                if peer_object_id in objects:
                    # Update existing object
                    object = objects[peer_object_id]

                    # Replace foreign identifier with existing local
                    # identifier temporarily for the purposes of
                    # comparison below.
                    peer_object[object.primary_key] = object[object.primary_key]

                    # Must use __eq__() instead of == since
                    # peer_object may be a raw dict instead of a Peer
                    # object.
                    if not object.__eq__(peer_object):
                        # Only update intrinsic fields
                        object.update(object.db_fields(peer_object))

                    # Restore foreign identifier
                    peer_object[object.primary_key] = peer_object_id
                    # Add new object, built from the peer's raw dict.
                    object = classobj(self.api, peer_object)
                    # Replace foreign identifier with new local identifier
                    del object[object.primary_key]

                        # Flush to the database but do not commit yet; the
                        # whole refresh is committed at the end of call().
                        object.sync(commit = False)
                    except PLCInvalidArgument, err:
                        # Skip if validation fails
                        # XXX Log an event instead of printing to logfile
                        print >> log, "Warning: Skipping invalid", \
                            peer['peername'], object.__class__.__name__, \
                            ":", peer_object, ":", err

                synced[peer_object_id] = object

                    print >> log, peer['peername'], classobj(self.api).__class__.__name__, \
                        object[object.class_key], object[object.primary_key], dbg

        #
        # Synchronize foreign sites
        #

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Sites']:
            columns = peer_tables['Sites'][0].keys()

        # Keyed on foreign site_id
        old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
        sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])

        # Synchronize new set (still keyed on foreign site_id)
        peer_sites = sync(old_peer_sites, sites_at_peer, Site)

        for peer_site_id, site in peer_sites.iteritems():
            # Bind any newly cached sites to peer
            if peer_site_id not in old_peer_sites:
                peer.add_site(site, peer_site_id, commit = False)
                site['peer_id'] = peer_id
                site['peer_site_id'] = peer_site_id

        timers['site'] = time.time() - start

        # XXX Synchronize foreign key types
        key_types = KeyTypes(self.api).dict()

        #
        # Synchronize foreign keys
        #

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Keys']:
            columns = peer_tables['Keys'][0].keys()

        # Keyed on foreign key_id
        old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
        keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])

        # Fix up key_type references.
        # .items() (not iteritems()) because entries are deleted during
        # iteration, which would break a live iterator.
        for peer_key_id, key in keys_at_peer.items():
            if key['key_type'] not in key_types:
                # XXX Log an event instead of printing to logfile
                print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
                    key, ": invalid key type", key['key_type']
                del keys_at_peer[peer_key_id]

        # Synchronize new set (still keyed on foreign key_id)
        peer_keys = sync(old_peer_keys, keys_at_peer, Key)
        for peer_key_id, key in peer_keys.iteritems():
            # Bind any newly cached keys to peer
            if peer_key_id not in old_peer_keys:
                peer.add_key(key, peer_key_id, commit = False)
                key['peer_id'] = peer_id
                key['peer_key_id'] = peer_key_id

        timers['keys'] = time.time() - start

        #
        # Synchronize foreign users
        #

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Persons']:
            columns = peer_tables['Persons'][0].keys()

        # Keyed on foreign person_id
        old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
        persons_at_peer = dict([(peer_person['person_id'], peer_person) \
                                for peer_person in peer_tables['Persons']])

        # XXX Do we care about membership in foreign site(s)?

        # Synchronize new set (still keyed on foreign person_id)
        peer_persons = sync(old_peer_persons, persons_at_peer, Person)

        # transcoder : maps a local key_id to its peer_key_id
        # (peer_keys values carry both the local 'key_id' and the
        # foreign 'peer_key_id')
        key_transcoder = dict ( [ (key['key_id'],key['peer_key_id']) for key in peer_keys.values()])

        for peer_person_id, person in peer_persons.iteritems():
            # Bind any newly cached users to peer
            if peer_person_id not in old_peer_persons:
                peer.add_person(person, peer_person_id, commit = False)
                person['peer_id'] = peer_id
                person['peer_person_id'] = peer_person_id
                person['key_ids'] = []

            # User as viewed by peer
            peer_person = persons_at_peer[peer_person_id]

            # Foreign keys currently belonging to the user
            # (local key_ids transcoded to the peer's key_ids)
            old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids']]

            # Foreign keys that should belong to the user
            # this is basically peer_person['key_ids'], we just check it makes sense
            # (e.g. we might have failed importing it)
            person_key_ids = [ x for x in peer_person['key_ids'] if x in peer_keys]

            # Remove stale keys from user
            for key_id in (set(old_person_key_ids) - set(person_key_ids)):
                person.remove_key(peer_keys[key_id], commit = False)
                print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']

            # Add new keys to user
            for key_id in (set(person_key_ids) - set(old_person_key_ids)):
                person.add_key(peer_keys[key_id], commit = False)
                print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']

        timers['persons'] = time.time() - start

        # XXX Synchronize foreign boot states
        boot_states = BootStates(self.api).dict()

        #
        # Synchronize foreign nodes
        #

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Nodes']:
            columns = peer_tables['Nodes'][0].keys()

        # Keyed on foreign node_id
        old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
        nodes_at_peer = dict([(node['node_id'], node) \
                              for node in peer_tables['Nodes']])

        # Fix up site_id and boot_states references; invalid nodes are
        # dropped from the set before synchronization.
        # .items() because entries may be deleted during iteration.
        for peer_node_id, node in nodes_at_peer.items():
            if node['site_id'] not in peer_sites:
                errors.append("invalid site %d" % node['site_id'])
            if node['boot_state'] not in boot_states:
                errors.append("invalid boot state %s" % node['boot_state'])
                # XXX Log an event instead of printing to logfile
                print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
                    node, ":", ", ".join(errors)
                del nodes_at_peer[peer_node_id]

                # Transcode the peer's site_id into our local site_id.
                node['site_id'] = peer_sites[node['site_id']]['site_id']

        # Synchronize new set
        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)

        for peer_node_id, node in peer_nodes.iteritems():
            # Bind any newly cached foreign nodes to peer
            if peer_node_id not in old_peer_nodes:
                peer.add_node(node, peer_node_id, commit = False)
                node['peer_id'] = peer_id
                node['peer_node_id'] = peer_node_id

        timers['nodes'] = time.time() - start

        #
        # Synchronize local nodes as seen (cached) by the peer
        #

        # Keyed on local node_id
        local_nodes = Nodes(self.api).dict()

        for node in peer_tables['PeerNodes']:
            # Foreign identifier for our node as maintained by peer
            peer_node_id = node['node_id']
            # Local identifier for our node as cached by peer
            node_id = node['peer_node_id']
            if node_id in local_nodes:
                # Still a valid local node, add it to the synchronized
                # set of local node objects keyed on foreign node_id.
                peer_nodes[peer_node_id] = local_nodes[node_id]

        timers['local_nodes'] = time.time() - start

        # XXX Synchronize foreign slice instantiation states
        slice_instantiations = SliceInstantiations(self.api).dict()

        #
        # Synchronize foreign slices
        #

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Slices']:
            columns = peer_tables['Slices'][0].keys()

        # Keyed on foreign slice_id
        old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
        slices_at_peer = dict([(slice['slice_id'], slice) \
                               for slice in peer_tables['Slices']])

        # Fix up site_id, instantiation, and creator_person_id references.
        # .items() because entries may be deleted during iteration.
        for peer_slice_id, slice in slices_at_peer.items():
            if slice['site_id'] not in peer_sites:
                errors.append("invalid site %d" % slice['site_id'])
            if slice['instantiation'] not in slice_instantiations:
                errors.append("invalid instantiation %s" % slice['instantiation'])
            if slice['creator_person_id'] not in peer_persons:
                # Unknown creator: NULL it out rather than rejecting the slice.
                slice['creator_person_id'] = None
                slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
                print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
                    slice, ":", ", ".join(errors)
                del slices_at_peer[peer_slice_id]

                # Transcode the peer's site_id into our local site_id.
                slice['site_id'] = peer_sites[slice['site_id']]['site_id']

        # Synchronize new set
        peer_slices = sync(old_peer_slices, slices_at_peer, Slice)

        # transcoder : maps a local node_id to its peer_node_id
        node_transcoder = dict ( [ (node['node_id'],node['peer_node_id']) for node in peer_nodes.values()])
        # transcoder : maps a local person_id to its peer_person_id
        person_transcoder = dict ( [ (person['person_id'],person['peer_person_id']) for person in peer_persons.values()])

        for peer_slice_id, slice in peer_slices.iteritems():
            # Bind any newly cached foreign slices to peer
            if peer_slice_id not in old_peer_slices:
                peer.add_slice(slice, peer_slice_id, commit = False)
                slice['peer_id'] = peer_id
                slice['peer_slice_id'] = peer_slice_id
                slice['node_ids'] = []
                slice['person_ids'] = []

            # Slice as viewed by peer
            peer_slice = slices_at_peer[peer_slice_id]

            # Nodes that are currently part of the slice
            # (local node_ids transcoded to the peer's node_ids)
            old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids']]

            # Nodes that should be part of the slice
            slice_node_ids = [ x for x in peer_slice['node_ids'] if x in peer_nodes]

            # Remove stale nodes from slice
            for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
                slice.remove_node(peer_nodes[node_id], commit = False)
                print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']

            # Add new nodes to slice
            for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
                slice.add_node(peer_nodes[node_id], commit = False)
                print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']

            # N.B.: Local nodes that may have been added to the slice
            # by hand, are removed. In other words, don't do this.

            # Foreign users that are currently part of the slice
            # (local person_ids transcoded to the peer's person_ids)
            old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids']]

            # Foreign users that should be part of the slice
            slice_person_ids = [ x for x in peer_slice['person_ids'] if x in peer_persons ]

            # Remove stale users from slice
            for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
                slice.remove_person(peer_persons[person_id], commit = False)
                print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']

            # Add new users to slice
            for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
                slice.add_person(peer_persons[person_id], commit = False)
                print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']

            # N.B.: Local users that may have been added to the slice
            # by hand, are not touched.

        timers['slices'] = time.time() - start

        # Update peer itself and commit the whole refresh atomically.
        peer.sync(commit = True)