adds Id tag
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import time
7
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
13
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
23
24 verbose=False
25
26 class RefreshPeer(Method):
27     """
28     Fetches site, node, slice, person and key data from the specified peer
29     and caches it locally; also deletes stale entries.
30     Upon successful completion, returns a dict reporting various timers.
31     Faults otherwise.
32     """
33
34     roles = ['admin']
35
36     accepts = [
37         Auth(),
38         Mixed(Peer.fields['peer_id'],
39               Peer.fields['peername']),
40         ]
41
42     returns = Parameter(int, "1 if successful")
43
44     def call(self, auth, peer_id_or_peername):
45         # Get peer
46         peers = Peers(self.api, [peer_id_or_peername])
47         if not peers:
48             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
49         peer = peers[0]
50         peer_id = peer['peer_id']
51
52         # Connect to peer API
53         peer.connect()
54
55         timers = {}
56
57         # Get peer data
58         start = time.time()
59         print >>log, 'Issuing GetPeerData'
60         peer_tables = peer.GetPeerData()
61         timers['transport'] = time.time() - start - peer_tables['db_time']
62         timers['peer_db'] = peer_tables['db_time']
63         if verbose:
64             print >>log, 'GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport'])
65
66         def sync(objects, peer_objects, classobj):
67             """
68             Synchronizes two dictionaries of objects. objects should
69             be a dictionary of local objects keyed on their foreign
70             identifiers. peer_objects should be a dictionary of
71             foreign objects keyed on their local (i.e., foreign to us)
72             identifiers. Returns a final dictionary of local objects
73             keyed on their foreign identifiers.
74             """
75
76             if verbose:
77                 print >>log, 'Entering sync on',classobj(self.api).__class__.__name__
78
79             synced = {}
80
81             # Delete stale objects
82             for peer_object_id, object in objects.iteritems():
83                 if peer_object_id not in peer_objects:
84                     object.delete(commit = False)
85                     print >> log, peer['peername'],classobj(self.api).__class__.__name__, object[object.primary_key],"deleted" 
86
87             # Add/update new/existing objects
88             for peer_object_id, peer_object in peer_objects.iteritems():
89                 if peer_object_id in objects:
90                     # Update existing object
91                     object = objects[peer_object_id]
92
93                     # Replace foreign identifier with existing local
94                     # identifier temporarily for the purposes of
95                     # comparison.
96                     peer_object[object.primary_key] = object[object.primary_key]
97
98                     # Must use __eq__() instead of == since
99                     # peer_object may be a raw dict instead of a Peer
100                     # object.
101                     if not object.__eq__(peer_object):
102                         # Only update intrinsic fields
103                         object.update(object.db_fields(peer_object))
104                         sync = True
105                         dbg = "changed"
106                     else:
107                         sync = False
108                         dbg = None
109
110                     # Restore foreign identifier
111                     peer_object[object.primary_key] = peer_object_id
112                 else:
113                     # Add new object
114                     object = classobj(self.api, peer_object)
115                     # Replace foreign identifier with new local identifier
116                     del object[object.primary_key]
117                     sync = True
118                     dbg = "added"
119
120                 if sync:
121                     try:
122                         object.sync(commit = False)
123                     except PLCInvalidArgument, err:
124                         # Skip if validation fails
125                         # XXX Log an event instead of printing to logfile
126                         print >> log, "Warning: Skipping invalid", \
127                               peer['peername'], object.__class__.__name__, \
128                               ":", peer_object, ":", err
129                         continue
130
131                 synced[peer_object_id] = object
132
133                 if dbg:
134                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
135
136             if verbose:
137                 print >>log, 'Exiting sync on',classobj(self.api).__class__.__name__
138
139             return synced
140
141         #
142         # Synchronize foreign sites
143         #
144
145         start = time.time()
146
147         print >>log, 'Dealing with Sites'
148
149         # Compare only the columns returned by the GetPeerData() call
150         if peer_tables['Sites']:
151             columns = peer_tables['Sites'][0].keys()
152         else:
153             columns = None
154
155         # Keyed on foreign site_id
156         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
157         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
158
159         # Synchronize new set (still keyed on foreign site_id)
160         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
161
162         for peer_site_id, site in peer_sites.iteritems():
163             # Bind any newly cached sites to peer
164             if peer_site_id not in old_peer_sites:
165                 peer.add_site(site, peer_site_id, commit = False)
166                 site['peer_id'] = peer_id
167                 site['peer_site_id'] = peer_site_id
168
169         timers['site'] = time.time() - start
170
171         #
172         # XXX Synchronize foreign key types
173         #
174
175         print >>log, 'Dealing with Keys'
176
177         key_types = KeyTypes(self.api).dict()
178
179         #
180         # Synchronize foreign keys
181         #
182
183         start = time.time()
184
185         # Compare only the columns returned by the GetPeerData() call
186         if peer_tables['Keys']:
187             columns = peer_tables['Keys'][0].keys()
188         else:
189             columns = None
190
191         # Keyed on foreign key_id
192         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
193         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
194
195         # Fix up key_type references
196         for peer_key_id, key in keys_at_peer.items():
197             if key['key_type'] not in key_types:
198                 # XXX Log an event instead of printing to logfile
199                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
200                       key, ": invalid key type", key['key_type']
201                 del keys_at_peer[peer_key_id]
202                 continue
203
204         # Synchronize new set (still keyed on foreign key_id)
205         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
206         for peer_key_id, key in peer_keys.iteritems():
207             # Bind any newly cached keys to peer
208             if peer_key_id not in old_peer_keys:
209                 peer.add_key(key, peer_key_id, commit = False)
210                 key['peer_id'] = peer_id
211                 key['peer_key_id'] = peer_key_id
212
213         timers['keys'] = time.time() - start
214
215         #
216         # Synchronize foreign users
217         #
218
219         start = time.time()
220
221         print >>log, 'Dealing with Persons'
222
223         # Compare only the columns returned by the GetPeerData() call
224         if peer_tables['Persons']:
225             columns = peer_tables['Persons'][0].keys()
226         else:
227             columns = None
228
229         # Keyed on foreign person_id
230         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
231         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
232                                 for peer_person in peer_tables['Persons']])
233
234         # XXX Do we care about membership in foreign site(s)?
235
236         # Synchronize new set (still keyed on foreign person_id)
237         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
238
239         # transcoder : retrieve a local key_id from a peer_key_id
240         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
241                                   for peer_key_id,key in peer_keys.iteritems()])
242
243         for peer_person_id, person in peer_persons.iteritems():
244             # Bind any newly cached users to peer
245             if peer_person_id not in old_peer_persons:
246                 peer.add_person(person, peer_person_id, commit = False)
247                 person['peer_id'] = peer_id
248                 person['peer_person_id'] = peer_person_id
249                 person['key_ids'] = []
250
251             # User as viewed by peer
252             peer_person = persons_at_peer[peer_person_id]
253             
254             # Foreign keys currently belonging to the user
255             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
256                                   if key_transcoder[key_id] in peer_keys]
257
258             # Foreign keys that should belong to the user
259             # this is basically peer_person['key_ids'], we just check it makes sense 
260             # (e.g. we might have failed importing it)
261             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
262
263             # Remove stale keys from user
264             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
265                 person.remove_key(peer_keys[key_id], commit = False)
266                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
267
268             # Add new keys to user
269             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
270                 person.add_key(peer_keys[key_id], commit = False)
271                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
272
273         timers['persons'] = time.time() - start
274
275         #
276         # XXX Synchronize foreign boot states
277         #
278
279         boot_states = BootStates(self.api).dict()
280
281         #
282         # Synchronize foreign nodes
283         #
284
285         start = time.time()
286
287         print >>log, 'Dealing with Nodes'
288
289         # Compare only the columns returned by the GetPeerData() call
290         if peer_tables['Nodes']:
291             columns = peer_tables['Nodes'][0].keys()
292         else:
293             columns = None
294
295         # Keyed on foreign node_id
296         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
297         nodes_at_peer = dict([(node['node_id'], node) \
298                               for node in peer_tables['Nodes']])
299
300         # Fix up site_id and boot_states references
301         for peer_node_id, node in nodes_at_peer.items():
302             errors = []
303             if node['site_id'] not in peer_sites:
304                 errors.append("invalid site %d" % node['site_id'])
305             if node['boot_state'] not in boot_states:
306                 errors.append("invalid boot state %s" % node['boot_state'])
307             if errors:
308                 # XXX Log an event instead of printing to logfile
309                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
310                       node, ":", ", ".join(errors)
311                 del nodes_at_peer[peer_node_id]
312                 continue
313             else:
314                 node['site_id'] = peer_sites[node['site_id']]['site_id']
315
316         # Synchronize new set
317         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
318
319         for peer_node_id, node in peer_nodes.iteritems():
320             # Bind any newly cached foreign nodes to peer
321             if peer_node_id not in old_peer_nodes:
322                 peer.add_node(node, peer_node_id, commit = False)
323                 node['peer_id'] = peer_id
324                 node['peer_node_id'] = peer_node_id
325
326         timers['nodes'] = time.time() - start
327
328         #
329         # Synchronize local nodes
330         #
331
332         start = time.time()
333
334         # Keyed on local node_id
335         local_nodes = Nodes(self.api).dict()
336
337         for node in peer_tables['PeerNodes']:
338             # Foreign identifier for our node as maintained by peer
339             peer_node_id = node['node_id']
340             # Local identifier for our node as cached by peer
341             node_id = node['peer_node_id']
342             if node_id in local_nodes:
343                 # Still a valid local node, add it to the synchronized
344                 # set of local node objects keyed on foreign node_id.
345                 peer_nodes[peer_node_id] = local_nodes[node_id]
346
347         timers['local_nodes'] = time.time() - start
348
349         #
350         # XXX Synchronize foreign slice instantiation states
351         #
352
353         slice_instantiations = SliceInstantiations(self.api).dict()
354
355         #
356         # Synchronize foreign slices
357         #
358
359         start = time.time()
360
361         print >>log, 'Dealing with Slices'
362
363         # Compare only the columns returned by the GetPeerData() call
364         if peer_tables['Slices']:
365             columns = peer_tables['Slices'][0].keys()
366         else:
367             columns = None
368
369         # Keyed on foreign slice_id
370         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
371         slices_at_peer = dict([(slice['slice_id'], slice) \
372                                for slice in peer_tables['Slices']])
373
374         # Fix up site_id, instantiation, and creator_person_id references
375         for peer_slice_id, slice in slices_at_peer.items():
376             errors = []
377             if slice['site_id'] not in peer_sites:
378                 errors.append("invalid site %d" % slice['site_id'])
379             if slice['instantiation'] not in slice_instantiations:
380                 errors.append("invalid instantiation %s" % slice['instantiation'])
381             if slice['creator_person_id'] not in peer_persons:
382                 # Just NULL it out
383                 slice['creator_person_id'] = None
384             else:
385                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
386             if errors:
387                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
388                       slice, ":", ", ".join(errors)
389                 del slices_at_peer[peer_slice_id]
390                 continue
391             else:
392                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
393
394         # Synchronize new set
395         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
396
397         # transcoder : retrieve a local node_id from a peer_node_id
398         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
399                                    for peer_node_id,node in peer_nodes.iteritems()])
400         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
401                                      for peer_person_id,person in peer_persons.iteritems()])
402
403         for peer_slice_id, slice in peer_slices.iteritems():
404             # Bind any newly cached foreign slices to peer
405             if peer_slice_id not in old_peer_slices:
406                 peer.add_slice(slice, peer_slice_id, commit = False)
407                 slice['peer_id'] = peer_id
408                 slice['peer_slice_id'] = peer_slice_id
409                 slice['node_ids'] = []
410                 slice['person_ids'] = []
411
412             # Slice as viewed by peer
413             peer_slice = slices_at_peer[peer_slice_id]
414
415             # Nodes that are currently part of the slice
416             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
417                                    if node_transcoder[node_id] in peer_nodes]
418
419             # Nodes that should be part of the slice
420             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
421
422             # Remove stale nodes from slice
423             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
424                 slice.remove_node(peer_nodes[node_id], commit = False)
425                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
426
427             # Add new nodes to slice
428             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
429                 slice.add_node(peer_nodes[node_id], commit = False)
430                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
431
432             # N.B.: Local nodes that may have been added to the slice
433             # by hand, are removed. In other words, don't do this.
434
435             # Foreign users that are currently part of the slice
436             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
437             #                if person_transcoder[person_id] in peer_persons]
438             # An issue occurred with a user who registered on both sites (same email)
439             # So the remote person could not get cached locally
440             # The one-line map/filter style is nicer but ineffective here
441             old_slice_person_ids = []
442             for person_id in slice['person_ids']:
443                 if not person_transcoder.has_key(person_id):
444                     print >> log, 'WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name'])
445                 elif person_transcoder[person_id] not in peer_persons:
446                     print >> log, 'WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name'])
447                 else:
448                     old_slice_person_ids += [person_transcoder[person_id]]
449
450             # Foreign users that should be part of the slice
451             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
452
453             # Remove stale users from slice
454             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
455                 slice.remove_person(peer_persons[person_id], commit = False)
456                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
457
458             # Add new users to slice
459             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
460                 slice.add_person(peer_persons[person_id], commit = False)
461                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
462
463             # N.B.: Local users that may have been added to the slice
464             # by hand, are not touched.
465
466         timers['slices'] = time.time() - start
467
468         # Update peer itself and commit
469         peer.sync(commit = True)
470
471         return timers