- be more verbose about node and slice caching errors
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 class RefreshPeer(Method):
24     """
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
27     faults otherwise.
28     """
29
30     roles = ['admin']
31
32     accepts = [
33         Auth(),
34         Mixed(Peer.fields['peer_id'],
35               Peer.fields['peername']),
36         ]
37
38     returns = Parameter(int, "1 if successful")
39
40     def call(self, auth, peer_id_or_peername):
41         # Get peer
42         peers = Peers(self.api, [peer_id_or_peername])
43         if not peers:
44             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
45         peer = peers[0]
46         peer_id = peer['peer_id']
47
48         # Connect to peer API
49         peer.connect()
50
51         timers = {}
52
53         # Get peer data
54         start = time.time()
55         peer_tables = peer.GetPeerData()
56         timers['transport'] = time.time() - start - peer_tables['db_time']
57         timers['peer_db'] = peer_tables['db_time']
58
59         def sync(objects, peer_objects, classobj):
60             """
61             Synchronizes two dictionaries of objects. objects should
62             be a dictionary of local objects keyed on their foreign
63             identifiers. peer_objects should be a dictionary of
64             foreign objects keyed on their local (i.e., foreign to us)
65             identifiers. Returns a final dictionary of local objects
66             keyed on their foreign identifiers.
67             """
68
69             synced = {}
70
71             # Delete stale objects
72             for peer_object_id, object in objects.iteritems():
73                 if peer_object_id not in peer_objects:
74                     object.delete(commit = False)
75                     print classobj, "object %d deleted" % object[object.primary_key]
76
77             # Add/update new/existing objects
78             for peer_object_id, peer_object in peer_objects.iteritems():
79                 if peer_object_id in objects:
80                     # Update existing object
81                     object = objects[peer_object_id]
82
83                     # Replace foreign identifier with existing local
84                     # identifier temporarily for the purposes of
85                     # comparison.
86                     peer_object[object.primary_key] = object[object.primary_key]
87
88                     # Must use __eq__() instead of == since
89                     # peer_object may be a raw dict instead of a Peer
90                     # object.
91                     if not object.__eq__(peer_object):
92                         # Only update intrinsic fields
93                         object.update(object.db_fields(peer_object))
94                         sync = True
95                         dbg = "changed"
96                     else:
97                         sync = False
98                         dbg = None
99
100                     # Restore foreign identifier
101                     peer_object[object.primary_key] = peer_object_id
102                 else:
103                     # Add new object
104                     object = classobj(self.api, peer_object)
105                     # Replace foreign identifier with new local identifier
106                     del object[object.primary_key]
107                     sync = True
108                     dbg = "added"
109
110                 if sync:
111                     try:
112                         object.sync(commit = False)
113                     except PLCInvalidArgument, err:
114                         # Skip if validation fails
115                         # XXX Log an event instead of printing to logfile
116                         print >> log, "Warning: Skipping invalid", \
117                               peer['peername'], object.__class__.__name__, \
118                               ":", peer_object, ":", err
119                         continue
120
121                 synced[peer_object_id] = object
122
123                 if dbg:
124                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
125
126             return synced
127
128         #
129         # Synchronize foreign sites
130         #
131
132         start = time.time()
133
134         # Compare only the columns returned by the GetPeerData() call
135         if peer_tables['Sites']:
136             columns = peer_tables['Sites'][0].keys()
137         else:
138             columns = None
139
140         # Keyed on foreign site_id
141         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
142         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
143
144         # Synchronize new set (still keyed on foreign site_id)
145         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
146
147         for peer_site_id, site in peer_sites.iteritems():
148             # Bind any newly cached sites to peer
149             if peer_site_id not in old_peer_sites:
150                 peer.add_site(site, peer_site_id, commit = False)
151                 site['peer_id'] = peer_id
152                 site['peer_site_id'] = peer_site_id
153
154         timers['site'] = time.time() - start
155
156         #
157         # XXX Synchronize foreign key types
158         #
159
160         key_types = KeyTypes(self.api).dict()
161
162         #
163         # Synchronize foreign keys
164         #
165
166         start = time.time()
167
168         # Compare only the columns returned by the GetPeerData() call
169         if peer_tables['Keys']:
170             columns = peer_tables['Keys'][0].keys()
171         else:
172             columns = None
173
174         # Keyed on foreign key_id
175         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
176         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
177
178         # Fix up key_type references
179         for peer_key_id, key in keys_at_peer.items():
180             if key['key_type'] not in key_types:
181                 # XXX Log an event instead of printing to logfile
182                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
183                       key, ": invalid key type", key['key_type']
184                 del keys_at_peer[peer_key_id]
185                 continue
186
187         # Synchronize new set (still keyed on foreign key_id)
188         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
189         for peer_key_id, key in peer_keys.iteritems():
190             # Bind any newly cached keys to peer
191             if peer_key_id not in old_peer_keys:
192                 peer.add_key(key, peer_key_id, commit = False)
193                 key['peer_id'] = peer_id
194                 key['peer_key_id'] = peer_key_id
195
196         timers['keys'] = time.time() - start
197
198         #
199         # Synchronize foreign users
200         #
201
202         start = time.time()
203
204         # Compare only the columns returned by the GetPeerData() call
205         if peer_tables['Persons']:
206             columns = peer_tables['Persons'][0].keys()
207         else:
208             columns = None
209
210         # Keyed on foreign person_id
211         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
212         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
213                                 for peer_person in peer_tables['Persons']])
214
215         # XXX Do we care about membership in foreign site(s)?
216
217         # Synchronize new set (still keyed on foreign person_id)
218         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
219
220         for peer_person_id, person in peer_persons.iteritems():
221             # Bind any newly cached users to peer
222             if peer_person_id not in old_peer_persons:
223                 peer.add_person(person, peer_person_id, commit = False)
224                 person['peer_id'] = peer_id
225                 person['peer_person_id'] = peer_person_id
226                 person['key_ids'] = []
227
228             # User as viewed by peer
229             peer_person = persons_at_peer[peer_person_id]
230             
231             # Foreign keys currently belonging to the user
232             old_person_keys = dict(filter(lambda (peer_key_id, key): \
233                                           key['key_id'] in person['key_ids'],
234                                           peer_keys.items()))
235
236             # Foreign keys that should belong to the user
237             person_keys = dict(filter(lambda (peer_key_id, key): \
238                                       peer_key_id in peer_person['key_ids'],
239                                       peer_keys.items()))
240
241             # Remove stale keys from user
242             for peer_key_id in (set(old_person_keys.keys()) - set(person_keys.keys())):
243                 person.remove_key(old_person_keys[peer_key_id], commit = False)
244
245             # Add new keys to user
246             for peer_key_id in (set(person_keys.keys()) - set(old_person_keys.keys())):
247                 person.add_key(person_keys[peer_key_id], commit = False)
248
249         timers['persons'] = time.time() - start
250
251         #
252         # XXX Synchronize foreign boot states
253         #
254
255         boot_states = BootStates(self.api).dict()
256
257         #
258         # Synchronize foreign nodes
259         #
260
261         start = time.time()
262
263         # Compare only the columns returned by the GetPeerData() call
264         if peer_tables['Nodes']:
265             columns = peer_tables['Nodes'][0].keys()
266         else:
267             columns = None
268
269         # Keyed on foreign node_id
270         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
271         nodes_at_peer = dict([(node['node_id'], node) \
272                               for node in peer_tables['Nodes']])
273
274         # Fix up site_id and boot_states references
275         for peer_node_id, node in nodes_at_peer.items():
276             errors = []
277             if node['site_id'] not in peer_sites:
278                 errors.append("invalid site %d" % node['site_id'])
279             if node['boot_state'] not in boot_states:
280                 errors.append("invalid boot state %s" % node['boot_state'])
281             if errors:
282                 # XXX Log an event instead of printing to logfile
283                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
284                       node, ":", ", ".join(errors)
285                 del nodes_at_peer[peer_node_id]
286                 continue
287             else:
288                 node['site_id'] = peer_sites[node['site_id']]['site_id']
289
290         # Synchronize new set
291         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
292
293         for peer_node_id, node in peer_nodes.iteritems():
294             # Bind any newly cached foreign nodes to peer
295             if peer_node_id not in old_peer_nodes:
296                 peer.add_node(node, peer_node_id, commit = False)
297                 node['peer_id'] = peer_id
298                 node['peer_node_id'] = peer_node_id
299
300         timers['nodes'] = time.time() - start
301
302         #
303         # Synchronize local nodes
304         #
305
306         start = time.time()
307
308         # Keyed on local node_id
309         local_nodes = Nodes(self.api).dict()
310
311         for node in peer_tables['PeerNodes']:
312             # Foreign identifier for our node as maintained by peer
313             peer_node_id = node['node_id']
314             # Local identifier for our node as cached by peer
315             node_id = node['peer_node_id']
316             if node_id in local_nodes:
317                 # Still a valid local node, add it to the synchronized
318                 # set of local node objects keyed on foreign node_id.
319                 peer_nodes[peer_node_id] = local_nodes[node_id]
320
321         timers['local_nodes'] = time.time() - start
322
323         #
324         # XXX Synchronize foreign slice instantiation states
325         #
326
327         slice_instantiations = SliceInstantiations(self.api).dict()
328
329         #
330         # Synchronize foreign slices
331         #
332
333         start = time.time()
334
335         # Compare only the columns returned by the GetPeerData() call
336         if peer_tables['Slices']:
337             columns = peer_tables['Slices'][0].keys()
338         else:
339             columns = None
340
341         # Keyed on foreign slice_id
342         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
343         slices_at_peer = dict([(slice['slice_id'], slice) \
344                                for slice in peer_tables['Slices']])
345
346         # Fix up site_id, instantiation, and creator_person_id references
347         for peer_slice_id, slice in slices_at_peer.items():
348             errors = []
349             if slice['site_id'] not in peer_sites:
350                 errors.append("invalid site %d" % slice['site_id'])
351             if slice['instantiation'] not in slice_instantiations:
352                 errors.append("invalid instantiation %s" % slice['instantiation'])
353             if slice['creator_person_id'] not in peer_persons:
354                 errors.append("invalid creator %d" % slice['creator_person_id'])
355             if errors:
356                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
357                       slice, ":", ", ".join(errors)
358                 del slices_at_peer[peer_slice_id]
359                 continue
360             else:
361                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
362                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
363
364         # Synchronize new set
365         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
366
367         for peer_slice_id, slice in peer_slices.iteritems():
368             # Bind any newly cached foreign slices to peer
369             if peer_slice_id not in old_peer_slices:
370                 peer.add_slice(slice, peer_slice_id, commit = False)
371                 slice['peer_id'] = peer_id
372                 slice['peer_slice_id'] = peer_slice_id
373                 slice['node_ids'] = []
374                 slice['person_ids'] = []
375
376             # Slice as viewed by peer
377             peer_slice = slices_at_peer[peer_slice_id]
378
379             # Nodes that are currently part of the slice
380             old_slice_nodes = dict(filter(lambda (peer_node_id, node): \
381                                           node['node_id'] in slice['node_ids'],
382                                           peer_nodes.items()))
383
384             # Nodes that should be part of the slice
385             slice_nodes = dict(filter(lambda (peer_node_id, node): \
386                                       peer_node_id in peer_slice['node_ids'],
387                                       peer_nodes.items()))
388
389             # Remove stale nodes from slice
390             for node_id in (set(old_slice_nodes.keys()) - set(slice_nodes.keys())):
391                 slice.remove_node(old_slice_nodes[node_id], commit = False)
392
393             # Add new nodes to slice
394             for node_id in (set(slice_nodes.keys()) - set(old_slice_nodes.keys())):
395                 slice.add_node(slice_nodes[node_id], commit = False)
396
397             # N.B.: Local nodes that may have been added to the slice
398             # by hand, are removed. In other words, don't do this.
399
400             # Foreign users that are currently part of the slice
401             old_slice_persons = dict(filter(lambda (peer_person_id, person): \
402                                             person['person_id'] in slice['person_ids'],
403                                             peer_persons.items()))
404
405             # Foreign users that should be part of the slice
406             slice_persons = dict(filter(lambda (peer_person_id, person): \
407                                         peer_person_id in peer_slice['person_ids'],
408                                         peer_persons.items()))
409
410             # Remove stale users from slice
411             for peer_person_id in (set(old_slice_persons.keys()) - set(slice_persons.keys())):
412                 slice.remove_person(old_slice_persons[peer_person_id], commit = False)
413
414             # Add new users to slice
415             for peer_person_id in (set(slice_persons.keys()) - set(old_slice_persons.keys())):
416                 slice.add_person(slice_persons[peer_person_id], commit = False)
417
418             # N.B.: Local users that may have been added to the slice
419             # by hand, are not touched.
420
421         timers['slices'] = time.time() - start
422
423         # Update peer itself and commit
424         peer.sync(commit = True)
425
426         return timers