2 # Thierry Parmentelat - INRIA
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
class RefreshPeer(Method):
    Fetches node and slice data from the specified peer and caches it
    locally; also deletes stale entries. Returns 1 if successful,
    Mixed(Peer.fields['peer_id'],
          Peer.fields['peername']),
    returns = Parameter(int, "1 if successful")
    def call(self, auth, peer_id_or_peername):
        # NOTE(review): Python 2 only -- relies on old-style raise/except,
        # `print >>`, iteritems() and unicode().
        # Look up the peer record; the filter accepts either a numeric
        # peer_id or a peername.
        peers = Peers(self.api, [peer_id_or_peername])
        # Raised when the lookup above comes back empty.
            raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
        peer_id = peer['peer_id']
        # Fetch the peer's full data dump in one call: the 'Sites', 'Keys',
        # 'Persons', 'Nodes', 'PeerNodes' and 'Slices' tables used below,
        # plus 'db_time' (time the peer spent in its own database).
        peer_tables = peer.GetPeerData()
        # Split elapsed time into network transport vs. remote DB work,
        # using the 'db_time' the peer reports about itself.
        timers['transport'] = time.time() - start - peer_tables['db_time']
        timers['peer_db'] = peer_tables['db_time']
        def sync(objects, peer_objects, classobj):
            Synchronizes two dictionaries of objects. objects should
            be a dictionary of local objects keyed on their foreign
            identifiers. peer_objects should be a dictionary of
            foreign objects keyed on their local (i.e., foreign to us)
            identifiers. Returns a final dictionary of local objects
            keyed on their foreign identifiers.
            # Delete stale objects: local cache entries whose foreign id
            # no longer exists at the peer.
            for peer_object_id, object in objects.iteritems():
                if peer_object_id not in peer_objects:
                    object.delete(commit = False)
                    # NOTE(review): this goes to stdout, unlike every other
                    # message in this method, which goes to 'log'.
                    print classobj, "object %d deleted" % object[object.primary_key]
            # Add/update new/existing objects
            for peer_object_id, peer_object in peer_objects.iteritems():
                if peer_object_id in objects:
                    # Update existing object
                    object = objects[peer_object_id]
                    # Replace foreign identifier with existing local
                    # identifier temporarily for the purposes of
                    peer_object[object.primary_key] = object[object.primary_key]
                    # Must use __eq__() instead of == since
                    # peer_object may be a raw dict instead of a Peer
                    if not object.__eq__(peer_object):
                        # Only update intrinsic fields
                        object.update(object.db_fields(peer_object))
                    # Restore foreign identifier
                    peer_object[object.primary_key] = peer_object_id
                    object = classobj(self.api, peer_object)
                    # Replace foreign identifier with new local identifier
                    del object[object.primary_key]
                    # Commit is deferred: the final peer.sync(commit = True)
                    # at the end of call() flushes everything at once.
                    object.sync(commit = False)
                except PLCInvalidArgument, err:
                    # Skip if validation fails
                    # XXX Log an event instead of printing to logfile
                    print >> log, "Warning: Skipping invalid", \
                        peer['peername'], object.__class__.__name__, \
                        ":", peer_object, ":", err
                synced[peer_object_id] = object
                # classobj(self.api) builds a throwaway instance purely to
                # recover the class name for the log line.
                print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
        # Synchronize foreign sites
        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Sites']:
            columns = peer_tables['Sites'][0].keys()
        # Keyed on foreign site_id
        old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
        sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
        # Synchronize new set (still keyed on foreign site_id)
        peer_sites = sync(old_peer_sites, sites_at_peer, Site)
        for peer_site_id, site in peer_sites.iteritems():
            # Bind any newly cached sites to peer
            if peer_site_id not in old_peer_sites:
                peer.add_site(site, peer_site_id, commit = False)
                site['peer_id'] = peer_id
                site['peer_site_id'] = peer_site_id
        timers['site'] = time.time() - start
        # XXX Synchronize foreign key types
        key_types = KeyTypes(self.api).dict()
        # Synchronize foreign keys
        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Keys']:
            columns = peer_tables['Keys'][0].keys()
        # Keyed on foreign key_id
        old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
        keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
        # Fix up key_type references
        # .items() (a copy, in Python 2) so entries can be deleted while
        # iterating; the sync() loops above use iteritems() because they
        # never mutate the dict they walk.
        for peer_key_id, key in keys_at_peer.items():
            if key['key_type'] not in key_types:
                # XXX Log an event instead of printing to logfile
                print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
                    key, ": invalid key type", key['key_type']
                del keys_at_peer[peer_key_id]
        # Synchronize new set (still keyed on foreign key_id)
        peer_keys = sync(old_peer_keys, keys_at_peer, Key)
        for peer_key_id, key in peer_keys.iteritems():
            # Bind any newly cached keys to peer
            if peer_key_id not in old_peer_keys:
                peer.add_key(key, peer_key_id, commit = False)
                key['peer_id'] = peer_id
                key['peer_key_id'] = peer_key_id
        timers['keys'] = time.time() - start
        # Synchronize foreign users
        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Persons']:
            columns = peer_tables['Persons'][0].keys()
        # Keyed on foreign person_id
        old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
        persons_at_peer = dict([(peer_person['person_id'], peer_person) \
                                for peer_person in peer_tables['Persons']])
        # XXX Do we care about membership in foreign site(s)?
        # Synchronize new set (still keyed on foreign person_id)
        peer_persons = sync(old_peer_persons, persons_at_peer, Person)
        # transcoder : map a *local* key_id to its *peer* key_id
        # (each value of peer_keys is a local Key carrying both ids)
        key_transcoder = dict ( [ (key['key_id'],key['peer_key_id']) for key in peer_keys.values()])
        for peer_person_id, person in peer_persons.iteritems():
            # Bind any newly cached users to peer
            if peer_person_id not in old_peer_persons:
                peer.add_person(person, peer_person_id, commit = False)
                person['peer_id'] = peer_id
                person['peer_person_id'] = peer_person_id
                person['key_ids'] = []
            # User as viewed by peer
            peer_person = persons_at_peer[peer_person_id]
            # Foreign keys currently belonging to the user,
            # translated into the peer's key_id namespace.
            old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids']]
            # Foreign keys that should belong to the user
            # this is basically peer_person['key_ids'], we just check it makes sense
            # (e.g. we might have failed importing it)
            person_key_ids = [ x for x in peer_person['key_ids'] if x in peer_keys]
            # Remove stale keys from user
            for key_id in (set(old_person_key_ids) - set(person_key_ids)):
                person.remove_key(peer_keys[key_id], commit = False)
                print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
            # Add new keys to user
            for key_id in (set(person_key_ids) - set(old_person_key_ids)):
                person.add_key(peer_keys[key_id], commit = False)
                print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
        timers['persons'] = time.time() - start
        # XXX Synchronize foreign boot states
        boot_states = BootStates(self.api).dict()
        # Synchronize foreign nodes
        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Nodes']:
            columns = peer_tables['Nodes'][0].keys()
        # Keyed on foreign node_id
        old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
        nodes_at_peer = dict([(node['node_id'], node) \
                              for node in peer_tables['Nodes']])
        # Fix up site_id and boot_states references
        # .items() copy again, since invalid entries are deleted in the loop.
        for peer_node_id, node in nodes_at_peer.items():
            if node['site_id'] not in peer_sites:
                errors.append("invalid site %d" % node['site_id'])
            if node['boot_state'] not in boot_states:
                errors.append("invalid boot state %s" % node['boot_state'])
                # XXX Log an event instead of printing to logfile
                print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
                    node, ":", ", ".join(errors)
                del nodes_at_peer[peer_node_id]
                # Translate the peer's site_id into our local cached site_id.
                node['site_id'] = peer_sites[node['site_id']]['site_id']
        # Synchronize new set
        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
        for peer_node_id, node in peer_nodes.iteritems():
            # Bind any newly cached foreign nodes to peer
            if peer_node_id not in old_peer_nodes:
                peer.add_node(node, peer_node_id, commit = False)
                node['peer_id'] = peer_id
                node['peer_node_id'] = peer_node_id
        timers['nodes'] = time.time() - start
        # Synchronize local nodes
        # The peer also caches *our* nodes ('PeerNodes'); re-key the ones
        # still valid locally so slices below can reference them by the
        # peer's node_id.
        # Keyed on local node_id
        local_nodes = Nodes(self.api).dict()
        for node in peer_tables['PeerNodes']:
            # Foreign identifier for our node as maintained by peer
            peer_node_id = node['node_id']
            # Local identifier for our node as cached by peer
            node_id = node['peer_node_id']
            if node_id in local_nodes:
                # Still a valid local node, add it to the synchronized
                # set of local node objects keyed on foreign node_id.
                peer_nodes[peer_node_id] = local_nodes[node_id]
        timers['local_nodes'] = time.time() - start
        # XXX Synchronize foreign slice instantiation states
        slice_instantiations = SliceInstantiations(self.api).dict()
        # Synchronize foreign slices
        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Slices']:
            columns = peer_tables['Slices'][0].keys()
        # Keyed on foreign slice_id
        old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
        slices_at_peer = dict([(slice['slice_id'], slice) \
                               for slice in peer_tables['Slices']])
        # Fix up site_id, instantiation, and creator_person_id references
        for peer_slice_id, slice in slices_at_peer.items():
            if slice['site_id'] not in peer_sites:
                errors.append("invalid site %d" % slice['site_id'])
            if slice['instantiation'] not in slice_instantiations:
                errors.append("invalid instantiation %s" % slice['instantiation'])
            if slice['creator_person_id'] not in peer_persons:
                # Not fatal: NULL out the creator instead of skipping the slice.
                slice['creator_person_id'] = None
                # Translate to our local person_id.
                slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
                print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
                    slice, ":", ", ".join(errors)
                del slices_at_peer[peer_slice_id]
                # Translate the peer's site_id into our local cached site_id.
                slice['site_id'] = peer_sites[slice['site_id']]['site_id']
        # Synchronize new set
        peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
        # transcoders : map *local* node/person ids to the *peer's* ids
        # (same direction as key_transcoder above)
        node_transcoder = dict ( [ (node['node_id'],node['peer_node_id']) for node in peer_nodes.values()])
        person_transcoder = dict ( [ (person['person_id'],person['peer_person_id']) for person in peer_persons.values()])
        for peer_slice_id, slice in peer_slices.iteritems():
            # Bind any newly cached foreign slices to peer
            if peer_slice_id not in old_peer_slices:
                peer.add_slice(slice, peer_slice_id, commit = False)
                slice['peer_id'] = peer_id
                slice['peer_slice_id'] = peer_slice_id
                slice['node_ids'] = []
                slice['person_ids'] = []
            # Slice as viewed by peer
            peer_slice = slices_at_peer[peer_slice_id]
            # Nodes that are currently part of the slice,
            # translated into the peer's node_id namespace.
            old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids']]
            # Nodes that should be part of the slice
            slice_node_ids = [ x for x in peer_slice['node_ids'] if x in peer_nodes]
            # Remove stale nodes from slice
            for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
                slice.remove_node(peer_nodes[node_id], commit = False)
                print >> log, peer['peername'], 'Node', node_id, 'removed from', slice['name']
            # Add new nodes to slice
            for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
                slice.add_node(peer_nodes[node_id], commit = False)
                print >> log, peer['peername'], 'Node', node_id, 'added into', slice['name']
            # N.B.: Local nodes that may have been added to the slice
            # by hand, are removed. In other words, don't do this.
            # Foreign users that are currently part of the slice
            old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids']]
            # Foreign users that should be part of the slice
            slice_person_ids = [ x for x in peer_slice['person_ids'] if x in peer_persons ]
            # Remove stale users from slice
            for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
                slice.remove_person(peer_persons[person_id], commit = False)
                print >> log, peer['peername'], 'User', person_id, 'removed from', slice['name']
            # Add new users to slice
            for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
                slice.add_person(peer_persons[person_id], commit = False)
                print >> log, peer['peername'], 'User', person_id, 'added into', slice['name']
            # N.B.: Local users that may have been added to the slice
            # by hand, are not touched.
        timers['slices'] = time.time() - start
        # Update peer itself and commit
        # Single commit point: all the commit = False work above becomes
        # durable here, or not at all.
        peer.sync(commit = True)