logs changes in the xrefs as well
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 class RefreshPeer(Method):
24     """
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
27     faults otherwise.
28     """
29
30     roles = ['admin']
31
32     accepts = [
33         Auth(),
34         Mixed(Peer.fields['peer_id'],
35               Peer.fields['peername']),
36         ]
37
38     returns = Parameter(int, "1 if successful")
39
40     def call(self, auth, peer_id_or_peername):
41         # Get peer
42         peers = Peers(self.api, [peer_id_or_peername])
43         if not peers:
44             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
45         peer = peers[0]
46         peer_id = peer['peer_id']
47
48         # Connect to peer API
49         peer.connect()
50
51         timers = {}
52
53         # Get peer data
54         start = time.time()
55         peer_tables = peer.GetPeerData()
56         timers['transport'] = time.time() - start - peer_tables['db_time']
57         timers['peer_db'] = peer_tables['db_time']
58
59         def sync(objects, peer_objects, classobj):
60             """
61             Synchronizes two dictionaries of objects. objects should
62             be a dictionary of local objects keyed on their foreign
63             identifiers. peer_objects should be a dictionary of
64             foreign objects keyed on their local (i.e., foreign to us)
65             identifiers. Returns a final dictionary of local objects
66             keyed on their foreign identifiers.
67             """
68
69             synced = {}
70
71             # Delete stale objects
72             for peer_object_id, object in objects.iteritems():
73                 if peer_object_id not in peer_objects:
74                     object.delete(commit = False)
75                     print classobj, "object %d deleted" % object[object.primary_key]
76
77             # Add/update new/existing objects
78             for peer_object_id, peer_object in peer_objects.iteritems():
79                 if peer_object_id in objects:
80                     # Update existing object
81                     object = objects[peer_object_id]
82
83                     # Replace foreign identifier with existing local
84                     # identifier temporarily for the purposes of
85                     # comparison.
86                     peer_object[object.primary_key] = object[object.primary_key]
87
88                     # Must use __eq__() instead of == since
89                     # peer_object may be a raw dict instead of a Peer
90                     # object.
91                     if not object.__eq__(peer_object):
92                         # Only update intrinsic fields
93                         object.update(object.db_fields(peer_object))
94                         sync = True
95                         dbg = "changed"
96                     else:
97                         sync = False
98                         dbg = None
99
100                     # Restore foreign identifier
101                     peer_object[object.primary_key] = peer_object_id
102                 else:
103                     # Add new object
104                     object = classobj(self.api, peer_object)
105                     # Replace foreign identifier with new local identifier
106                     del object[object.primary_key]
107                     sync = True
108                     dbg = "added"
109
110                 if sync:
111                     try:
112                         object.sync(commit = False)
113                     except PLCInvalidArgument, err:
114                         # Skip if validation fails
115                         # XXX Log an event instead of printing to logfile
116                         print >> log, "Warning: Skipping invalid", \
117                               peer['peername'], object.__class__.__name__, \
118                               ":", peer_object, ":", err
119                         continue
120
121                 synced[peer_object_id] = object
122
123                 if dbg:
124                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
125
126             return synced
127
128         #
129         # Synchronize foreign sites
130         #
131
132         start = time.time()
133
134         # Compare only the columns returned by the GetPeerData() call
135         if peer_tables['Sites']:
136             columns = peer_tables['Sites'][0].keys()
137         else:
138             columns = None
139
140         # Keyed on foreign site_id
141         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
142         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
143
144         # Synchronize new set (still keyed on foreign site_id)
145         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
146
147         for peer_site_id, site in peer_sites.iteritems():
148             # Bind any newly cached sites to peer
149             if peer_site_id not in old_peer_sites:
150                 peer.add_site(site, peer_site_id, commit = False)
151                 site['peer_id'] = peer_id
152                 site['peer_site_id'] = peer_site_id
153
154         timers['site'] = time.time() - start
155
156         #
157         # XXX Synchronize foreign key types
158         #
159
160         key_types = KeyTypes(self.api).dict()
161
162         #
163         # Synchronize foreign keys
164         #
165
166         start = time.time()
167
168         # Compare only the columns returned by the GetPeerData() call
169         if peer_tables['Keys']:
170             columns = peer_tables['Keys'][0].keys()
171         else:
172             columns = None
173
174         # Keyed on foreign key_id
175         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
176         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
177
178         # Fix up key_type references
179         for peer_key_id, key in keys_at_peer.items():
180             if key['key_type'] not in key_types:
181                 # XXX Log an event instead of printing to logfile
182                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
183                       key, ": invalid key type", key['key_type']
184                 del keys_at_peer[peer_key_id]
185                 continue
186
187         # Synchronize new set (still keyed on foreign key_id)
188         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
189         for peer_key_id, key in peer_keys.iteritems():
190             # Bind any newly cached keys to peer
191             if peer_key_id not in old_peer_keys:
192                 peer.add_key(key, peer_key_id, commit = False)
193                 key['peer_id'] = peer_id
194                 key['peer_key_id'] = peer_key_id
195
196         timers['keys'] = time.time() - start
197
198         #
199         # Synchronize foreign users
200         #
201
202         start = time.time()
203
204         # Compare only the columns returned by the GetPeerData() call
205         if peer_tables['Persons']:
206             columns = peer_tables['Persons'][0].keys()
207         else:
208             columns = None
209
210         # Keyed on foreign person_id
211         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
212         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
213                                 for peer_person in peer_tables['Persons']])
214
215         # XXX Do we care about membership in foreign site(s)?
216
217         # Synchronize new set (still keyed on foreign person_id)
218         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
219
220         # transcoder : retrieve a local key_id from a peer_key_id
221         key_transcoder = dict ( [ (key['key_id'],key['peer_key_id']) for key in peer_keys.values()])
222
223         for peer_person_id, person in peer_persons.iteritems():
224             # Bind any newly cached users to peer
225             if peer_person_id not in old_peer_persons:
226                 peer.add_person(person, peer_person_id, commit = False)
227                 person['peer_id'] = peer_id
228                 person['peer_person_id'] = peer_person_id
229                 person['key_ids'] = []
230
231             # User as viewed by peer
232             peer_person = persons_at_peer[peer_person_id]
233             
234             # Foreign keys currently belonging to the user
235             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids']]
236
237             # Foreign keys that should belong to the user
238             # this is basically peer_person['key_ids'], we just check it makes sense 
239             # (e.g. we might have failed importing it)
240             person_key_ids = [ x for x in peer_person['key_ids'] if x in peer_keys]
241
242             # Remove stale keys from user
243             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
244                 person.remove_key(peer_keys[key_id], commit = False)
245                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
246
247             # Add new keys to user
248             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
249                 person.add_key(peer_keys[key_id], commit = False)
250                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
251
252         timers['persons'] = time.time() - start
253
254         #
255         # XXX Synchronize foreign boot states
256         #
257
258         boot_states = BootStates(self.api).dict()
259
260         #
261         # Synchronize foreign nodes
262         #
263
264         start = time.time()
265
266         # Compare only the columns returned by the GetPeerData() call
267         if peer_tables['Nodes']:
268             columns = peer_tables['Nodes'][0].keys()
269         else:
270             columns = None
271
272         # Keyed on foreign node_id
273         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
274         nodes_at_peer = dict([(node['node_id'], node) \
275                               for node in peer_tables['Nodes']])
276
277         # Fix up site_id and boot_states references
278         for peer_node_id, node in nodes_at_peer.items():
279             errors = []
280             if node['site_id'] not in peer_sites:
281                 errors.append("invalid site %d" % node['site_id'])
282             if node['boot_state'] not in boot_states:
283                 errors.append("invalid boot state %s" % node['boot_state'])
284             if errors:
285                 # XXX Log an event instead of printing to logfile
286                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
287                       node, ":", ", ".join(errors)
288                 del nodes_at_peer[peer_node_id]
289                 continue
290             else:
291                 node['site_id'] = peer_sites[node['site_id']]['site_id']
292
293         # Synchronize new set
294         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
295
296         for peer_node_id, node in peer_nodes.iteritems():
297             # Bind any newly cached foreign nodes to peer
298             if peer_node_id not in old_peer_nodes:
299                 peer.add_node(node, peer_node_id, commit = False)
300                 node['peer_id'] = peer_id
301                 node['peer_node_id'] = peer_node_id
302
303         timers['nodes'] = time.time() - start
304
305         #
306         # Synchronize local nodes
307         #
308
309         start = time.time()
310
311         # Keyed on local node_id
312         local_nodes = Nodes(self.api).dict()
313
314         for node in peer_tables['PeerNodes']:
315             # Foreign identifier for our node as maintained by peer
316             peer_node_id = node['node_id']
317             # Local identifier for our node as cached by peer
318             node_id = node['peer_node_id']
319             if node_id in local_nodes:
320                 # Still a valid local node, add it to the synchronized
321                 # set of local node objects keyed on foreign node_id.
322                 peer_nodes[peer_node_id] = local_nodes[node_id]
323
324         timers['local_nodes'] = time.time() - start
325
326         #
327         # XXX Synchronize foreign slice instantiation states
328         #
329
330         slice_instantiations = SliceInstantiations(self.api).dict()
331
332         #
333         # Synchronize foreign slices
334         #
335
336         start = time.time()
337
338         # Compare only the columns returned by the GetPeerData() call
339         if peer_tables['Slices']:
340             columns = peer_tables['Slices'][0].keys()
341         else:
342             columns = None
343
344         # Keyed on foreign slice_id
345         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
346         slices_at_peer = dict([(slice['slice_id'], slice) \
347                                for slice in peer_tables['Slices']])
348
349         # Fix up site_id, instantiation, and creator_person_id references
350         for peer_slice_id, slice in slices_at_peer.items():
351             errors = []
352             if slice['site_id'] not in peer_sites:
353                 errors.append("invalid site %d" % slice['site_id'])
354             if slice['instantiation'] not in slice_instantiations:
355                 errors.append("invalid instantiation %s" % slice['instantiation'])
356             if slice['creator_person_id'] not in peer_persons:
357                 # Just NULL it out
358                 slice['creator_person_id'] = None
359             else:
360                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
361             if errors:
362                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
363                       slice, ":", ", ".join(errors)
364                 del slices_at_peer[peer_slice_id]
365                 continue
366             else:
367                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
368
369         # Synchronize new set
370         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
371
372         # transcoder : retrieve a local node_id from a peer_node_id
373         node_transcoder = dict ( [ (node['node_id'],node['peer_node_id']) for node in peer_nodes.values()])
374         person_transcoder = dict ( [ (person['person_id'],person['peer_person_id']) for person in peer_persons.values()])
375
376         for peer_slice_id, slice in peer_slices.iteritems():
377             # Bind any newly cached foreign slices to peer
378             if peer_slice_id not in old_peer_slices:
379                 peer.add_slice(slice, peer_slice_id, commit = False)
380                 slice['peer_id'] = peer_id
381                 slice['peer_slice_id'] = peer_slice_id
382                 slice['node_ids'] = []
383                 slice['person_ids'] = []
384
385             # Slice as viewed by peer
386             peer_slice = slices_at_peer[peer_slice_id]
387
388             # Nodes that are currently part of the slice
389             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids']]
390
391             # Nodes that should be part of the slice
392             slice_node_ids = [ x for x in peer_slice['node_ids'] if x in peer_nodes]
393
394             # Remove stale nodes from slice
395             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
396                 slice.remove_node(peer_nodes[node_id], commit = False)
397                 print >> log, peer['peername'], 'Node', node_id, 'removed from', slice['name']
398
399             # Add new nodes to slice
400             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
401                 slice.add_node(peer_nodes[node_id], commit = False)
402                 print >> log, peer['peername'], 'Node', node_id, 'added into', slice['name']
403
404             # N.B.: Local nodes that may have been added to the slice
405             # by hand, are removed. In other words, don't do this.
406
407             # Foreign users that are currently part of the slice
408             old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids']]
409
410             # Foreign users that should be part of the slice
411             slice_person_ids = [ x for x in peer_slice['person_ids'] if x in peer_persons ]
412
413             # Remove stale users from slice
414             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
415                 slice.remove_person(peer_persons[person_id], commit = False)
416                 print >> log, peer['peername'], 'User', person_id, 'removed from', slice['name']
417
418             # Add new users to slice
419             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
420                 slice.add_person(peer_persons[person_id], commit = False)
421                 print >> log, peer['peername'], 'User', person_id, 'added into', slice['name']
422
423             # N.B.: Local users that may have been added to the slice
424             # by hand, are not touched.
425
426         timers['slices'] = time.time() - start
427
428         # Update peer itself and commit
429         peer.sync(commit = True)
430
431         return timers