for robustness
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import time
7
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
13
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
23
24 verbose=False
25
26 # initial version was doing only one final commit
27 # * set commit_mode to False to get that behaviour
28 # * set comit_mode to True to get everything synced at once
29 commit_mode=True
30
31 def message (to_print=None,verbose_only=False):
32     if verbose_only and not verbose:
33         return
34     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
35     if to_print:
36         print >>log, to_print
37
38 def message_verbose(to_print=None):
39     message(to_print,verbose_only=True)
40
41 class RefreshPeer(Method):
42     """
43     Fetches site, node, slice, person and key data from the specified peer
44     and caches it locally; also deletes stale entries.
45     Upon successful completion, returns a dict reporting various timers.
46     Faults otherwise.
47     """
48
49     roles = ['admin']
50
51     accepts = [
52         Auth(),
53         Mixed(Peer.fields['peer_id'],
54               Peer.fields['peername']),
55         ]
56
57     returns = Parameter(int, "1 if successful")
58
59     def call(self, auth, peer_id_or_peername):
60         # Get peer
61         peers = Peers(self.api, [peer_id_or_peername])
62         if not peers:
63             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
64         peer = peers[0]
65         peer_id = peer['peer_id']
66
67         # Connect to peer API
68         peer.connect()
69
70         timers = {}
71
72         # Get peer data
73         start = time.time()
74         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
75         message('Issuing GetPeerData')
76         peer_tables = peer.GetPeerData()
77         # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
78         boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
79                             'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
80         for node in peer_tables['Nodes']:
81             for key in ['nodenetwork_ids','dummybox_id']:
82                 if key in node:
83                     del node[key]
84             if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
85         for slice in peer_tables['Slices']:
86             for key in ['slice_attribute_ids']:
87                 if key in slice:
88                     del slice[key]
89         timers['transport'] = time.time() - start - peer_tables['db_time']
90         timers['peer_db'] = peer_tables['db_time']
91         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
92
93         def sync(objects, peer_objects, classobj):
94             """
95             Synchronizes two dictionaries of objects. objects should
96             be a dictionary of local objects keyed on their foreign
97             identifiers. peer_objects should be a dictionary of
98             foreign objects keyed on their local (i.e., foreign to us)
99             identifiers. Returns a final dictionary of local objects
100             keyed on their foreign identifiers.
101             """
102
103             classname=classobj(self.api).__class__.__name__
104             message_verbose('Entering sync on %s'%classname)
105
106             synced = {}
107
108             # Delete stale objects
109             for peer_object_id, object in objects.iteritems():
110                 if peer_object_id not in peer_objects:
111                     object.delete(commit = commit_mode)
112                     message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
113
114             total = len(peer_objects)
115             count=1
116             # set this to something realistic to trace down a given object(s)
117             trace_type="Node"
118             trace_ids=[]
119             def trace (message):
120                 if classname == trace_type and peer_object_id in trace_ids:
121                     message_verbose('TRACE>>'+message)
122                 
123             # Add/update new/existing objects
124             for peer_object_id, peer_object in peer_objects.iteritems():
125                 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
126                 count += 1
127                 if classname == 'Node':
128                     message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
129                 elif classname == "Slice":
130                     message_verbose ('DBG>> slicename=%s'%peer_object['name'])
131                 if peer_object_id in objects:
132                     # Update existing object
133                     object = objects[peer_object_id]
134
135                     # Replace foreign identifier with existing local
136                     # identifier temporarily for the purposes of
137                     # comparison.
138                     peer_object[object.primary_key] = object[object.primary_key]
139
140                     # Must use __eq__() instead of == since
141                     # peer_object may be a raw dict instead of a Peer
142                     # object.
143                     trace ("in objects : comparing")
144                     if not object.__eq__(peer_object):
145                         # Only update intrinsic fields
146                         trace ("updating")
147                         object.update(object.db_fields(peer_object))
148                         trace ("updated")
149                         sync = True
150                         dbg = "changed"
151                     else:
152                         trace ("intact")
153                         sync = False
154                         dbg = None
155
156                     # Restore foreign identifier
157                     peer_object[object.primary_key] = peer_object_id
158                 else:
159                     trace ("not in objects -- creating")
160                     # Add new object
161                     object = classobj(self.api, peer_object)
162                     trace ("created")
163                     # Replace foreign identifier with new local identifier
164                     del object[object.primary_key]
165                     trace ("forced clean id")
166                     sync = True
167                     dbg = "added"
168
169                 if sync:
170                     message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
171                     try:
172                         object.sync(commit = commit_mode)
173                     except PLCInvalidArgument, err:
174                         # Skip if validation fails
175                         # XXX Log an event instead of printing to logfile
176                         message("Warning: %s Skipping invalid %s %r : %r"%(\
177                                 peer['peername'], classname, peer_object, err))
178                         continue
179
180                 synced[peer_object_id] = object
181
182                 if dbg:
183                     message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
184
185             message_verbose("Exiting sync on %s"%classname)
186
187             return synced
188
189         #
190         # Synchronize foreign sites
191         #
192
193         start = time.time()
194
195         message('Dealing with Sites')
196
197         # Compare only the columns returned by the GetPeerData() call
198         if peer_tables['Sites']:
199             columns = peer_tables['Sites'][0].keys()
200         else:
201             columns = None
202
203         # Keyed on foreign site_id
204         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
205         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
206
207         # Synchronize new set (still keyed on foreign site_id)
208         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
209
210         for peer_site_id, site in peer_sites.iteritems():
211             # Bind any newly cached sites to peer
212             if peer_site_id not in old_peer_sites:
213                 peer.add_site(site, peer_site_id, commit = commit_mode)
214                 site['peer_id'] = peer_id
215                 site['peer_site_id'] = peer_site_id
216
217         timers['site'] = time.time() - start
218
219         #
220         # XXX Synchronize foreign key types
221         #
222
223         message('Dealing with Keys')
224
225         key_types = KeyTypes(self.api).dict()
226
227         #
228         # Synchronize foreign keys
229         #
230
231         start = time.time()
232
233         # Compare only the columns returned by the GetPeerData() call
234         if peer_tables['Keys']:
235             columns = peer_tables['Keys'][0].keys()
236         else:
237             columns = None
238
239         # Keyed on foreign key_id
240         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
241         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
242
243         # Fix up key_type references
244         for peer_key_id, key in keys_at_peer.items():
245             if key['key_type'] not in key_types:
246                 # XXX Log an event instead of printing to logfile
247                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
248                 del keys_at_peer[peer_key_id]
249                 continue
250
251         # Synchronize new set (still keyed on foreign key_id)
252         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
253         for peer_key_id, key in peer_keys.iteritems():
254             # Bind any newly cached keys to peer
255             if peer_key_id not in old_peer_keys:
256                 peer.add_key(key, peer_key_id, commit = commit_mode)
257                 key['peer_id'] = peer_id
258                 key['peer_key_id'] = peer_key_id
259
260         timers['keys'] = time.time() - start
261
262         #
263         # Synchronize foreign users
264         #
265
266         start = time.time()
267
268         message('Dealing with Persons')
269
270         # Compare only the columns returned by the GetPeerData() call
271         if peer_tables['Persons']:
272             columns = peer_tables['Persons'][0].keys()
273         else:
274             columns = None
275
276         # Keyed on foreign person_id
277         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
278
279         # artificially attach the persons returned by GetPeerData to the new peer 
280         # this is because validate_email needs peer_id to be correct when checking for duplicates 
281         for person in peer_tables['Persons']: 
282             person['peer_id']=peer_id
283         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
284                                 for peer_person in peer_tables['Persons']])
285
286         # XXX Do we care about membership in foreign site(s)?
287
288         # Synchronize new set (still keyed on foreign person_id)
289         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
290
291         # transcoder : retrieve a local key_id from a peer_key_id
292         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
293                                   for peer_key_id,key in peer_keys.iteritems()])
294
295         for peer_person_id, person in peer_persons.iteritems():
296             # Bind any newly cached users to peer
297             if peer_person_id not in old_peer_persons:
298                 peer.add_person(person, peer_person_id, commit = commit_mode)
299                 person['peer_id'] = peer_id
300                 person['peer_person_id'] = peer_person_id
301                 person['key_ids'] = []
302
303             # User as viewed by peer
304             peer_person = persons_at_peer[peer_person_id]
305             
306             # Foreign keys currently belonging to the user
307             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
308                                   if key_transcoder[key_id] in peer_keys]
309
310             # Foreign keys that should belong to the user
311             # this is basically peer_person['key_ids'], we just check it makes sense 
312             # (e.g. we might have failed importing it)
313             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
314
315             # Remove stale keys from user
316             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
317                 person.remove_key(peer_keys[key_id], commit = commit_mode)
318                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
319
320             # Add new keys to user
321             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
322                 person.add_key(peer_keys[key_id], commit = commit_mode)
323                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
324
325         timers['persons'] = time.time() - start
326
327         #
328         # XXX Synchronize foreign boot states
329         #
330
331         boot_states = BootStates(self.api).dict()
332
333         #
334         # Synchronize foreign nodes
335         #
336
337         start = time.time()
338
339         message('Dealing with Nodes (1)')
340
341         # Compare only the columns returned by the GetPeerData() call
342         if peer_tables['Nodes']:
343             columns = peer_tables['Nodes'][0].keys()
344         else:
345             # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
346             columns = Node.fields
347             if 'interface_ids' in columns: columns.remove('interface_ids')
348             if 'dummybox_id' in columns: columns.remove('dummybox_id')
349
350         # Keyed on foreign node_id
351         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
352         nodes_at_peer = dict([(node['node_id'], node) \
353                               for node in peer_tables['Nodes']])
354
355         # Fix up site_id and boot_states references
356         for peer_node_id, node in nodes_at_peer.items():
357             errors = []
358             if node['site_id'] not in peer_sites:
359                 errors.append("invalid site %d" % node['site_id'])
360             if node['boot_state'] not in boot_states:
361                 errors.append("invalid boot state %s" % node['boot_state'])
362             if errors:
363                 # XXX Log an event instead of printing to logfile
364                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
365                              + ", ".join(errors))
366                 del nodes_at_peer[peer_node_id]
367                 continue
368             else:
369                 node['site_id'] = peer_sites[node['site_id']]['site_id']
370
371         # Synchronize new set
372         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
373
374         for peer_node_id, node in peer_nodes.iteritems():
375             # Bind any newly cached foreign nodes to peer
376             if peer_node_id not in old_peer_nodes:
377                 peer.add_node(node, peer_node_id, commit = commit_mode)
378                 node['peer_id'] = peer_id
379                 node['peer_node_id'] = peer_node_id
380
381         timers['nodes'] = time.time() - start
382
383         #
384         # Synchronize local nodes
385         #
386
387         start = time.time()
388         message('Dealing with Nodes (2)')
389
390         # Keyed on local node_id
391         local_nodes = Nodes(self.api).dict()
392
393         for node in peer_tables['PeerNodes']:
394             # Foreign identifier for our node as maintained by peer
395             peer_node_id = node['node_id']
396             # Local identifier for our node as cached by peer
397             node_id = node['peer_node_id']
398             if node_id in local_nodes:
399                 # Still a valid local node, add it to the synchronized
400                 # set of local node objects keyed on foreign node_id.
401                 peer_nodes[peer_node_id] = local_nodes[node_id]
402
403         timers['local_nodes'] = time.time() - start
404
405         #
406         # XXX Synchronize foreign slice instantiation states
407         #
408
409         slice_instantiations = SliceInstantiations(self.api).dict()
410
411         #
412         # Synchronize foreign slices
413         #
414
415         start = time.time()
416
417         message('Dealing with Slices (1)')
418
419         # Compare only the columns returned by the GetPeerData() call
420         if peer_tables['Slices']:
421             columns = peer_tables['Slices'][0].keys()
422         else:
423             columns = None
424
425         # Keyed on foreign slice_id
426         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
427         slices_at_peer = dict([(slice['slice_id'], slice) \
428                                for slice in peer_tables['Slices']])
429
430         # Fix up site_id, instantiation, and creator_person_id references
431         for peer_slice_id, slice in slices_at_peer.items():
432             errors = []
433             if slice['site_id'] not in peer_sites:
434                 errors.append("invalid site %d" % slice['site_id'])
435             if slice['instantiation'] not in slice_instantiations:
436                 errors.append("invalid instantiation %s" % slice['instantiation'])
437             if slice['creator_person_id'] not in peer_persons:
438                 # Just NULL it out
439                 slice['creator_person_id'] = None
440             else:
441                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
442             if errors:
443                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
444                             + ", ".join(errors))
445                 del slices_at_peer[peer_slice_id]
446                 continue
447             else:
448                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
449
450         # Synchronize new set
451         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
452
453         message('Dealing with Slices (2)')
454         # transcoder : retrieve a local node_id from a peer_node_id
455         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
456                                    for peer_node_id,node in peer_nodes.iteritems()])
457         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
458                                      for peer_person_id,person in peer_persons.iteritems()])
459
460         for peer_slice_id, slice in peer_slices.iteritems():
461             # Bind any newly cached foreign slices to peer
462             if peer_slice_id not in old_peer_slices:
463                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
464                 slice['peer_id'] = peer_id
465                 slice['peer_slice_id'] = peer_slice_id
466                 slice['node_ids'] = []
467                 slice['person_ids'] = []
468
469             # Slice as viewed by peer
470             peer_slice = slices_at_peer[peer_slice_id]
471
472             # Nodes that are currently part of the slice
473             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
474                                    if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
475
476             # Nodes that should be part of the slice
477             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
478
479             # Remove stale nodes from slice
480             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
481                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
482                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
483
484             # Add new nodes to slice
485             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
486                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
487                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
488
489             # N.B.: Local nodes that may have been added to the slice
490             # by hand, are removed. In other words, don't do this.
491
492             # Foreign users that are currently part of the slice
493             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
494             #                if person_transcoder[person_id] in peer_persons]
495             # An issue occurred with a user who registered on both sites (same email)
496             # So the remote person could not get cached locally
497             # The one-line map/filter style is nicer but ineffective here
498             old_slice_person_ids = []
499             for person_id in slice['person_ids']:
500                 if not person_transcoder.has_key(person_id):
501                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
502                 elif person_transcoder[person_id] not in peer_persons:
503                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
504                 else:
505                     old_slice_person_ids += [person_transcoder[person_id]]
506
507             # Foreign users that should be part of the slice
508             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
509
510             # Remove stale users from slice
511             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
512                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
513                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
514
515             # Add new users to slice
516             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
517                 slice.add_person(peer_persons[person_id], commit = commit_mode)
518                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
519
520             # N.B.: Local users that may have been added to the slice
521             # by hand, are not touched.
522
523         timers['slices'] = time.time() - start
524
525         # Update peer itself and commit
526         peer.sync(commit = True)
527
528         return timers