safely copies all crt files
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4
5 import time
6
7 from PLC.Debug import log
8 from PLC.Faults import *
9 from PLC.Method import Method
10 from PLC.Parameter import Parameter, Mixed
11 from PLC.Auth import Auth
12
13 from PLC.Peers import Peer, Peers
14 from PLC.Sites import Site, Sites
15 from PLC.Persons import Person, Persons
16 from PLC.KeyTypes import KeyType, KeyTypes
17 from PLC.Keys import Key, Keys
18 from PLC.BootStates import BootState, BootStates
19 from PLC.Nodes import Node, Nodes
20 from PLC.SliceInstantiations import SliceInstantiations
21 from PLC.Slices import Slice, Slices
22
23 class RefreshPeer(Method):
24     """
25     Fetches node and slice data from the specified peer and caches it
26     locally; also deletes stale entries. Returns 1 if successful,
27     faults otherwise.
28     """
29
30     roles = ['admin']
31
32     accepts = [
33         Auth(),
34         Mixed(Peer.fields['peer_id'],
35               Peer.fields['peername']),
36         ]
37
38     returns = Parameter(int, "1 if successful")
39
40     def call(self, auth, peer_id_or_peername):
41
42         start = time.time()
43
44         # Get peer
45         peers = Peers(self.api, [peer_id_or_peername])
46         if not peers:
47             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
48         peer = peers[0]
49         peer_id = peer['peer_id']
50
51         # Connect to peer API
52         peer.connect()
53
54         timers = {}
55
56         # Get peer data
57         peer_tables = peer.GetPeerData()
58         timers['transport'] = time.time() - start - peer_tables['db_time']
59         timers['peer_db'] = peer_tables['db_time']
60
61         now=time.time()
62         timers['prepare'] = now-start-timers['peer_db']-timers['transport']
63         start=now
64
65         def sync(objects, peer_objects, classobj,debug_dict={}):
66             """
67             Synchronizes two dictionaries of objects. objects should
68             be a dictionary of local objects keyed on their foreign
69             identifiers. peer_objects should be a dictionary of
70             foreign objects keyed on their local (i.e., foreign to us)
71             identifiers. Returns a final dictionary of local objects
72             keyed on their foreign identifiers.
73             """
74
75             for key in ['delete','sync','process','focus','added','deleted','updated','unchanged','synced','screwed']:
76                 debug_dict[key]=0
77
78             synced = {}
79
80             xstart=time.time()
81             # Delete stale objects
82             for peer_object_id, object in objects.iteritems():
83                 if peer_object_id not in peer_objects:
84                     object.delete(commit = False)
85                     print classobj, "object %d deleted" % object[object.primary_key]
86                     debug_dict['deleted'] += 1
87
88             xnow=time.time()
89             debug_dict['delete']=xnow-xstart
90             xstart=xnow
91
92             # Add/update new/existing objects
93             for peer_object_id, peer_object in peer_objects.iteritems():
94
95                 xnow=time.time()
96                 debug_dict['sync'] += (xnow-xstart)
97                 xstart=xnow
98
99                 #if peer_object_id in objects:
100                 if objects.has_key(peer_object_id):
101                     # Update existing object
102                     object = objects[peer_object_id]
103
104                     # Replace foreign identifier with existing local
105                     # identifier temporarily for the purposes of
106                     # comparison.
107                     peer_object[object.primary_key] = object[object.primary_key]
108
109                     # Must use __eq__() instead of == since
110                     # peer_object may be a raw dict instead of a Peer
111                     # object.
112                     if not object.__eq__(peer_object):
113                         # Only update intrinsic fields
114                         object.update(object.db_fields(peer_object))
115                         sync = True
116                         dbg = "changed"
117                         debug_dict['updated'] += 1
118                     else:
119                         sync = False
120                         dbg = None
121                         debug_dict['unchanged'] += 1
122
123                     # Restore foreign identifier
124                     peer_object[object.primary_key] = peer_object_id
125                 else:
126                     # Add new object
127                     object = classobj(self.api, peer_object)
128                     # Replace foreign identifier with new local identifier
129                     del object[object.primary_key]
130                     sync = True
131                     dbg = "added"
132                     debug_dict['added'] += 1
133
134                 xnow=time.time()
135                 debug_dict['process'] += (xnow-xstart)
136                 xstart=xnow
137
138                 if sync:
139                     try:
140                         object.sync(commit = False)
141                         debug_dict['synced'] += 1
142                     except PLCInvalidArgument, err:
143                         # Skip if validation fails
144                         # XXX Log an event instead of printing to logfile
145                         print >> log, "Warning: Skipping invalid", \
146                               peer['peername'], object.__class__.__name__, \
147                               ":", peer_object, ":", err
148                         debug_dict['screwed'] += 1
149                         continue
150
151                 synced[peer_object_id] = object
152
153                 if dbg:
154                     print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg
155
156             xnow=time.time()
157             debug_dict['sync'] += (xnow-xstart)
158             xstart=xnow
159
160             return synced
161
162         #
163         # Synchronize foreign sites
164         #
165
166         # Compare only the columns returned by the GetPeerData() call
167         if peer_tables['Sites']:
168             columns = peer_tables['Sites'][0].keys()
169         else:
170             columns = None
171
172         # Keyed on foreign site_id
173         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
174         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
175
176         # Synchronize new set (still keyed on foreign site_id)
177         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
178
179         for peer_site_id, site in peer_sites.iteritems():
180             # Bind any newly cached sites to peer
181             if peer_site_id not in old_peer_sites:
182                 peer.add_site(site, peer_site_id, commit = False)
183                 site['peer_id'] = peer_id
184                 site['peer_site_id'] = peer_site_id
185
186         now=time.time()
187         timers['site'] = now - start
188         start = now
189
190         #
191         # XXX Synchronize foreign key types
192         #
193
194         key_types = KeyTypes(self.api).dict()
195
196         #
197         # Synchronize foreign keys
198         #
199
200
201         # Compare only the columns returned by the GetPeerData() call
202         if peer_tables['Keys']:
203             columns = peer_tables['Keys'][0].keys()
204         else:
205             columns = None
206
207         # Keyed on foreign key_id
208         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
209         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
210
211         # Fix up key_type references
212         for peer_key_id, key in keys_at_peer.items():
213             if key['key_type'] not in key_types:
214                 # XXX Log an event instead of printing to logfile
215                 print >> log, "Warning: Skipping invalid %s key:" % peer['peername'], \
216                       key, ": invalid key type", key['key_type']
217                 del keys_at_peer[peer_key_id]
218                 continue
219
220         # Synchronize new set (still keyed on foreign key_id)
221         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
222         for peer_key_id, key in peer_keys.iteritems():
223             # Bind any newly cached keys to peer
224             if peer_key_id not in old_peer_keys:
225                 peer.add_key(key, peer_key_id, commit = False)
226                 key['peer_id'] = peer_id
227                 key['peer_key_id'] = peer_key_id
228
229         timers['keys'] = time.time() - start
230
231         #
232         # Synchronize foreign users
233         #
234
235         start = time.time()
236         substart = start
237
238         # Compare only the columns returned by the GetPeerData() call
239         if peer_tables['Persons']:
240             columns = peer_tables['Persons'][0].keys()
241         else:
242             columns = None
243
244         # Keyed on foreign person_id
245         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
246         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
247                                 for peer_person in peer_tables['Persons']])
248
249         now=time.time()
250         timers [ 'persons-1' ] = now - substart
251         substart=now
252
253         # XXX Do we care about membership in foreign site(s)?
254
255         # Synchronize new set (still keyed on foreign person_id)
256         yyy={}
257         peer_persons = sync(old_peer_persons, persons_at_peer, Person,yyy)
258         for key in yyy:
259             timers[ 'persons-'+key ] = yyy[key]
260
261         now=time.time()
262         timers [ 'persons-2' ] = now - substart
263         substart=now
264         subsubstart=now
265         
266         for key in ['persons-31','persons-32','persons-33','persons-34','persons-35','persons-36','person3-added']:
267             timers[key]=0
268
269         # allows to retrieve local_key_id from a peer_key_id, if any
270         peer_key_id_from_local_key_id = dict( \
271             [ (key['key_id'],peer_key_id) for (peer_key_id,key) in peer_keys.items()])
272
273         for peer_person_id, person in peer_persons.iteritems():
274
275             now=time.time()
276             timers [ 'persons-36' ] += (now - subsubstart)
277             subsubstart=now
278
279             # Bind any newly cached users to peer
280             if peer_person_id not in old_peer_persons:
281                 peer.add_person(person, peer_person_id, commit = False)
282                 person['peer_id'] = peer_id
283                 person['peer_person_id'] = peer_person_id
284                 person['key_ids'] = []
285                 timers['person3-added'] += 1
286
287             now=time.time()
288             timers [ 'persons-31' ] += (now - subsubstart)
289             subsubstart=now
290
291             # User as viewed by peer
292             peer_person = persons_at_peer[peer_person_id]
293             
294             # Foreign keys currently belonging to the user
295             old_person_keys = dict(filter(lambda (peer_key_id, key): \
296                                           key['key_id'] in person['key_ids'],
297                                           peer_keys.items()))
298             print 'old_person_keys',old_person_keys.keys()
299
300             old_person_key_ids_set = set(\
301                 [ peer_key_id_from_local_key_id[local_key_id] for local_key_id in person['key_ids']])
302             print 'old_person_keys_set',old_person_key_ids_set
303
304
305             now=time.time()
306             timers [ 'persons-33' ] += (now - subsubstart)
307             subsubstart=now
308
309             # Foreign keys that should belong to the user
310             person_keys = dict(filter(lambda (peer_key_id, key): \
311                                       peer_key_id in peer_person['key_ids'],
312                                       peer_keys.items()))
313             print 'person_keys',person_keys.keys()
314
315             person_keys_new = dict( [ (peer_key_id,peer_keys[peer_key_id]) \
316                                   for peer_key_id in peer_person['key_ids'] ])
317             print 'person_keys_new',person_keys_new.keys()
318
319
320             now=time.time()
321             timers [ 'persons-34' ] += (now - subsubstart)
322             subsubstart=now
323
324             # Remove stale keys from user
325             for peer_key_id in (set(old_person_keys.keys()) - set(person_keys.keys())):
326 #            for peer_key_id in (old_person_key_ids_set - set(person_keys.keys())):
327                 person.remove_key(old_person_keys[peer_key_id], commit = False)
328
329             now=time.time()
330             timers [ 'persons-35' ] += (now - subsubstart)
331             subsubstart=now
332
333             # Add new keys to user
334             for peer_key_id in (set(person_keys.keys()) - set(old_person_keys.keys())):
335 #            for peer_key_id in (set(person_keys.keys()) - old_person_key_ids_set):
336                 person.add_key(person_keys[peer_key_id], commit = False)
337
338             now=time.time()
339             timers [ 'persons-36' ] += (now - subsubstart)
340             subsubstart=now
341
342         now=time.time()
343         timers [ 'persons-3' ] = now - substart
344         substart=now
345
346         timers['persons'] = time.time() - start
347
348         #
349         # XXX Synchronize foreign boot states
350         #
351
352         start = time.time()
353
354         boot_states = BootStates(self.api).dict()
355
356         #
357         # Synchronize foreign nodes
358         #
359
360
361         # Compare only the columns returned by the GetPeerData() call
362         if peer_tables['Nodes']:
363             columns = peer_tables['Nodes'][0].keys()
364         else:
365             columns = None
366
367         # Keyed on foreign node_id
368         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
369         nodes_at_peer = dict([(node['node_id'], node) \
370                               for node in peer_tables['Nodes']])
371
372         # Fix up site_id and boot_states references
373         for peer_node_id, node in nodes_at_peer.items():
374             errors = []
375             if node['site_id'] not in peer_sites:
376                 errors.append("invalid site %d" % node['site_id'])
377             if node['boot_state'] not in boot_states:
378                 errors.append("invalid boot state %s" % node['boot_state'])
379             if errors:
380                 # XXX Log an event instead of printing to logfile
381                 print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \
382                       node, ":", ", ".join(errors)
383                 del nodes_at_peer[peer_node_id]
384                 continue
385             else:
386                 node['site_id'] = peer_sites[node['site_id']]['site_id']
387
388         # Synchronize new set
389         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
390
391         for peer_node_id, node in peer_nodes.iteritems():
392             # Bind any newly cached foreign nodes to peer
393             if peer_node_id not in old_peer_nodes:
394                 peer.add_node(node, peer_node_id, commit = False)
395                 node['peer_id'] = peer_id
396                 node['peer_node_id'] = peer_node_id
397
398         timers['nodes'] = time.time() - start
399
400         #
401         # Synchronize local nodes
402         #
403
404         start = time.time()
405
406         # Keyed on local node_id
407         local_nodes = Nodes(self.api).dict()
408
409         for node in peer_tables['PeerNodes']:
410             # Foreign identifier for our node as maintained by peer
411             peer_node_id = node['node_id']
412             # Local identifier for our node as cached by peer
413             node_id = node['peer_node_id']
414             if node_id in local_nodes:
415                 # Still a valid local node, add it to the synchronized
416                 # set of local node objects keyed on foreign node_id.
417                 peer_nodes[peer_node_id] = local_nodes[node_id]
418
419         timers['local_nodes'] = time.time() - start
420
421         #
422         # XXX Synchronize foreign slice instantiation states
423         #
424
425         start = time.time()
426
427         slice_instantiations = SliceInstantiations(self.api).dict()
428
429         #
430         # Synchronize foreign slices
431         #
432
433         # Compare only the columns returned by the GetPeerData() call
434         if peer_tables['Slices']:
435             columns = peer_tables['Slices'][0].keys()
436         else:
437             columns = None
438
439         # Keyed on foreign slice_id
440         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
441         slices_at_peer = dict([(slice['slice_id'], slice) \
442                                for slice in peer_tables['Slices']])
443
444         # Fix up site_id, instantiation, and creator_person_id references
445         for peer_slice_id, slice in slices_at_peer.items():
446             errors = []
447             if slice['site_id'] not in peer_sites:
448                 errors.append("invalid site %d" % slice['site_id'])
449             if slice['instantiation'] not in slice_instantiations:
450                 errors.append("invalid instantiation %s" % slice['instantiation'])
451             if slice['creator_person_id'] not in peer_persons:
452                 # Just NULL it out
453                 slice['creator_person_id'] = None
454             else:
455                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
456             if errors:
457                 print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \
458                       slice, ":", ", ".join(errors)
459                 del slices_at_peer[peer_slice_id]
460                 continue
461             else:
462                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
463
464         # Synchronize new set
465         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
466
467         for peer_slice_id, slice in peer_slices.iteritems():
468             # Bind any newly cached foreign slices to peer
469             if peer_slice_id not in old_peer_slices:
470                 peer.add_slice(slice, peer_slice_id, commit = False)
471                 slice['peer_id'] = peer_id
472                 slice['peer_slice_id'] = peer_slice_id
473                 slice['node_ids'] = []
474                 slice['person_ids'] = []
475
476             # Slice as viewed by peer
477             peer_slice = slices_at_peer[peer_slice_id]
478
479             # Nodes that are currently part of the slice
480             old_slice_nodes = dict(filter(lambda (peer_node_id, node): \
481                                           node['node_id'] in slice['node_ids'],
482                                           peer_nodes.items()))
483
484             # Nodes that should be part of the slice
485             slice_nodes = dict(filter(lambda (peer_node_id, node): \
486                                       peer_node_id in peer_slice['node_ids'],
487                                       peer_nodes.items()))
488
489             # Remove stale nodes from slice
490             for node_id in (set(old_slice_nodes.keys()) - set(slice_nodes.keys())):
491                 slice.remove_node(old_slice_nodes[node_id], commit = False)
492
493             # Add new nodes to slice
494             for node_id in (set(slice_nodes.keys()) - set(old_slice_nodes.keys())):
495                 slice.add_node(slice_nodes[node_id], commit = False)
496
497             # N.B.: Local nodes that may have been added to the slice
498             # by hand, are removed. In other words, don't do this.
499
500             # Foreign users that are currently part of the slice
501             old_slice_persons = dict(filter(lambda (peer_person_id, person): \
502                                             person['person_id'] in slice['person_ids'],
503                                             peer_persons.items()))
504
505             # Foreign users that should be part of the slice
506             slice_persons = dict(filter(lambda (peer_person_id, person): \
507                                         peer_person_id in peer_slice['person_ids'],
508                                         peer_persons.items()))
509
510             # Remove stale users from slice
511             for peer_person_id in (set(old_slice_persons.keys()) - set(slice_persons.keys())):
512                 slice.remove_person(old_slice_persons[peer_person_id], commit = False)
513
514             # Add new users to slice
515             for peer_person_id in (set(slice_persons.keys()) - set(old_slice_persons.keys())):
516                 slice.add_person(slice_persons[peer_person_id], commit = False)
517
518             # N.B.: Local users that may have been added to the slice
519             # by hand, are not touched.
520
521         timers['slices'] = time.time() - start
522         start=time.time()
523
524         # Update peer itself and commit
525         peer.sync(commit = True)
526
527         timers['sync'] = time.time() - start
528
529         return timers