log revisited for showing plain info rather than abstruse ids
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 class RefreshPeer(Method):
24     """
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
27     faults otherwise.
28     """
29
30     roles = ['admin']
31
32     accepts = [
33         Auth(),
34         Mixed(Peer.fields['peer_id'],
35               Peer.fields['peername']),
36         ]
37
38     returns = Parameter(int, "1 if successful")
39
40     def call(self, auth, peer_id_or_peername):
41         # Get peer
42         peers = Peers(self.api, [peer_id_or_peername])
43         if not peers:
44             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
45         peer = peers[0]
46         peer_id = peer['peer_id']
47
48         # Connect to peer API
49         peer.connect()
50
51         timers = {}
52
53         # Get peer data
54         start = time.time()
55         peer_tables = peer.GetPeerData()
56         timers['transport'] = time.time() - start - peer_tables['db_time']
57         timers['peer_db'] = peer_tables['db_time']
58
59         def sync(objects, peer_objects, classobj):
60             """
61             Synchronizes two dictionaries of objects. objects should
62             be a dictionary of local objects keyed on their foreign
63             identifiers. peer_objects should be a dictionary of
64             foreign objects keyed on their local (i.e., foreign to us)
65             identifiers. Returns a final dictionary of local objects
66             keyed on their foreign identifiers.
67             """
68
69             synced = {}
70
71             # Delete stale objects
72             for peer_object_id, object in objects.iteritems():
73                 if peer_object_id not in peer_objects:
74                     object.delete(commit = False)
75                     print classobj, "object %d deleted" % object[object.primary_key]
76
77             # Add/update new/existing objects
78             for peer_object_id, peer_object in peer_objects.iteritems():
79                 if peer_object_id in objects:
80                     # Update existing object
81                     object = objects[peer_object_id]
82
83                     # Replace foreign identifier with existing local
84                     # identifier temporarily for the purposes of
85                     # comparison.
86                     peer_object[object.primary_key] = object[object.primary_key]
87
88                     # Must use __eq__() instead of == since
89                     # peer_object may be a raw dict instead of a Peer
90                     # object.
91                     if not object.__eq__(peer_object):
92                         # Only update intrinsic fields
93                         object.update(object.db_fields(peer_object))
94                         sync = True
95                         dbg = "changed"
96                     else:
97                         sync = False
98                         dbg = None
99
100                     # Restore foreign identifier
101                     peer_object[object.primary_key] = peer_object_id
102                 else:
103                     # Add new object
104                     object = classobj(self.api, peer_object)
105                     # Replace foreign identifier with new local identifier
106                     del object[object.primary_key]
107                     sync = True
108                     dbg = "added"
109
110                 if sync:
111                     try:
112                         object.sync(commit = False)
113                     except PLCInvalidArgument, err:
114                         # Skip if validation fails
115                         # XXX Log an event instead of printing to logfile
116                         print >> log, "Warning: Skipping invalid", \
117                               peer['peername'], object.__class__.__name__, \
118                               ":", peer_object, ":", err
119                         continue
120
121                 synced[peer_object_id] = object
122
123                 if dbg:
124                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, \
125                         object[object.class_key], object[object.primary_key], dbg
126
127             return synced
128
129         #
130         # Synchronize foreign sites
131         #
132
133         start = time.time()
134
135         # Compare only the columns returned by the GetPeerData() call
136         if peer_tables['Sites']:
137             columns = peer_tables['Sites'][0].keys()
138         else:
139             columns = None
140
141         # Keyed on foreign site_id
142         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
143         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
144
145         # Synchronize new set (still keyed on foreign site_id)
146         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
147
148         for peer_site_id, site in peer_sites.iteritems():
149             # Bind any newly cached sites to peer
150             if peer_site_id not in old_peer_sites:
151                 peer.add_site(site, peer_site_id, commit = False)
152                 site['peer_id'] = peer_id
153                 site['peer_site_id'] = peer_site_id
154
155         timers['site'] = time.time() - start
156
157         #
158         # XXX Synchronize foreign key types
159         #
160
161         key_types = KeyTypes(self.api).dict()
162
163         #
164         # Synchronize foreign keys
165         #
166
167         start = time.time()
168
169         # Compare only the columns returned by the GetPeerData() call
170         if peer_tables['Keys']:
171             columns = peer_tables['Keys'][0].keys()
172         else:
173             columns = None
174
175         # Keyed on foreign key_id
176         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
177         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
178
179         # Fix up key_type references
180         for peer_key_id, key in keys_at_peer.items():
181             if key['key_type'] not in key_types:
182                 # XXX Log an event instead of printing to logfile
183                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
184                       key, ": invalid key type", key['key_type']
185                 del keys_at_peer[peer_key_id]
186                 continue
187
188         # Synchronize new set (still keyed on foreign key_id)
189         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
190         for peer_key_id, key in peer_keys.iteritems():
191             # Bind any newly cached keys to peer
192             if peer_key_id not in old_peer_keys:
193                 peer.add_key(key, peer_key_id, commit = False)
194                 key['peer_id'] = peer_id
195                 key['peer_key_id'] = peer_key_id
196
197         timers['keys'] = time.time() - start
198
199         #
200         # Synchronize foreign users
201         #
202
203         start = time.time()
204
205         # Compare only the columns returned by the GetPeerData() call
206         if peer_tables['Persons']:
207             columns = peer_tables['Persons'][0].keys()
208         else:
209             columns = None
210
211         # Keyed on foreign person_id
212         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
213         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
214                                 for peer_person in peer_tables['Persons']])
215
216         # XXX Do we care about membership in foreign site(s)?
217
218         # Synchronize new set (still keyed on foreign person_id)
219         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
220
221         # transcoder : retrieve a local key_id from a peer_key_id
222         key_transcoder = dict ( [ (key['key_id'],key['peer_key_id']) for key in peer_keys.values()])
223
224         for peer_person_id, person in peer_persons.iteritems():
225             # Bind any newly cached users to peer
226             if peer_person_id not in old_peer_persons:
227                 peer.add_person(person, peer_person_id, commit = False)
228                 person['peer_id'] = peer_id
229                 person['peer_person_id'] = peer_person_id
230                 person['key_ids'] = []
231
232             # User as viewed by peer
233             peer_person = persons_at_peer[peer_person_id]
234             
235             # Foreign keys currently belonging to the user
236             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids']]
237
238             # Foreign keys that should belong to the user
239             # this is basically peer_person['key_ids'], we just check it makes sense 
240             # (e.g. we might have failed importing it)
241             person_key_ids = [ x for x in peer_person['key_ids'] if x in peer_keys]
242
243             # Remove stale keys from user
244             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
245                 person.remove_key(peer_keys[key_id], commit = False)
246                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
247
248             # Add new keys to user
249             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
250                 person.add_key(peer_keys[key_id], commit = False)
251                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
252
253         timers['persons'] = time.time() - start
254
255         #
256         # XXX Synchronize foreign boot states
257         #
258
259         boot_states = BootStates(self.api).dict()
260
261         #
262         # Synchronize foreign nodes
263         #
264
265         start = time.time()
266
267         # Compare only the columns returned by the GetPeerData() call
268         if peer_tables['Nodes']:
269             columns = peer_tables['Nodes'][0].keys()
270         else:
271             columns = None
272
273         # Keyed on foreign node_id
274         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
275         nodes_at_peer = dict([(node['node_id'], node) \
276                               for node in peer_tables['Nodes']])
277
278         # Fix up site_id and boot_states references
279         for peer_node_id, node in nodes_at_peer.items():
280             errors = []
281             if node['site_id'] not in peer_sites:
282                 errors.append("invalid site %d" % node['site_id'])
283             if node['boot_state'] not in boot_states:
284                 errors.append("invalid boot state %s" % node['boot_state'])
285             if errors:
286                 # XXX Log an event instead of printing to logfile
287                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
288                       node, ":", ", ".join(errors)
289                 del nodes_at_peer[peer_node_id]
290                 continue
291             else:
292                 node['site_id'] = peer_sites[node['site_id']]['site_id']
293
294         # Synchronize new set
295         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
296
297         for peer_node_id, node in peer_nodes.iteritems():
298             # Bind any newly cached foreign nodes to peer
299             if peer_node_id not in old_peer_nodes:
300                 peer.add_node(node, peer_node_id, commit = False)
301                 node['peer_id'] = peer_id
302                 node['peer_node_id'] = peer_node_id
303
304         timers['nodes'] = time.time() - start
305
306         #
307         # Synchronize local nodes
308         #
309
310         start = time.time()
311
312         # Keyed on local node_id
313         local_nodes = Nodes(self.api).dict()
314
315         for node in peer_tables['PeerNodes']:
316             # Foreign identifier for our node as maintained by peer
317             peer_node_id = node['node_id']
318             # Local identifier for our node as cached by peer
319             node_id = node['peer_node_id']
320             if node_id in local_nodes:
321                 # Still a valid local node, add it to the synchronized
322                 # set of local node objects keyed on foreign node_id.
323                 peer_nodes[peer_node_id] = local_nodes[node_id]
324
325         timers['local_nodes'] = time.time() - start
326
327         #
328         # XXX Synchronize foreign slice instantiation states
329         #
330
331         slice_instantiations = SliceInstantiations(self.api).dict()
332
333         #
334         # Synchronize foreign slices
335         #
336
337         start = time.time()
338
339         # Compare only the columns returned by the GetPeerData() call
340         if peer_tables['Slices']:
341             columns = peer_tables['Slices'][0].keys()
342         else:
343             columns = None
344
345         # Keyed on foreign slice_id
346         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
347         slices_at_peer = dict([(slice['slice_id'], slice) \
348                                for slice in peer_tables['Slices']])
349
350         # Fix up site_id, instantiation, and creator_person_id references
351         for peer_slice_id, slice in slices_at_peer.items():
352             errors = []
353             if slice['site_id'] not in peer_sites:
354                 errors.append("invalid site %d" % slice['site_id'])
355             if slice['instantiation'] not in slice_instantiations:
356                 errors.append("invalid instantiation %s" % slice['instantiation'])
357             if slice['creator_person_id'] not in peer_persons:
358                 # Just NULL it out
359                 slice['creator_person_id'] = None
360             else:
361                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
362             if errors:
363                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
364                       slice, ":", ", ".join(errors)
365                 del slices_at_peer[peer_slice_id]
366                 continue
367             else:
368                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
369
370         # Synchronize new set
371         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
372
373         # transcoder : retrieve a local node_id from a peer_node_id
374         node_transcoder = dict ( [ (node['node_id'],node['peer_node_id']) for node in peer_nodes.values()])
375         person_transcoder = dict ( [ (person['person_id'],person['peer_person_id']) for person in peer_persons.values()])
376
377         for peer_slice_id, slice in peer_slices.iteritems():
378             # Bind any newly cached foreign slices to peer
379             if peer_slice_id not in old_peer_slices:
380                 peer.add_slice(slice, peer_slice_id, commit = False)
381                 slice['peer_id'] = peer_id
382                 slice['peer_slice_id'] = peer_slice_id
383                 slice['node_ids'] = []
384                 slice['person_ids'] = []
385
386             # Slice as viewed by peer
387             peer_slice = slices_at_peer[peer_slice_id]
388
389             # Nodes that are currently part of the slice
390             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids']]
391
392             # Nodes that should be part of the slice
393             slice_node_ids = [ x for x in peer_slice['node_ids'] if x in peer_nodes]
394
395             # Remove stale nodes from slice
396             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
397                 slice.remove_node(peer_nodes[node_id], commit = False)
398                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
399
400             # Add new nodes to slice
401             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
402                 slice.add_node(peer_nodes[node_id], commit = False)
403                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
404
405             # N.B.: Local nodes that may have been added to the slice
406             # by hand, are removed. In other words, don't do this.
407
408             # Foreign users that are currently part of the slice
409             old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids']]
410
411             # Foreign users that should be part of the slice
412             slice_person_ids = [ x for x in peer_slice['person_ids'] if x in peer_persons ]
413
414             # Remove stale users from slice
415             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
416                 slice.remove_person(peer_persons[person_id], commit = False)
417                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
418
419             # Add new users to slice
420             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
421                 slice.add_person(peer_persons[person_id], commit = False)
422                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
423
424             # N.B.: Local users that may have been added to the slice
425             # by hand, are not touched.
426
427         timers['slices'] = time.time() - start
428
429         # Update peer itself and commit
430         peer.sync(commit = True)
431
432         return timers