attempt to fix key error issue
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 class RefreshPeer(Method):
24     """
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
27     faults otherwise.
28     """
29
30     roles = ['admin']
31
32     accepts = [
33         Auth(),
34         Mixed(Peer.fields['peer_id'],
35               Peer.fields['peername']),
36         ]
37
38     returns = Parameter(int, "1 if successful")
39
40     def call(self, auth, peer_id_or_peername):
41         # Get peer
42         peers = Peers(self.api, [peer_id_or_peername])
43         if not peers:
44             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
45         peer = peers[0]
46         peer_id = peer['peer_id']
47
48         # Connect to peer API
49         peer.connect()
50
51         timers = {}
52
53         # Get peer data
54         start = time.time()
55         peer_tables = peer.GetPeerData()
56         timers['transport'] = time.time() - start - peer_tables['db_time']
57         timers['peer_db'] = peer_tables['db_time']
58
59         def sync(objects, peer_objects, classobj):
60             """
61             Synchronizes two dictionaries of objects. objects should
62             be a dictionary of local objects keyed on their foreign
63             identifiers. peer_objects should be a dictionary of
64             foreign objects keyed on their local (i.e., foreign to us)
65             identifiers. Returns a final dictionary of local objects
66             keyed on their foreign identifiers.
67             """
68
69             synced = {}
70
71             # Delete stale objects
72             for peer_object_id, object in objects.iteritems():
73                 if peer_object_id not in peer_objects:
74                     object.delete(commit = False)
75                     print classobj(self.api).__class__.__name__, "object %s deleted" % object[object.class_key]
76
77             # Add/update new/existing objects
78             for peer_object_id, peer_object in peer_objects.iteritems():
79                 if peer_object_id in objects:
80                     # Update existing object
81                     object = objects[peer_object_id]
82
83                     # Replace foreign identifier with existing local
84                     # identifier temporarily for the purposes of
85                     # comparison.
86                     peer_object[object.primary_key] = object[object.primary_key]
87
88                     # Must use __eq__() instead of == since
89                     # peer_object may be a raw dict instead of a Peer
90                     # object.
91                     if not object.__eq__(peer_object):
92                         # Only update intrinsic fields
93                         object.update(object.db_fields(peer_object))
94                         sync = True
95                         dbg = "changed"
96                     else:
97                         sync = False
98                         dbg = None
99
100                     # Restore foreign identifier
101                     peer_object[object.primary_key] = peer_object_id
102                 else:
103                     # Add new object
104                     object = classobj(self.api, peer_object)
105                     # Replace foreign identifier with new local identifier
106                     del object[object.primary_key]
107                     sync = True
108                     dbg = "added"
109
110                 if sync:
111                     try:
112                         object.sync(commit = False)
113                     except PLCInvalidArgument, err:
114                         # Skip if validation fails
115                         # XXX Log an event instead of printing to logfile
116                         print >> log, "Warning: Skipping invalid", \
117                               peer['peername'], object.__class__.__name__, \
118                               ":", peer_object, ":", err
119                         continue
120
121                 synced[peer_object_id] = object
122
123                 if dbg:
124                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, \
125                         object[object.class_key], object[object.primary_key], dbg
126
127             return synced
128
129         #
130         # Synchronize foreign sites
131         #
132
133         start = time.time()
134
135         # Compare only the columns returned by the GetPeerData() call
136         if peer_tables['Sites']:
137             columns = peer_tables['Sites'][0].keys()
138         else:
139             columns = None
140
141         # Keyed on foreign site_id
142         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
143         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
144
145         # Synchronize new set (still keyed on foreign site_id)
146         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
147
148         for peer_site_id, site in peer_sites.iteritems():
149             # Bind any newly cached sites to peer
150             if peer_site_id not in old_peer_sites:
151                 peer.add_site(site, peer_site_id, commit = False)
152                 site['peer_id'] = peer_id
153                 site['peer_site_id'] = peer_site_id
154
155         timers['site'] = time.time() - start
156
157         #
158         # XXX Synchronize foreign key types
159         #
160
161         key_types = KeyTypes(self.api).dict()
162
163         #
164         # Synchronize foreign keys
165         #
166
167         start = time.time()
168
169         # Compare only the columns returned by the GetPeerData() call
170         if peer_tables['Keys']:
171             columns = peer_tables['Keys'][0].keys()
172         else:
173             columns = None
174
175         # Keyed on foreign key_id
176         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
177         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
178
179         # Fix up key_type references
180         for peer_key_id, key in keys_at_peer.items():
181             if key['key_type'] not in key_types:
182                 # XXX Log an event instead of printing to logfile
183                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
184                       key, ": invalid key type", key['key_type']
185                 del keys_at_peer[peer_key_id]
186                 continue
187
188         # Synchronize new set (still keyed on foreign key_id)
189         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
190         for peer_key_id, key in peer_keys.iteritems():
191             # Bind any newly cached keys to peer
192             if peer_key_id not in old_peer_keys:
193                 peer.add_key(key, peer_key_id, commit = False)
194                 key['peer_id'] = peer_id
195                 key['peer_key_id'] = peer_key_id
196
197         timers['keys'] = time.time() - start
198
199         #
200         # Synchronize foreign users
201         #
202
203         start = time.time()
204
205         # Compare only the columns returned by the GetPeerData() call
206         if peer_tables['Persons']:
207             columns = peer_tables['Persons'][0].keys()
208         else:
209             columns = None
210
211         # Keyed on foreign person_id
212         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
213         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
214                                 for peer_person in peer_tables['Persons']])
215
216         # XXX Do we care about membership in foreign site(s)?
217
218         # Synchronize new set (still keyed on foreign person_id)
219         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
220
221         # transcoder : retrieve a local key_id from a peer_key_id
222         key_transcoder = dict ( [ (key['key_id'],key['peer_key_id']) for key in peer_keys.values()])
223
224         for peer_person_id, person in peer_persons.iteritems():
225             # Bind any newly cached users to peer
226             if peer_person_id not in old_peer_persons:
227                 peer.add_person(person, peer_person_id, commit = False)
228                 person['peer_id'] = peer_id
229                 person['peer_person_id'] = peer_person_id
230                 person['key_ids'] = []
231
232             # User as viewed by peer
233             peer_person = persons_at_peer[peer_person_id]
234             
235             # Foreign keys currently belonging to the user
236             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
237                                   if key_transcoder[key_id] in peer_keys]
238
239             # Foreign keys that should belong to the user
240             # this is basically peer_person['key_ids'], we just check it makes sense 
241             # (e.g. we might have failed importing it)
242             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
243
244             # Remove stale keys from user
245             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
246                 person.remove_key(peer_keys[key_id], commit = False)
247                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
248
249             # Add new keys to user
250             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
251                 person.add_key(peer_keys[key_id], commit = False)
252                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
253
254         timers['persons'] = time.time() - start
255
256         #
257         # XXX Synchronize foreign boot states
258         #
259
260         boot_states = BootStates(self.api).dict()
261
262         #
263         # Synchronize foreign nodes
264         #
265
266         start = time.time()
267
268         # Compare only the columns returned by the GetPeerData() call
269         if peer_tables['Nodes']:
270             columns = peer_tables['Nodes'][0].keys()
271         else:
272             columns = None
273
274         # Keyed on foreign node_id
275         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
276         nodes_at_peer = dict([(node['node_id'], node) \
277                               for node in peer_tables['Nodes']])
278
279         # Fix up site_id and boot_states references
280         for peer_node_id, node in nodes_at_peer.items():
281             errors = []
282             if node['site_id'] not in peer_sites:
283                 errors.append("invalid site %d" % node['site_id'])
284             if node['boot_state'] not in boot_states:
285                 errors.append("invalid boot state %s" % node['boot_state'])
286             if errors:
287                 # XXX Log an event instead of printing to logfile
288                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
289                       node, ":", ", ".join(errors)
290                 del nodes_at_peer[peer_node_id]
291                 continue
292             else:
293                 node['site_id'] = peer_sites[node['site_id']]['site_id']
294
295         # Synchronize new set
296         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
297
298         for peer_node_id, node in peer_nodes.iteritems():
299             # Bind any newly cached foreign nodes to peer
300             if peer_node_id not in old_peer_nodes:
301                 peer.add_node(node, peer_node_id, commit = False)
302                 node['peer_id'] = peer_id
303                 node['peer_node_id'] = peer_node_id
304
305         timers['nodes'] = time.time() - start
306
307         #
308         # Synchronize local nodes
309         #
310
311         start = time.time()
312
313         # Keyed on local node_id
314         local_nodes = Nodes(self.api).dict()
315
316         for node in peer_tables['PeerNodes']:
317             # Foreign identifier for our node as maintained by peer
318             peer_node_id = node['node_id']
319             # Local identifier for our node as cached by peer
320             node_id = node['peer_node_id']
321             if node_id in local_nodes:
322                 # Still a valid local node, add it to the synchronized
323                 # set of local node objects keyed on foreign node_id.
324                 peer_nodes[peer_node_id] = local_nodes[node_id]
325
326         timers['local_nodes'] = time.time() - start
327
328         #
329         # XXX Synchronize foreign slice instantiation states
330         #
331
332         slice_instantiations = SliceInstantiations(self.api).dict()
333
334         #
335         # Synchronize foreign slices
336         #
337
338         start = time.time()
339
340         # Compare only the columns returned by the GetPeerData() call
341         if peer_tables['Slices']:
342             columns = peer_tables['Slices'][0].keys()
343         else:
344             columns = None
345
346         # Keyed on foreign slice_id
347         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
348         slices_at_peer = dict([(slice['slice_id'], slice) \
349                                for slice in peer_tables['Slices']])
350
351         # Fix up site_id, instantiation, and creator_person_id references
352         for peer_slice_id, slice in slices_at_peer.items():
353             errors = []
354             if slice['site_id'] not in peer_sites:
355                 errors.append("invalid site %d" % slice['site_id'])
356             if slice['instantiation'] not in slice_instantiations:
357                 errors.append("invalid instantiation %s" % slice['instantiation'])
358             if slice['creator_person_id'] not in peer_persons:
359                 # Just NULL it out
360                 slice['creator_person_id'] = None
361             else:
362                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
363             if errors:
364                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
365                       slice, ":", ", ".join(errors)
366                 del slices_at_peer[peer_slice_id]
367                 continue
368             else:
369                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
370
371         # Synchronize new set
372         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
373
374         # transcoder : retrieve a local node_id from a peer_node_id
375         node_transcoder = dict ( [ (node['node_id'],node['peer_node_id']) for node in peer_nodes.values()])
376         person_transcoder = dict ( [ (person['person_id'],person['peer_person_id']) for person in peer_persons.values()])
377
378         for peer_slice_id, slice in peer_slices.iteritems():
379             # Bind any newly cached foreign slices to peer
380             if peer_slice_id not in old_peer_slices:
381                 peer.add_slice(slice, peer_slice_id, commit = False)
382                 slice['peer_id'] = peer_id
383                 slice['peer_slice_id'] = peer_slice_id
384                 slice['node_ids'] = []
385                 slice['person_ids'] = []
386
387             # Slice as viewed by peer
388             peer_slice = slices_at_peer[peer_slice_id]
389
390             # Nodes that are currently part of the slice
391             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
392                                    if node_transcoder[node_id] in peer_nodes]
393
394             # Nodes that should be part of the slice
395             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
396
397             # Remove stale nodes from slice
398             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
399                 slice.remove_node(peer_nodes[node_id], commit = False)
400                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
401
402             # Add new nodes to slice
403             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
404                 slice.add_node(peer_nodes[node_id], commit = False)
405                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
406
407             # N.B.: Local nodes that may have been added to the slice
408             # by hand, are removed. In other words, don't do this.
409
410             # Foreign users that are currently part of the slice
411             old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
412                                      if person_transcoder[person_id] in peer_persons]
413
414             # Foreign users that should be part of the slice
415             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
416
417             # Remove stale users from slice
418             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
419                 slice.remove_person(peer_persons[person_id], commit = False)
420                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
421
422             # Add new users to slice
423             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
424                 slice.add_person(peer_persons[person_id], commit = False)
425                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
426
427             # N.B.: Local users that may have been added to the slice
428             # by hand, are not touched.
429
430         timers['slices'] = time.time() - start
431
432         # Update peer itself and commit
433         peer.sync(commit = True)
434
435         return timers