fixes transcoding issue for nodes - local nodes were transcoded to None
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 class RefreshPeer(Method):
24     """
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
27     faults otherwise.
28     """
29
30     roles = ['admin']
31
32     accepts = [
33         Auth(),
34         Mixed(Peer.fields['peer_id'],
35               Peer.fields['peername']),
36         ]
37
38     returns = Parameter(int, "1 if successful")
39
40     def call(self, auth, peer_id_or_peername):
41         # Get peer
42         peers = Peers(self.api, [peer_id_or_peername])
43         if not peers:
44             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
45         peer = peers[0]
46         peer_id = peer['peer_id']
47
48         # Connect to peer API
49         peer.connect()
50
51         timers = {}
52
53         # Get peer data
54         start = time.time()
55         peer_tables = peer.GetPeerData()
56         timers['transport'] = time.time() - start - peer_tables['db_time']
57         timers['peer_db'] = peer_tables['db_time']
58
59         def sync(objects, peer_objects, classobj):
60             """
61             Synchronizes two dictionaries of objects. objects should
62             be a dictionary of local objects keyed on their foreign
63             identifiers. peer_objects should be a dictionary of
64             foreign objects keyed on their local (i.e., foreign to us)
65             identifiers. Returns a final dictionary of local objects
66             keyed on their foreign identifiers.
67             """
68
69             synced = {}
70
71             # Delete stale objects
72             for peer_object_id, object in objects.iteritems():
73                 if peer_object_id not in peer_objects:
74                     object.delete(commit = False)
75                     print classobj(self.api).__class__.__name__, "object %s deleted" % object[object.class_key]
76
77             # Add/update new/existing objects
78             for peer_object_id, peer_object in peer_objects.iteritems():
79                 if peer_object_id in objects:
80                     # Update existing object
81                     object = objects[peer_object_id]
82
83                     # Replace foreign identifier with existing local
84                     # identifier temporarily for the purposes of
85                     # comparison.
86                     peer_object[object.primary_key] = object[object.primary_key]
87
88                     # Must use __eq__() instead of == since
89                     # peer_object may be a raw dict instead of a Peer
90                     # object.
91                     if not object.__eq__(peer_object):
92                         # Only update intrinsic fields
93                         object.update(object.db_fields(peer_object))
94                         sync = True
95                         dbg = "changed"
96                     else:
97                         sync = False
98                         dbg = None
99
100                     # Restore foreign identifier
101                     peer_object[object.primary_key] = peer_object_id
102                 else:
103                     # Add new object
104                     object = classobj(self.api, peer_object)
105                     # Replace foreign identifier with new local identifier
106                     del object[object.primary_key]
107                     sync = True
108                     dbg = "added"
109
110                 if sync:
111                     try:
112                         object.sync(commit = False)
113                     except PLCInvalidArgument, err:
114                         # Skip if validation fails
115                         # XXX Log an event instead of printing to logfile
116                         print >> log, "Warning: Skipping invalid", \
117                               peer['peername'], object.__class__.__name__, \
118                               ":", peer_object, ":", err
119                         continue
120
121                 synced[peer_object_id] = object
122
123                 if dbg:
124                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, \
125                         object[object.class_key], object[object.primary_key], dbg
126
127             return synced
128
129         #
130         # Synchronize foreign sites
131         #
132
133         start = time.time()
134
135         # Compare only the columns returned by the GetPeerData() call
136         if peer_tables['Sites']:
137             columns = peer_tables['Sites'][0].keys()
138         else:
139             columns = None
140
141         # Keyed on foreign site_id
142         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
143         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
144
145         # Synchronize new set (still keyed on foreign site_id)
146         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
147
148         for peer_site_id, site in peer_sites.iteritems():
149             # Bind any newly cached sites to peer
150             if peer_site_id not in old_peer_sites:
151                 peer.add_site(site, peer_site_id, commit = False)
152                 site['peer_id'] = peer_id
153                 site['peer_site_id'] = peer_site_id
154
155         timers['site'] = time.time() - start
156
157         #
158         # XXX Synchronize foreign key types
159         #
160
161         key_types = KeyTypes(self.api).dict()
162
163         #
164         # Synchronize foreign keys
165         #
166
167         start = time.time()
168
169         # Compare only the columns returned by the GetPeerData() call
170         if peer_tables['Keys']:
171             columns = peer_tables['Keys'][0].keys()
172         else:
173             columns = None
174
175         # Keyed on foreign key_id
176         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
177         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
178
179         # Fix up key_type references
180         for peer_key_id, key in keys_at_peer.items():
181             if key['key_type'] not in key_types:
182                 # XXX Log an event instead of printing to logfile
183                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
184                       key, ": invalid key type", key['key_type']
185                 del keys_at_peer[peer_key_id]
186                 continue
187
188         # Synchronize new set (still keyed on foreign key_id)
189         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
190         for peer_key_id, key in peer_keys.iteritems():
191             # Bind any newly cached keys to peer
192             if peer_key_id not in old_peer_keys:
193                 peer.add_key(key, peer_key_id, commit = False)
194                 key['peer_id'] = peer_id
195                 key['peer_key_id'] = peer_key_id
196
197         timers['keys'] = time.time() - start
198
199         #
200         # Synchronize foreign users
201         #
202
203         start = time.time()
204
205         # Compare only the columns returned by the GetPeerData() call
206         if peer_tables['Persons']:
207             columns = peer_tables['Persons'][0].keys()
208         else:
209             columns = None
210
211         # Keyed on foreign person_id
212         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
213         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
214                                 for peer_person in peer_tables['Persons']])
215
216         # XXX Do we care about membership in foreign site(s)?
217
218         # Synchronize new set (still keyed on foreign person_id)
219         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
220
221         # transcoder : retrieve a local key_id from a peer_key_id
222         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
223                                   for peer_key_id,key in peer_keys.iteritems()])
224
225         for peer_person_id, person in peer_persons.iteritems():
226             # Bind any newly cached users to peer
227             if peer_person_id not in old_peer_persons:
228                 peer.add_person(person, peer_person_id, commit = False)
229                 person['peer_id'] = peer_id
230                 person['peer_person_id'] = peer_person_id
231                 person['key_ids'] = []
232
233             # User as viewed by peer
234             peer_person = persons_at_peer[peer_person_id]
235             
236             # Foreign keys currently belonging to the user
237             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
238                                   if key_transcoder[key_id] in peer_keys]
239
240             # Foreign keys that should belong to the user
241             # this is basically peer_person['key_ids'], we just check it makes sense 
242             # (e.g. we might have failed importing it)
243             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
244
245             # Remove stale keys from user
246             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
247                 person.remove_key(peer_keys[key_id], commit = False)
248                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
249
250             # Add new keys to user
251             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
252                 person.add_key(peer_keys[key_id], commit = False)
253                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
254
255         timers['persons'] = time.time() - start
256
257         #
258         # XXX Synchronize foreign boot states
259         #
260
261         boot_states = BootStates(self.api).dict()
262
263         #
264         # Synchronize foreign nodes
265         #
266
267         start = time.time()
268
269         # Compare only the columns returned by the GetPeerData() call
270         if peer_tables['Nodes']:
271             columns = peer_tables['Nodes'][0].keys()
272         else:
273             columns = None
274
275         # Keyed on foreign node_id
276         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
277         nodes_at_peer = dict([(node['node_id'], node) \
278                               for node in peer_tables['Nodes']])
279
280         # Fix up site_id and boot_states references
281         for peer_node_id, node in nodes_at_peer.items():
282             errors = []
283             if node['site_id'] not in peer_sites:
284                 errors.append("invalid site %d" % node['site_id'])
285             if node['boot_state'] not in boot_states:
286                 errors.append("invalid boot state %s" % node['boot_state'])
287             if errors:
288                 # XXX Log an event instead of printing to logfile
289                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
290                       node, ":", ", ".join(errors)
291                 del nodes_at_peer[peer_node_id]
292                 continue
293             else:
294                 node['site_id'] = peer_sites[node['site_id']]['site_id']
295
296         # Synchronize new set
297         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
298
299         for peer_node_id, node in peer_nodes.iteritems():
300             # Bind any newly cached foreign nodes to peer
301             if peer_node_id not in old_peer_nodes:
302                 peer.add_node(node, peer_node_id, commit = False)
303                 node['peer_id'] = peer_id
304                 node['peer_node_id'] = peer_node_id
305
306         timers['nodes'] = time.time() - start
307
308         #
309         # Synchronize local nodes
310         #
311
312         start = time.time()
313
314         # Keyed on local node_id
315         local_nodes = Nodes(self.api).dict()
316
317         for node in peer_tables['PeerNodes']:
318             # Foreign identifier for our node as maintained by peer
319             peer_node_id = node['node_id']
320             # Local identifier for our node as cached by peer
321             node_id = node['peer_node_id']
322             if node_id in local_nodes:
323                 # Still a valid local node, add it to the synchronized
324                 # set of local node objects keyed on foreign node_id.
325                 peer_nodes[peer_node_id] = local_nodes[node_id]
326
327         timers['local_nodes'] = time.time() - start
328
329         #
330         # XXX Synchronize foreign slice instantiation states
331         #
332
333         slice_instantiations = SliceInstantiations(self.api).dict()
334
335         #
336         # Synchronize foreign slices
337         #
338
339         start = time.time()
340
341         # Compare only the columns returned by the GetPeerData() call
342         if peer_tables['Slices']:
343             columns = peer_tables['Slices'][0].keys()
344         else:
345             columns = None
346
347         # Keyed on foreign slice_id
348         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
349         slices_at_peer = dict([(slice['slice_id'], slice) \
350                                for slice in peer_tables['Slices']])
351
352         # Fix up site_id, instantiation, and creator_person_id references
353         for peer_slice_id, slice in slices_at_peer.items():
354             errors = []
355             if slice['site_id'] not in peer_sites:
356                 errors.append("invalid site %d" % slice['site_id'])
357             if slice['instantiation'] not in slice_instantiations:
358                 errors.append("invalid instantiation %s" % slice['instantiation'])
359             if slice['creator_person_id'] not in peer_persons:
360                 # Just NULL it out
361                 slice['creator_person_id'] = None
362             else:
363                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
364             if errors:
365                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
366                       slice, ":", ", ".join(errors)
367                 del slices_at_peer[peer_slice_id]
368                 continue
369             else:
370                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
371
372         # Synchronize new set
373         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
374
375         # transcoder : retrieve a local node_id from a peer_node_id
376         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
377                                    for peer_node_id,node in peer_nodes.iteritems()])
378         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
379                                      for peer_person_id,person in peer_persons.iteritems()])
380
381         for peer_slice_id, slice in peer_slices.iteritems():
382             # Bind any newly cached foreign slices to peer
383             if peer_slice_id not in old_peer_slices:
384                 peer.add_slice(slice, peer_slice_id, commit = False)
385                 slice['peer_id'] = peer_id
386                 slice['peer_slice_id'] = peer_slice_id
387                 slice['node_ids'] = []
388                 slice['person_ids'] = []
389
390             # Slice as viewed by peer
391             peer_slice = slices_at_peer[peer_slice_id]
392
393             # Nodes that are currently part of the slice
394             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
395                                    if node_transcoder[node_id] in peer_nodes]
396
397             # Nodes that should be part of the slice
398             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
399
400             # Remove stale nodes from slice
401             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
402                 slice.remove_node(peer_nodes[node_id], commit = False)
403                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
404
405             # Add new nodes to slice
406             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
407                 slice.add_node(peer_nodes[node_id], commit = False)
408                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
409
410             # N.B.: Local nodes that may have been added to the slice
411             # by hand, are removed. In other words, don't do this.
412
413             # Foreign users that are currently part of the slice
414             old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
415                                      if person_transcoder[person_id] in peer_persons]
416
417             # Foreign users that should be part of the slice
418             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
419
420             # Remove stale users from slice
421             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
422                 slice.remove_person(peer_persons[person_id], commit = False)
423                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
424
425             # Add new users to slice
426             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
427                 slice.add_person(peer_persons[person_id], commit = False)
428                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
429
430             # N.B.: Local users that may have been added to the slice
431             # by hand, are not touched.
432
433         timers['slices'] = time.time() - start
434
435         # Update peer itself and commit
436         peer.sync(commit = True)
437
438         return timers