set svn:keywords property for proper keywords expansion
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import time
7
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
13
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
23
24 verbose=False
25
26 class RefreshPeer(Method):
27     """
28     Fetches site, node, slice, person and key data from the specified peer
29     and caches it locally; also deletes stale entries.
30     Upon successful completion, returns a dict reporting various timers.
31     Faults otherwise.
32     """
33
34     roles = ['admin']
35
36     accepts = [
37         Auth(),
38         Mixed(Peer.fields['peer_id'],
39               Peer.fields['peername']),
40         ]
41
42     returns = Parameter(int, "1 if successful")
43
44     def call(self, auth, peer_id_or_peername):
45         # Get peer
46         peers = Peers(self.api, [peer_id_or_peername])
47         if not peers:
48             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
49         peer = peers[0]
50         peer_id = peer['peer_id']
51
52         # Connect to peer API
53         peer.connect()
54
55         timers = {}
56
57         # Get peer data
58         start = time.time()
59         print >>log, 'Issuing GetPeerData'
60         peer_tables = peer.GetPeerData()
61         timers['transport'] = time.time() - start - peer_tables['db_time']
62         timers['peer_db'] = peer_tables['db_time']
63         if verbose:
64             print >>log, 'GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport'])
65
66         def sync(objects, peer_objects, classobj):
67             """
68             Synchronizes two dictionaries of objects. objects should
69             be a dictionary of local objects keyed on their foreign
70             identifiers. peer_objects should be a dictionary of
71             foreign objects keyed on their local (i.e., foreign to us)
72             identifiers. Returns a final dictionary of local objects
73             keyed on their foreign identifiers.
74             """
75
76             if verbose:
77                 print >>log, 'Entering sync on',classobj(self.api).__class__.__name__
78
79             synced = {}
80
81             # Delete stale objects
82             for peer_object_id, object in objects.iteritems():
83                 if peer_object_id not in peer_objects:
84                     object.delete(commit = False)
85                     print >> log, peer['peername'],classobj(self.api).__class__.__name__, object[object.primary_key],"deleted" 
86
87             # Add/update new/existing objects
88             for peer_object_id, peer_object in peer_objects.iteritems():
89                 if peer_object_id in objects:
90                     # Update existing object
91                     object = objects[peer_object_id]
92
93                     # Replace foreign identifier with existing local
94                     # identifier temporarily for the purposes of
95                     # comparison.
96                     peer_object[object.primary_key] = object[object.primary_key]
97
98                     # Must use __eq__() instead of == since
99                     # peer_object may be a raw dict instead of a Peer
100                     # object.
101                     if not object.__eq__(peer_object):
102                         # Only update intrinsic fields
103                         object.update(object.db_fields(peer_object))
104                         sync = True
105                         dbg = "changed"
106                     else:
107                         sync = False
108                         dbg = None
109
110                     # Restore foreign identifier
111                     peer_object[object.primary_key] = peer_object_id
112                 else:
113                     # Add new object
114                     object = classobj(self.api, peer_object)
115                     # Replace foreign identifier with new local identifier
116                     del object[object.primary_key]
117                     sync = True
118                     dbg = "added"
119
120                 if sync:
121                     try:
122                         object.sync(commit = False)
123                     except PLCInvalidArgument, err:
124                         # Skip if validation fails
125                         # XXX Log an event instead of printing to logfile
126                         print >> log, "Warning: Skipping invalid", \
127                               peer['peername'], object.__class__.__name__, \
128                               ":", peer_object, ":", err
129                         continue
130
131                 synced[peer_object_id] = object
132
133                 if dbg:
134                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
135
136             if verbose:
137                 print >>log, 'Exiting sync on',classobj(self.api).__class__.__name__
138
139             return synced
140
141         #
142         # Synchronize foreign sites
143         #
144
145         start = time.time()
146
147         print >>log, 'Dealing with Sites'
148
149         # Compare only the columns returned by the GetPeerData() call
150         if peer_tables['Sites']:
151             columns = peer_tables['Sites'][0].keys()
152         else:
153             columns = None
154
155         # Keyed on foreign site_id
156         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
157         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
158
159         # Synchronize new set (still keyed on foreign site_id)
160         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
161
162         for peer_site_id, site in peer_sites.iteritems():
163             # Bind any newly cached sites to peer
164             if peer_site_id not in old_peer_sites:
165                 peer.add_site(site, peer_site_id, commit = False)
166                 site['peer_id'] = peer_id
167                 site['peer_site_id'] = peer_site_id
168
169         timers['site'] = time.time() - start
170
171         #
172         # XXX Synchronize foreign key types
173         #
174
175         print >>log, 'Dealing with Keys'
176
177         key_types = KeyTypes(self.api).dict()
178
179         #
180         # Synchronize foreign keys
181         #
182
183         start = time.time()
184
185         # Compare only the columns returned by the GetPeerData() call
186         if peer_tables['Keys']:
187             columns = peer_tables['Keys'][0].keys()
188         else:
189             columns = None
190
191         # Keyed on foreign key_id
192         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
193         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
194
195         # Fix up key_type references
196         for peer_key_id, key in keys_at_peer.items():
197             if key['key_type'] not in key_types:
198                 # XXX Log an event instead of printing to logfile
199                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
200                       key, ": invalid key type", key['key_type']
201                 del keys_at_peer[peer_key_id]
202                 continue
203
204         # Synchronize new set (still keyed on foreign key_id)
205         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
206         for peer_key_id, key in peer_keys.iteritems():
207             # Bind any newly cached keys to peer
208             if peer_key_id not in old_peer_keys:
209                 peer.add_key(key, peer_key_id, commit = False)
210                 key['peer_id'] = peer_id
211                 key['peer_key_id'] = peer_key_id
212
213         timers['keys'] = time.time() - start
214
215         #
216         # Synchronize foreign users
217         #
218
219         start = time.time()
220
221         print >>log, 'Dealing with Persons'
222
223         # Compare only the columns returned by the GetPeerData() call
224         if peer_tables['Persons']:
225             columns = peer_tables['Persons'][0].keys()
226         else:
227             columns = None
228
229         # Keyed on foreign person_id
230         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
231
232         # artificially attach the persons returned by GetPeerData to the new peer 
233         # this is because validate_email needs peer_id to be correct when checking for duplicates 
234         for person in peer_tables['Persons']: 
235             person['peer_id']=peer_id
236         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
237                                 for peer_person in peer_tables['Persons']])
238
239         # XXX Do we care about membership in foreign site(s)?
240
241         # Synchronize new set (still keyed on foreign person_id)
242         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
243
244         # transcoder : retrieve a local key_id from a peer_key_id
245         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
246                                   for peer_key_id,key in peer_keys.iteritems()])
247
248         for peer_person_id, person in peer_persons.iteritems():
249             # Bind any newly cached users to peer
250             if peer_person_id not in old_peer_persons:
251                 peer.add_person(person, peer_person_id, commit = False)
252                 person['peer_id'] = peer_id
253                 person['peer_person_id'] = peer_person_id
254                 person['key_ids'] = []
255
256             # User as viewed by peer
257             peer_person = persons_at_peer[peer_person_id]
258             
259             # Foreign keys currently belonging to the user
260             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
261                                   if key_transcoder[key_id] in peer_keys]
262
263             # Foreign keys that should belong to the user
264             # this is basically peer_person['key_ids'], we just check it makes sense 
265             # (e.g. we might have failed importing it)
266             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
267
268             # Remove stale keys from user
269             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
270                 person.remove_key(peer_keys[key_id], commit = False)
271                 print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email']
272
273             # Add new keys to user
274             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
275                 person.add_key(peer_keys[key_id], commit = False)
276                 print >> log, peer['peername'], 'Key', key_id, 'added into', person['email']
277
278         timers['persons'] = time.time() - start
279
280         #
281         # XXX Synchronize foreign boot states
282         #
283
284         boot_states = BootStates(self.api).dict()
285
286         #
287         # Synchronize foreign nodes
288         #
289
290         start = time.time()
291
292         print >>log, 'Dealing with Nodes'
293
294         # Compare only the columns returned by the GetPeerData() call
295         if peer_tables['Nodes']:
296             columns = peer_tables['Nodes'][0].keys()
297         else:
298             columns = None
299
300         # Keyed on foreign node_id
301         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
302         nodes_at_peer = dict([(node['node_id'], node) \
303                               for node in peer_tables['Nodes']])
304
305         # Fix up site_id and boot_states references
306         for peer_node_id, node in nodes_at_peer.items():
307             errors = []
308             if node['site_id'] not in peer_sites:
309                 errors.append("invalid site %d" % node['site_id'])
310             if node['boot_state'] not in boot_states:
311                 errors.append("invalid boot state %s" % node['boot_state'])
312             if errors:
313                 # XXX Log an event instead of printing to logfile
314                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
315                       node, ":", ", ".join(errors)
316                 del nodes_at_peer[peer_node_id]
317                 continue
318             else:
319                 node['site_id'] = peer_sites[node['site_id']]['site_id']
320
321         # Synchronize new set
322         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
323
324         for peer_node_id, node in peer_nodes.iteritems():
325             # Bind any newly cached foreign nodes to peer
326             if peer_node_id not in old_peer_nodes:
327                 peer.add_node(node, peer_node_id, commit = False)
328                 node['peer_id'] = peer_id
329                 node['peer_node_id'] = peer_node_id
330
331         timers['nodes'] = time.time() - start
332
333         #
334         # Synchronize local nodes
335         #
336
337         start = time.time()
338
339         # Keyed on local node_id
340         local_nodes = Nodes(self.api).dict()
341
342         for node in peer_tables['PeerNodes']:
343             # Foreign identifier for our node as maintained by peer
344             peer_node_id = node['node_id']
345             # Local identifier for our node as cached by peer
346             node_id = node['peer_node_id']
347             if node_id in local_nodes:
348                 # Still a valid local node, add it to the synchronized
349                 # set of local node objects keyed on foreign node_id.
350                 peer_nodes[peer_node_id] = local_nodes[node_id]
351
352         timers['local_nodes'] = time.time() - start
353
354         #
355         # XXX Synchronize foreign slice instantiation states
356         #
357
358         slice_instantiations = SliceInstantiations(self.api).dict()
359
360         #
361         # Synchronize foreign slices
362         #
363
364         start = time.time()
365
366         print >>log, 'Dealing with Slices'
367
368         # Compare only the columns returned by the GetPeerData() call
369         if peer_tables['Slices']:
370             columns = peer_tables['Slices'][0].keys()
371         else:
372             columns = None
373
374         # Keyed on foreign slice_id
375         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
376         slices_at_peer = dict([(slice['slice_id'], slice) \
377                                for slice in peer_tables['Slices']])
378
379         # Fix up site_id, instantiation, and creator_person_id references
380         for peer_slice_id, slice in slices_at_peer.items():
381             errors = []
382             if slice['site_id'] not in peer_sites:
383                 errors.append("invalid site %d" % slice['site_id'])
384             if slice['instantiation'] not in slice_instantiations:
385                 errors.append("invalid instantiation %s" % slice['instantiation'])
386             if slice['creator_person_id'] not in peer_persons:
387                 # Just NULL it out
388                 slice['creator_person_id'] = None
389             else:
390                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
391             if errors:
392                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
393                       slice, ":", ", ".join(errors)
394                 del slices_at_peer[peer_slice_id]
395                 continue
396             else:
397                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
398
399         # Synchronize new set
400         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
401
402         # transcoder : retrieve a local node_id from a peer_node_id
403         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
404                                    for peer_node_id,node in peer_nodes.iteritems()])
405         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
406                                      for peer_person_id,person in peer_persons.iteritems()])
407
408         for peer_slice_id, slice in peer_slices.iteritems():
409             # Bind any newly cached foreign slices to peer
410             if peer_slice_id not in old_peer_slices:
411                 peer.add_slice(slice, peer_slice_id, commit = False)
412                 slice['peer_id'] = peer_id
413                 slice['peer_slice_id'] = peer_slice_id
414                 slice['node_ids'] = []
415                 slice['person_ids'] = []
416
417             # Slice as viewed by peer
418             peer_slice = slices_at_peer[peer_slice_id]
419
420             # Nodes that are currently part of the slice
421             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
422                                    if node_transcoder[node_id] in peer_nodes]
423
424             # Nodes that should be part of the slice
425             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
426
427             # Remove stale nodes from slice
428             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
429                 slice.remove_node(peer_nodes[node_id], commit = False)
430                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name']
431
432             # Add new nodes to slice
433             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
434                 slice.add_node(peer_nodes[node_id], commit = False)
435                 print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name']
436
437             # N.B.: Local nodes that may have been added to the slice
438             # by hand, are removed. In other words, don't do this.
439
440             # Foreign users that are currently part of the slice
441             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
442             #                if person_transcoder[person_id] in peer_persons]
443             # An issue occurred with a user who registered on both sites (same email)
444             # So the remote person could not get cached locally
445             # The one-line map/filter style is nicer but ineffective here
446             old_slice_person_ids = []
447             for person_id in slice['person_ids']:
448                 if not person_transcoder.has_key(person_id):
449                     print >> log, 'WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name'])
450                 elif person_transcoder[person_id] not in peer_persons:
451                     print >> log, 'WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name'])
452                 else:
453                     old_slice_person_ids += [person_transcoder[person_id]]
454
455             # Foreign users that should be part of the slice
456             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
457
458             # Remove stale users from slice
459             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
460                 slice.remove_person(peer_persons[person_id], commit = False)
461                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name']
462
463             # Add new users to slice
464             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
465                 slice.add_person(peer_persons[person_id], commit = False)
466                 print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name']
467
468             # N.B.: Local users that may have been added to the slice
469             # by hand, are not touched.
470
471         timers['slices'] = time.time() - start
472
473         # Update peer itself and commit
474         peer.sync(commit = True)
475
476         return timers