timestamped logs
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import time
7
8 from PLC.Debug import log
9 from PLC.Faults import *
10 from PLC.Method import Method
11 from PLC.Parameter import Parameter, Mixed
12 from PLC.Auth import Auth
13
14 from PLC.Peers import Peer, Peers
15 from PLC.Sites import Site, Sites
16 from PLC.Persons import Person, Persons
17 from PLC.KeyTypes import KeyType, KeyTypes
18 from PLC.Keys import Key, Keys
19 from PLC.BootStates import BootState, BootStates
20 from PLC.Nodes import Node, Nodes
21 from PLC.SliceInstantiations import SliceInstantiations
22 from PLC.Slices import Slice, Slices
23
24 verbose=False
25
26 def message (to_print=None,verbose_only=False):
27     if verbose_only and not verbose:
28         return
29     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
30     if to_print:
31         print >>log, to_print
32
33 def message_verbose(to_print=None):
34     message(to_print,verbose_only=True)
35
36 class RefreshPeer(Method):
37     """
38     Fetches site, node, slice, person and key data from the specified peer
39     and caches it locally; also deletes stale entries.
40     Upon successful completion, returns a dict reporting various timers.
41     Faults otherwise.
42     """
43
44     roles = ['admin']
45
46     accepts = [
47         Auth(),
48         Mixed(Peer.fields['peer_id'],
49               Peer.fields['peername']),
50         ]
51
52     returns = Parameter(int, "1 if successful")
53
54     def call(self, auth, peer_id_or_peername):
55         # Get peer
56         peers = Peers(self.api, [peer_id_or_peername])
57         if not peers:
58             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
59         peer = peers[0]
60         peer_id = peer['peer_id']
61
62         # Connect to peer API
63         peer.connect()
64
65         timers = {}
66
67         # Get peer data
68         start = time.time()
69         message('Issuing GetPeerData')
70         peer_tables = peer.GetPeerData()
71         timers['transport'] = time.time() - start - peer_tables['db_time']
72         timers['peer_db'] = peer_tables['db_time']
73         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
74
75         def sync(objects, peer_objects, classobj):
76             """
77             Synchronizes two dictionaries of objects. objects should
78             be a dictionary of local objects keyed on their foreign
79             identifiers. peer_objects should be a dictionary of
80             foreign objects keyed on their local (i.e., foreign to us)
81             identifiers. Returns a final dictionary of local objects
82             keyed on their foreign identifiers.
83             """
84
85             classname=classobj(self.api).__class__.__name__
86             message_verbose('Entering sync on %s'%classname)
87
88             synced = {}
89
90             # Delete stale objects
91             for peer_object_id, object in objects.iteritems():
92                 if peer_object_id not in peer_objects:
93                     object.delete(commit = False)
94                     message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
95
96             total = len(peer_objects)
97             count=1
98             # Add/update new/existing objects
99             for peer_object_id, peer_object in peer_objects.iteritems():
100                 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
101                 count += 1
102                 if classname == 'Node':
103                     message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
104                 elif classname == "Slice":
105                     message_verbose ('DBG>> slicename=%s'%peer_object['name'])
106                 if peer_object_id in objects:
107                     # Update existing object
108                     object = objects[peer_object_id]
109
110                     # Replace foreign identifier with existing local
111                     # identifier temporarily for the purposes of
112                     # comparison.
113                     peer_object[object.primary_key] = object[object.primary_key]
114
115                     # Must use __eq__() instead of == since
116                     # peer_object may be a raw dict instead of a Peer
117                     # object.
118                     if not object.__eq__(peer_object):
119                         # Only update intrinsic fields
120                         object.update(object.db_fields(peer_object))
121                         sync = True
122                         dbg = "changed"
123                     else:
124                         sync = False
125                         dbg = None
126
127                     # Restore foreign identifier
128                     peer_object[object.primary_key] = peer_object_id
129                 else:
130                     # Add new object
131                     object = classobj(self.api, peer_object)
132                     # Replace foreign identifier with new local identifier
133                     del object[object.primary_key]
134                     sync = True
135                     dbg = "added"
136
137                 if sync:
138                     try:
139                         object.sync(commit = False)
140                     except PLCInvalidArgument, err:
141                         # Skip if validation fails
142                         # XXX Log an event instead of printing to logfile
143                         message("Warning: %s Skipping invalid %s %r : %r"%(\
144                                 peer['peername'], classname, peer_object, err))
145                         continue
146
147                 synced[peer_object_id] = object
148
149                 if dbg:
150                     message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
151
152             message_verbose("Exiting sync on %s"%classname)
153
154             return synced
155
156         #
157         # Synchronize foreign sites
158         #
159
160         start = time.time()
161
162         message('Dealing with Sites')
163
164         # Compare only the columns returned by the GetPeerData() call
165         if peer_tables['Sites']:
166             columns = peer_tables['Sites'][0].keys()
167         else:
168             columns = None
169
170         # Keyed on foreign site_id
171         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
172         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
173
174         # Synchronize new set (still keyed on foreign site_id)
175         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
176
177         for peer_site_id, site in peer_sites.iteritems():
178             # Bind any newly cached sites to peer
179             if peer_site_id not in old_peer_sites:
180                 peer.add_site(site, peer_site_id, commit = False)
181                 site['peer_id'] = peer_id
182                 site['peer_site_id'] = peer_site_id
183
184         timers['site'] = time.time() - start
185
186         #
187         # XXX Synchronize foreign key types
188         #
189
190         message('Dealing with Keys')
191
192         key_types = KeyTypes(self.api).dict()
193
194         #
195         # Synchronize foreign keys
196         #
197
198         start = time.time()
199
200         # Compare only the columns returned by the GetPeerData() call
201         if peer_tables['Keys']:
202             columns = peer_tables['Keys'][0].keys()
203         else:
204             columns = None
205
206         # Keyed on foreign key_id
207         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
208         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
209
210         # Fix up key_type references
211         for peer_key_id, key in keys_at_peer.items():
212             if key['key_type'] not in key_types:
213                 # XXX Log an event instead of printing to logfile
214                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
215                 del keys_at_peer[peer_key_id]
216                 continue
217
218         # Synchronize new set (still keyed on foreign key_id)
219         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
220         for peer_key_id, key in peer_keys.iteritems():
221             # Bind any newly cached keys to peer
222             if peer_key_id not in old_peer_keys:
223                 peer.add_key(key, peer_key_id, commit = False)
224                 key['peer_id'] = peer_id
225                 key['peer_key_id'] = peer_key_id
226
227         timers['keys'] = time.time() - start
228
229         #
230         # Synchronize foreign users
231         #
232
233         start = time.time()
234
235         message('Dealing with Persons')
236
237         # Compare only the columns returned by the GetPeerData() call
238         if peer_tables['Persons']:
239             columns = peer_tables['Persons'][0].keys()
240         else:
241             columns = None
242
243         # Keyed on foreign person_id
244         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
245
246         # artificially attach the persons returned by GetPeerData to the new peer 
247         # this is because validate_email needs peer_id to be correct when checking for duplicates 
248         for person in peer_tables['Persons']: 
249             person['peer_id']=peer_id
250         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
251                                 for peer_person in peer_tables['Persons']])
252
253         # XXX Do we care about membership in foreign site(s)?
254
255         # Synchronize new set (still keyed on foreign person_id)
256         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
257
258         # transcoder : retrieve a local key_id from a peer_key_id
259         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
260                                   for peer_key_id,key in peer_keys.iteritems()])
261
262         for peer_person_id, person in peer_persons.iteritems():
263             # Bind any newly cached users to peer
264             if peer_person_id not in old_peer_persons:
265                 peer.add_person(person, peer_person_id, commit = False)
266                 person['peer_id'] = peer_id
267                 person['peer_person_id'] = peer_person_id
268                 person['key_ids'] = []
269
270             # User as viewed by peer
271             peer_person = persons_at_peer[peer_person_id]
272             
273             # Foreign keys currently belonging to the user
274             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
275                                   if key_transcoder[key_id] in peer_keys]
276
277             # Foreign keys that should belong to the user
278             # this is basically peer_person['key_ids'], we just check it makes sense 
279             # (e.g. we might have failed importing it)
280             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
281
282             # Remove stale keys from user
283             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
284                 person.remove_key(peer_keys[key_id], commit = False)
285                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
286
287             # Add new keys to user
288             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
289                 person.add_key(peer_keys[key_id], commit = False)
290                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
291
292         timers['persons'] = time.time() - start
293
294         #
295         # XXX Synchronize foreign boot states
296         #
297
298         boot_states = BootStates(self.api).dict()
299
300         #
301         # Synchronize foreign nodes
302         #
303
304         start = time.time()
305
306         message('Dealing with Nodes (1)')
307
308         # Compare only the columns returned by the GetPeerData() call
309         if peer_tables['Nodes']:
310             columns = peer_tables['Nodes'][0].keys()
311         else:
312             columns = None
313
314         # Keyed on foreign node_id
315         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
316         nodes_at_peer = dict([(node['node_id'], node) \
317                               for node in peer_tables['Nodes']])
318
319         # Fix up site_id and boot_states references
320         for peer_node_id, node in nodes_at_peer.items():
321             errors = []
322             if node['site_id'] not in peer_sites:
323                 errors.append("invalid site %d" % node['site_id'])
324             if node['boot_state'] not in boot_states:
325                 errors.append("invalid boot state %s" % node['boot_state'])
326             if errors:
327                 # XXX Log an event instead of printing to logfile
328                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
329                              + ", ".join(errors))
330                 del nodes_at_peer[peer_node_id]
331                 continue
332             else:
333                 node['site_id'] = peer_sites[node['site_id']]['site_id']
334
335         # Synchronize new set
336         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
337
338         for peer_node_id, node in peer_nodes.iteritems():
339             # Bind any newly cached foreign nodes to peer
340             if peer_node_id not in old_peer_nodes:
341                 peer.add_node(node, peer_node_id, commit = False)
342                 node['peer_id'] = peer_id
343                 node['peer_node_id'] = peer_node_id
344
345         timers['nodes'] = time.time() - start
346
347         #
348         # Synchronize local nodes
349         #
350
351         start = time.time()
352         message('Dealing with Nodes (2)')
353
354         # Keyed on local node_id
355         local_nodes = Nodes(self.api).dict()
356
357         for node in peer_tables['PeerNodes']:
358             # Foreign identifier for our node as maintained by peer
359             peer_node_id = node['node_id']
360             # Local identifier for our node as cached by peer
361             node_id = node['peer_node_id']
362             if node_id in local_nodes:
363                 # Still a valid local node, add it to the synchronized
364                 # set of local node objects keyed on foreign node_id.
365                 peer_nodes[peer_node_id] = local_nodes[node_id]
366
367         timers['local_nodes'] = time.time() - start
368
369         #
370         # XXX Synchronize foreign slice instantiation states
371         #
372
373         slice_instantiations = SliceInstantiations(self.api).dict()
374
375         #
376         # Synchronize foreign slices
377         #
378
379         start = time.time()
380
381         message('Dealing with Slices (1)')
382
383         # Compare only the columns returned by the GetPeerData() call
384         if peer_tables['Slices']:
385             columns = peer_tables['Slices'][0].keys()
386         else:
387             columns = None
388
389         # Keyed on foreign slice_id
390         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
391         slices_at_peer = dict([(slice['slice_id'], slice) \
392                                for slice in peer_tables['Slices']])
393
394         # Fix up site_id, instantiation, and creator_person_id references
395         for peer_slice_id, slice in slices_at_peer.items():
396             errors = []
397             if slice['site_id'] not in peer_sites:
398                 errors.append("invalid site %d" % slice['site_id'])
399             if slice['instantiation'] not in slice_instantiations:
400                 errors.append("invalid instantiation %s" % slice['instantiation'])
401             if slice['creator_person_id'] not in peer_persons:
402                 # Just NULL it out
403                 slice['creator_person_id'] = None
404             else:
405                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
406             if errors:
407                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
408                             + ", ".join(errors))
409                 del slices_at_peer[peer_slice_id]
410                 continue
411             else:
412                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
413
414         # Synchronize new set
415         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
416
417         message('Dealing with Slices (2)')
418         # transcoder : retrieve a local node_id from a peer_node_id
419         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
420                                    for peer_node_id,node in peer_nodes.iteritems()])
421         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
422                                      for peer_person_id,person in peer_persons.iteritems()])
423
424         for peer_slice_id, slice in peer_slices.iteritems():
425             # Bind any newly cached foreign slices to peer
426             if peer_slice_id not in old_peer_slices:
427                 peer.add_slice(slice, peer_slice_id, commit = False)
428                 slice['peer_id'] = peer_id
429                 slice['peer_slice_id'] = peer_slice_id
430                 slice['node_ids'] = []
431                 slice['person_ids'] = []
432
433             # Slice as viewed by peer
434             peer_slice = slices_at_peer[peer_slice_id]
435
436             # Nodes that are currently part of the slice
437             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
438                                    if node_transcoder[node_id] in peer_nodes]
439
440             # Nodes that should be part of the slice
441             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
442
443             # Remove stale nodes from slice
444             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
445                 slice.remove_node(peer_nodes[node_id], commit = False)
446                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
447
448             # Add new nodes to slice
449             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
450                 slice.add_node(peer_nodes[node_id], commit = False)
451                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
452
453             # N.B.: Local nodes that may have been added to the slice
454             # by hand, are removed. In other words, don't do this.
455
456             # Foreign users that are currently part of the slice
457             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
458             #                if person_transcoder[person_id] in peer_persons]
459             # An issue occurred with a user who registered on both sites (same email)
460             # So the remote person could not get cached locally
461             # The one-line map/filter style is nicer but ineffective here
462             old_slice_person_ids = []
463             for person_id in slice['person_ids']:
464                 if not person_transcoder.has_key(person_id):
465                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
466                 elif person_transcoder[person_id] not in peer_persons:
467                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
468                 else:
469                     old_slice_person_ids += [person_transcoder[person_id]]
470
471             # Foreign users that should be part of the slice
472             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
473
474             # Remove stale users from slice
475             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
476                 slice.remove_person(peer_persons[person_id], commit = False)
477                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
478
479             # Add new users to slice
480             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
481                 slice.add_person(peer_persons[person_id], commit = False)
482                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
483
484             # N.B.: Local users that may have been added to the slice
485             # by hand, are not touched.
486
487         timers['slices'] = time.time() - start
488
489         # Update peer itself and commit
490         peer.sync(commit = True)
491
492         return timers