solves xrefs person->key slice->node slice->person linearly
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 class RefreshPeer(Method):
24     """
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
27     faults otherwise.
28     """
29
30     roles = ['admin']
31
32     accepts = [
33         Auth(),
34         Mixed(Peer.fields['peer_id'],
35               Peer.fields['peername']),
36         ]
37
38     returns = Parameter(int, "1 if successful")
39
40     def call(self, auth, peer_id_or_peername):
41         # Get peer
42         peers = Peers(self.api, [peer_id_or_peername])
43         if not peers:
44             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
45         peer = peers[0]
46         peer_id = peer['peer_id']
47
48         # Connect to peer API
49         peer.connect()
50
51         timers = {}
52
53         # Get peer data
54         start = time.time()
55         peer_tables = peer.GetPeerData()
56         timers['transport'] = time.time() - start - peer_tables['db_time']
57         timers['peer_db'] = peer_tables['db_time']
58
59         def sync(objects, peer_objects, classobj):
60             """
61             Synchronizes two dictionaries of objects. objects should
62             be a dictionary of local objects keyed on their foreign
63             identifiers. peer_objects should be a dictionary of
64             foreign objects keyed on their local (i.e., foreign to us)
65             identifiers. Returns a final dictionary of local objects
66             keyed on their foreign identifiers.
67             """
68
69             synced = {}
70
71             # Delete stale objects
72             for peer_object_id, object in objects.iteritems():
73                 if peer_object_id not in peer_objects:
74                     object.delete(commit = False)
75                     print classobj, "object %d deleted" % object[object.primary_key]
76
77             # Add/update new/existing objects
78             for peer_object_id, peer_object in peer_objects.iteritems():
79                 if peer_object_id in objects:
80                     # Update existing object
81                     object = objects[peer_object_id]
82
83                     # Replace foreign identifier with existing local
84                     # identifier temporarily for the purposes of
85                     # comparison.
86                     peer_object[object.primary_key] = object[object.primary_key]
87
88                     # Must use __eq__() instead of == since
89                     # peer_object may be a raw dict instead of a Peer
90                     # object.
91                     if not object.__eq__(peer_object):
92                         # Only update intrinsic fields
93                         object.update(object.db_fields(peer_object))
94                         sync = True
95                         dbg = "changed"
96                     else:
97                         sync = False
98                         dbg = None
99
100                     # Restore foreign identifier
101                     peer_object[object.primary_key] = peer_object_id
102                 else:
103                     # Add new object
104                     object = classobj(self.api, peer_object)
105                     # Replace foreign identifier with new local identifier
106                     del object[object.primary_key]
107                     sync = True
108                     dbg = "added"
109
110                 if sync:
111                     try:
112                         object.sync(commit = False)
113                     except PLCInvalidArgument, err:
114                         # Skip if validation fails
115                         # XXX Log an event instead of printing to logfile
116                         print >> log, "Warning: Skipping invalid", \
117                               peer['peername'], object.__class__.__name__, \
118                               ":", peer_object, ":", err
119                         continue
120
121                 synced[peer_object_id] = object
122
123                 if dbg:
124                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
125
126             return synced
127
128         #
129         # Synchronize foreign sites
130         #
131
132         start = time.time()
133
134         # Compare only the columns returned by the GetPeerData() call
135         if peer_tables['Sites']:
136             columns = peer_tables['Sites'][0].keys()
137         else:
138             columns = None
139
140         # Keyed on foreign site_id
141         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
142         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
143
144         # Synchronize new set (still keyed on foreign site_id)
145         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
146
147         for peer_site_id, site in peer_sites.iteritems():
148             # Bind any newly cached sites to peer
149             if peer_site_id not in old_peer_sites:
150                 peer.add_site(site, peer_site_id, commit = False)
151                 site['peer_id'] = peer_id
152                 site['peer_site_id'] = peer_site_id
153
154         timers['site'] = time.time() - start
155
156         #
157         # XXX Synchronize foreign key types
158         #
159
160         key_types = KeyTypes(self.api).dict()
161
162         #
163         # Synchronize foreign keys
164         #
165
166         start = time.time()
167
168         # Compare only the columns returned by the GetPeerData() call
169         if peer_tables['Keys']:
170             columns = peer_tables['Keys'][0].keys()
171         else:
172             columns = None
173
174         # Keyed on foreign key_id
175         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
176         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
177
178         # Fix up key_type references
179         for peer_key_id, key in keys_at_peer.items():
180             if key['key_type'] not in key_types:
181                 # XXX Log an event instead of printing to logfile
182                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
183                       key, ": invalid key type", key['key_type']
184                 del keys_at_peer[peer_key_id]
185                 continue
186
187         # Synchronize new set (still keyed on foreign key_id)
188         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
189         for peer_key_id, key in peer_keys.iteritems():
190             # Bind any newly cached keys to peer
191             if peer_key_id not in old_peer_keys:
192                 peer.add_key(key, peer_key_id, commit = False)
193                 key['peer_id'] = peer_id
194                 key['peer_key_id'] = peer_key_id
195
196         timers['keys'] = time.time() - start
197
198         #
199         # Synchronize foreign users
200         #
201
202         start = time.time()
203
204         # Compare only the columns returned by the GetPeerData() call
205         if peer_tables['Persons']:
206             columns = peer_tables['Persons'][0].keys()
207         else:
208             columns = None
209
210         # Keyed on foreign person_id
211         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
212         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
213                                 for peer_person in peer_tables['Persons']])
214
215         # XXX Do we care about membership in foreign site(s)?
216
217         # Synchronize new set (still keyed on foreign person_id)
218         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
219
220         # transcoder : retrieve a local key_id from a peer_key_id
221         key_transcoder = dict ( [ (key['key_id'],key['peer_key_id']) for key in peer_keys.values()])
222
223         for peer_person_id, person in peer_persons.iteritems():
224             # Bind any newly cached users to peer
225             if peer_person_id not in old_peer_persons:
226                 peer.add_person(person, peer_person_id, commit = False)
227                 person['peer_id'] = peer_id
228                 person['peer_person_id'] = peer_person_id
229                 person['key_ids'] = []
230
231             # User as viewed by peer
232             peer_person = persons_at_peer[peer_person_id]
233             
234             # Foreign keys currently belonging to the user
235             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids']]
236
237             # Foreign keys that should belong to the user
238             # this is basically peer_person['key_ids'], we just check it makes sense 
239             # (e.g. we might have failed importing it)
240             person_key_ids = [ x for x in peer_person['key_ids'] if x in peer_keys]
241
242             # Remove stale keys from user
243             for peer_key_id in (set(old_person_key_ids) - set(person_key_ids)):
244                 person.remove_key(peer_keys[peer_key_id], commit = False)
245
246             # Add new keys to user
247             for peer_key_id in (set(person_key_ids) - set(old_person_key_ids)):
248                 person.add_key(peer_keys[peer_key_id], commit = False)
249
250         timers['persons'] = time.time() - start
251
252         #
253         # XXX Synchronize foreign boot states
254         #
255
256         boot_states = BootStates(self.api).dict()
257
258         #
259         # Synchronize foreign nodes
260         #
261
262         start = time.time()
263
264         # Compare only the columns returned by the GetPeerData() call
265         if peer_tables['Nodes']:
266             columns = peer_tables['Nodes'][0].keys()
267         else:
268             columns = None
269
270         # Keyed on foreign node_id
271         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
272         nodes_at_peer = dict([(node['node_id'], node) \
273                               for node in peer_tables['Nodes']])
274
275         # Fix up site_id and boot_states references
276         for peer_node_id, node in nodes_at_peer.items():
277             errors = []
278             if node['site_id'] not in peer_sites:
279                 errors.append("invalid site %d" % node['site_id'])
280             if node['boot_state'] not in boot_states:
281                 errors.append("invalid boot state %s" % node['boot_state'])
282             if errors:
283                 # XXX Log an event instead of printing to logfile
284                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
285                       node, ":", ", ".join(errors)
286                 del nodes_at_peer[peer_node_id]
287                 continue
288             else:
289                 node['site_id'] = peer_sites[node['site_id']]['site_id']
290
291         # Synchronize new set
292         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
293
294         for peer_node_id, node in peer_nodes.iteritems():
295             # Bind any newly cached foreign nodes to peer
296             if peer_node_id not in old_peer_nodes:
297                 peer.add_node(node, peer_node_id, commit = False)
298                 node['peer_id'] = peer_id
299                 node['peer_node_id'] = peer_node_id
300
301         timers['nodes'] = time.time() - start
302
303         #
304         # Synchronize local nodes
305         #
306
307         start = time.time()
308
309         # Keyed on local node_id
310         local_nodes = Nodes(self.api).dict()
311
312         for node in peer_tables['PeerNodes']:
313             # Foreign identifier for our node as maintained by peer
314             peer_node_id = node['node_id']
315             # Local identifier for our node as cached by peer
316             node_id = node['peer_node_id']
317             if node_id in local_nodes:
318                 # Still a valid local node, add it to the synchronized
319                 # set of local node objects keyed on foreign node_id.
320                 peer_nodes[peer_node_id] = local_nodes[node_id]
321
322         timers['local_nodes'] = time.time() - start
323
324         #
325         # XXX Synchronize foreign slice instantiation states
326         #
327
328         slice_instantiations = SliceInstantiations(self.api).dict()
329
330         #
331         # Synchronize foreign slices
332         #
333
334         start = time.time()
335
336         # Compare only the columns returned by the GetPeerData() call
337         if peer_tables['Slices']:
338             columns = peer_tables['Slices'][0].keys()
339         else:
340             columns = None
341
342         # Keyed on foreign slice_id
343         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
344         slices_at_peer = dict([(slice['slice_id'], slice) \
345                                for slice in peer_tables['Slices']])
346
347         # Fix up site_id, instantiation, and creator_person_id references
348         for peer_slice_id, slice in slices_at_peer.items():
349             errors = []
350             if slice['site_id'] not in peer_sites:
351                 errors.append("invalid site %d" % slice['site_id'])
352             if slice['instantiation'] not in slice_instantiations:
353                 errors.append("invalid instantiation %s" % slice['instantiation'])
354             if slice['creator_person_id'] not in peer_persons:
355                 # Just NULL it out
356                 slice['creator_person_id'] = None
357             else:
358                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
359             if errors:
360                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
361                       slice, ":", ", ".join(errors)
362                 del slices_at_peer[peer_slice_id]
363                 continue
364             else:
365                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
366
367         # Synchronize new set
368         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
369
370         # transcoder : retrieve a local node_id from a peer_node_id
371         node_transcoder = dict ( [ (node['node_id'],node['peer_node_id']) for node in peer_nodes.values()])
372         person_transcoder = dict ( [ (person['person_id'],person['peer_person_id']) for person in peer_persons.values()])
373
374         for peer_slice_id, slice in peer_slices.iteritems():
375             # Bind any newly cached foreign slices to peer
376             if peer_slice_id not in old_peer_slices:
377                 peer.add_slice(slice, peer_slice_id, commit = False)
378                 slice['peer_id'] = peer_id
379                 slice['peer_slice_id'] = peer_slice_id
380                 slice['node_ids'] = []
381                 slice['person_ids'] = []
382
383             # Slice as viewed by peer
384             peer_slice = slices_at_peer[peer_slice_id]
385
386             # Nodes that are currently part of the slice
387             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids']]
388
389             # Nodes that should be part of the slice
390             slice_node_ids = [ x for x in peer_slice['node_ids'] if x in peer_nodes]
391
392             # Remove stale nodes from slice
393             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
394                 slice.remove_node(peer_nodes[node_id], commit = False)
395
396             # Add new nodes to slice
397             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
398                 slice.add_node(peer_nodes[node_id], commit = False)
399
400             # N.B.: Local nodes that may have been added to the slice
401             # by hand, are removed. In other words, don't do this.
402
403             # Foreign users that are currently part of the slice
404             old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids']]
405
406             # Foreign users that should be part of the slice
407             slice_person_ids = [ x for x in peer_slice['person_ids'] if x in peer_persons]
408
409             # Remove stale users from slice
410             for peer_person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
411                 slice.remove_person(peer_persons[peer_person_id], commit = False)
412
413             # Add new users to slice
414             for peer_person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
415                 slice.add_person(peer_persons[peer_person_id], commit = False)
416
417             # N.B.: Local users that may have been added to the slice
418             # by hand, are not touched.
419
420         timers['slices'] = time.time() - start
421
422         # Update peer itself and commit
423         peer.sync(commit = True)
424
425         return timers