Bug fix
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 verbose=False
24
25 class RefreshPeer(Method):
26     """
27     Fetches site, node, slice, person and key data from the specified peer
28     and caches it locally; also deletes stale entries.
29     Upon successful completion, returns a dict reporting various timers.
30     Faults otherwise.
31     """
32
33     roles = ['admin']
34
35     accepts = [
36         Auth(),
37         Mixed(Peer.fields['peer_id'],
38               Peer.fields['peername']),
39         ]
40
41     returns = Parameter(int, "1 if successful")
42
43     def call(self, auth, peer_id_or_peername):
44         # Get peer
45         peers = Peers(self.api, [peer_id_or_peername])
46         if not peers:
47             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
48         peer = peers[0]
49         peer_id = peer['peer_id']
50
51         # Connect to peer API
52         peer.connect()
53
54         timers = {}
55
56         # Get peer data
57         start = time.time()
58         print >>log, 'Issuing GetPeerData'
59         peer_tables = peer.GetPeerData()
60         timers['transport'] = time.time() - start - peer_tables['db_time']
61         timers['peer_db'] = peer_tables['db_time']
62         if verbose:
63             print >>log, 'GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport'])
64
65         def sync(objects, peer_objects, classobj):
66             """
67             Synchronizes two dictionaries of objects. objects should
68             be a dictionary of local objects keyed on their foreign
69             identifiers. peer_objects should be a dictionary of
70             foreign objects keyed on their local (i.e., foreign to us)
71             identifiers. Returns a final dictionary of local objects
72             keyed on their foreign identifiers.
73             """
74
75             if verbose:
76                 print >>log, 'Entering sync on',classobj(self.api).__class__.__name__
77
78             synced = {}
79
80             # Delete stale objects
81             for peer_object_id, object in objects.iteritems():
82                 if peer_object_id not in peer_objects:
83                     object.delete(commit = False)
84                     print >> log, peer['peername'],classobj(self.api).__class__.__name__, object[object.primary_key],"deleted" 
85
86             # Add/update new/existing objects
87             for peer_object_id, peer_object in peer_objects.iteritems():
88                 if peer_object_id in objects:
89                     # Update existing object
90                     object = objects[peer_object_id]
91
92                     # Replace foreign identifier with existing local
93                     # identifier temporarily for the purposes of
94                     # comparison.
95                     peer_object[object.primary_key] = object[object.primary_key]
96
97                     # Must use __eq__() instead of == since
98                     # peer_object may be a raw dict instead of a Peer
99                     # object.
100                     if not object.__eq__(peer_object):
101                         # Only update intrinsic fields
102                         object.update(object.db_fields(peer_object))
103                         sync = True
104                         dbg = "changed"
105                     else:
106                         sync = False
107                         dbg = None
108
109                     # Restore foreign identifier
110                     peer_object[object.primary_key] = peer_object_id
111                 else:
112                     # Add new object
113                     object = classobj(self.api, peer_object)
114                     # Replace foreign identifier with new local identifier
115                     del object[object.primary_key]
116                     sync = True
117                     dbg = "added"
118
119                 if sync:
120                     try:
121                         object.sync(commit = False)
122                     except PLCInvalidArgument, err:
123                         # Skip if validation fails
124                         # XXX Log an event instead of printing to logfile
125                         print >> log, "Warning: Skipping invalid", \
126                               peer['peername'], object.__class__.__name__, \
127                               ":", peer_object, ":", err
128                         continue
129
130                 synced[peer_object_id] = object
131
132                 if dbg:
133                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
134
135             if verbose:
136                 print >>log, 'Exiting sync on',classobj(self.api).__class__.__name__
137
138             return synced
139
140         #
141         # Synchronize foreign sites
142         #
143
144         start = time.time()
145
146         print >>log, 'Dealing with Sites'
147
148         # Compare only the columns returned by the GetPeerData() call
149         if peer_tables['Sites']:
150             columns = peer_tables['Sites'][0].keys()
151         else:
152             columns = None
153
154         # Keyed on foreign site_id
155         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
156         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
157
158         # Synchronize new set (still keyed on foreign site_id)
159         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
160
161         for peer_site_id, site in peer_sites.iteritems():
162             # Bind any newly cached sites to peer
163             if peer_site_id not in old_peer_sites:
164                 peer.add_site(site, peer_site_id, commit = False)
165                 site['peer_id'] = peer_id
166                 site['peer_site_id'] = peer_site_id
167
168         timers['site'] = time.time() - start
169
170         #
171         # XXX Synchronize foreign key types
172         #
173
174         print >>log, 'Dealing with Keys'
175
176         key_types = KeyTypes(self.api).dict()
177
178         #
179         # Synchronize foreign keys
180         #
181
182         start = time.time()
183
184         # Compare only the columns returned by the GetPeerData() call
185         if peer_tables['Keys']:
186             columns = peer_tables['Keys'][0].keys()
187         else:
188             columns = None
189
190         # Keyed on foreign key_id
191         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
192         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
193
194         # Fix up key_type references
195         for peer_key_id, key in keys_at_peer.items():
196             if key['key_type'] not in key_types:
197                 # XXX Log an event instead of printing to logfile
198                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
199                       key, ": invalid key type", key['key_type']
200                 del keys_at_peer[peer_key_id]
201                 continue
202
203         # Synchronize new set (still keyed on foreign key_id)
204         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
205         for peer_key_id, key in peer_keys.iteritems():
206             # Bind any newly cached keys to peer
207             if peer_key_id not in old_peer_keys:
208                 peer.add_key(key, peer_key_id, commit = False)
209                 key['peer_id'] = peer_id
210                 key['peer_key_id'] = peer_key_id
211
212         timers['keys'] = time.time() - start
213
214         #
215         # Synchronize foreign users
216         #
217
218         start = time.time()
219
220         print >>log, 'Dealing with Persons'
221
222         # Compare only the columns returned by the GetPeerData() call
223         if peer_tables['Persons']:
224             columns = peer_tables['Persons'][0].keys()
225         else:
226             columns = None
227
228         # Keyed on foreign person_id
229         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
230         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
231                                 for peer_person in peer_tables['Persons']])
232
233         # XXX Do we care about membership in foreign site(s)?
234
235         # Synchronize new set (still keyed on foreign person_id)
236         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
237
238         # transcoder : retrieve a local key_id from a peer_key_id
239         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
240                                   for peer_key_id,key in peer_keys.iteritems()])
241
242         for peer_person_id, person in peer_persons.iteritems():
243             # Bind any newly cached users to peer
244             if peer_person_id not in old_peer_persons:
245                 peer.add_person(person, peer_person_id, commit = False)
246                 person['peer_id'] = peer_id
247                 person['peer_person_id'] = peer_person_id
248                 person['key_ids'] = []
249
250             # User as viewed by peer
251             peer_person = persons_at_peer[peer_person_id]
252             
253             # Foreign keys currently belonging to the user
254             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
255                                   if key_transcoder[key_id] in peer_keys]
256
257             # Foreign keys that should belong to the user
258             # this is basically peer_person['key_ids'], we just check it makes sense 
259             # (e.g. we might have failed importing it)
260             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
261
262             # Remove stale keys from user
263             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
264                 person.remove_key(peer_keys[key_id], commit = False)
265                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
266
267             # Add new keys to user
268             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
269                 person.add_key(peer_keys[key_id], commit = False)
270                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
271
272         timers['persons'] = time.time() - start
273
274         #
275         # XXX Synchronize foreign boot states
276         #
277
278         boot_states = BootStates(self.api).dict()
279
280         #
281         # Synchronize foreign nodes
282         #
283
284         start = time.time()
285
286         print >>log, 'Dealing with Nodes'
287
288         # Compare only the columns returned by the GetPeerData() call
289         if peer_tables['Nodes']:
290             columns = peer_tables['Nodes'][0].keys()
291         else:
292             columns = None
293
294         # Keyed on foreign node_id
295         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
296         nodes_at_peer = dict([(node['node_id'], node) \
297                               for node in peer_tables['Nodes']])
298
299         # Fix up site_id and boot_states references
300         for peer_node_id, node in nodes_at_peer.items():
301             errors = []
302             if node['site_id'] not in peer_sites:
303                 errors.append("invalid site %d" % node['site_id'])
304             if node['boot_state'] not in boot_states:
305                 errors.append("invalid boot state %s" % node['boot_state'])
306             if errors:
307                 # XXX Log an event instead of printing to logfile
308                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
309                       node, ":", ", ".join(errors)
310                 del nodes_at_peer[peer_node_id]
311                 continue
312             else:
313                 node['site_id'] = peer_sites[node['site_id']]['site_id']
314
315         # Synchronize new set
316         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
317
318         for peer_node_id, node in peer_nodes.iteritems():
319             # Bind any newly cached foreign nodes to peer
320             if peer_node_id not in old_peer_nodes:
321                 peer.add_node(node, peer_node_id, commit = False)
322                 node['peer_id'] = peer_id
323                 node['peer_node_id'] = peer_node_id
324
325         timers['nodes'] = time.time() - start
326
327         #
328         # Synchronize local nodes
329         #
330
331         start = time.time()
332
333         # Keyed on local node_id
334         local_nodes = Nodes(self.api).dict()
335
336         for node in peer_tables['PeerNodes']:
337             # Foreign identifier for our node as maintained by peer
338             peer_node_id = node['node_id']
339             # Local identifier for our node as cached by peer
340             node_id = node['peer_node_id']
341             if node_id in local_nodes:
342                 # Still a valid local node, add it to the synchronized
343                 # set of local node objects keyed on foreign node_id.
344                 peer_nodes[peer_node_id] = local_nodes[node_id]
345
346         timers['local_nodes'] = time.time() - start
347
348         #
349         # XXX Synchronize foreign slice instantiation states
350         #
351
352         slice_instantiations = SliceInstantiations(self.api).dict()
353
354         #
355         # Synchronize foreign slices
356         #
357
358         start = time.time()
359
360         print >>log, 'Dealing with Slices'
361
362         # Compare only the columns returned by the GetPeerData() call
363         if peer_tables['Slices']:
364             columns = peer_tables['Slices'][0].keys()
365         else:
366             columns = None
367
368         # Keyed on foreign slice_id
369         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
370         slices_at_peer = dict([(slice['slice_id'], slice) \
371                                for slice in peer_tables['Slices']])
372
373         # Fix up site_id, instantiation, and creator_person_id references
374         for peer_slice_id, slice in slices_at_peer.items():
375             errors = []
376             if slice['site_id'] not in peer_sites:
377                 errors.append("invalid site %d" % slice['site_id'])
378             if slice['instantiation'] not in slice_instantiations:
379                 errors.append("invalid instantiation %s" % slice['instantiation'])
380             if slice['creator_person_id'] not in peer_persons:
381                 # Just NULL it out
382                 slice['creator_person_id'] = None
383             else:
384                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
385             if errors:
386                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
387                       slice, ":", ", ".join(errors)
388                 del slices_at_peer[peer_slice_id]
389                 continue
390             else:
391                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
392
393         # Synchronize new set
394         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
395
396         # transcoder : retrieve a local node_id from a peer_node_id
397         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
398                                    for peer_node_id,node in peer_nodes.iteritems()])
399         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
400                                      for peer_person_id,person in peer_persons.iteritems()])
401
402         for peer_slice_id, slice in peer_slices.iteritems():
403             # Bind any newly cached foreign slices to peer
404             if peer_slice_id not in old_peer_slices:
405                 peer.add_slice(slice, peer_slice_id, commit = False)
406                 slice['peer_id'] = peer_id
407                 slice['peer_slice_id'] = peer_slice_id
408                 slice['node_ids'] = []
409                 slice['person_ids'] = []
410
411             # Slice as viewed by peer
412             peer_slice = slices_at_peer[peer_slice_id]
413
414             # Nodes that are currently part of the slice
415             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
416                                    if node_transcoder[node_id] in peer_nodes]
417
418             # Nodes that should be part of the slice
419             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
420
421             # Remove stale nodes from slice
422             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
423                 slice.remove_node(peer_nodes[node_id], commit = False)
424                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
425
426             # Add new nodes to slice
427             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
428                 slice.add_node(peer_nodes[node_id], commit = False)
429                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
430
431             # N.B.: Local nodes that may have been added to the slice
432             # by hand, are removed. In other words, don't do this.
433
434             # Foreign users that are currently part of the slice
435             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
436             #                if person_transcoder[person_id] in peer_persons]
437             # An issue occurred with a user who registered on both sites (same email)
438             # So the remote person could not get cached locally
439             # The one-line map/filter style is nicer but ineffective here
440             old_slice_person_ids = []
441             for person_id in slice['person_ids']:
442                 if not person_transcoder.has_key(person_id):
443                     print >> log, 'WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name'])
444                 elif person_transcoder[person_id] not in peer_persons:
445                     print >> log, 'WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name'])
446                 else:
447                     old_slice_person_ids += [person_transcoder[person_id]]
448
449             # Foreign users that should be part of the slice
450             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
451
452             # Remove stale users from slice
453             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
454                 slice.remove_person(peer_persons[person_id], commit = False)
455                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
456
457             # Add new users to slice
458             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
459                 slice.add_person(peer_persons[person_id], commit = False)
460                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
461
462             # N.B.: Local users that may have been added to the slice
463             # by hand, are not touched.
464
465         timers['slices'] = time.time() - start
466
467         # Update peer itself and commit
468         peer.sync(commit = True)
469
470         return timers