I forgot the commit this one.
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import os
7 import sys
8 import fcntl
9 import time
10
11 from PLC.Debug import log
12 from PLC.Faults import *
13 from PLC.Method import Method
14 from PLC.Parameter import Parameter, Mixed
15 from PLC.Auth import Auth
16
17 from PLC.Peers import Peer, Peers
18 from PLC.Sites import Site, Sites
19 from PLC.Persons import Person, Persons
20 from PLC.KeyTypes import KeyType, KeyTypes
21 from PLC.Keys import Key, Keys
22 from PLC.BootStates import BootState, BootStates
23 from PLC.Nodes import Node, Nodes
24 from PLC.SliceInstantiations import SliceInstantiations
25 from PLC.Slices import Slice, Slices
26
27 verbose=False
28
29 # initial version was doing only one final commit
30 # * set commit_mode to False to get that behaviour
31 # * set comit_mode to True to get everything synced at once
32 commit_mode=True
33
34 def message (to_print=None,verbose_only=False):
35     if verbose_only and not verbose:
36         return
37     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
38     if to_print:
39         print >>log, to_print
40
41 def message_verbose(to_print=None):
42     message(to_print,verbose_only=True)
43
44
45 class FileLock:
46     """
47     Lock/Unlock file
48     """
49     def __init__(self, file_path, expire = 60 * 60 * 2):
50         self.expire = expire
51         self.fpath = file_path
52         self.fd = None
53
54     def lock(self):
55         if os.path.exists(self.fpath):
56             if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
57                 try:
58                     os.unlink(self.fpath)
59                 except Exception, e:
60                     message('FileLock.lock(%s) : %s' % (self.fpath, e))
61                     return False
62         try:
63             self.fd = open(self.fpath, 'w')
64             fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
65         except IOError, e:
66             message('FileLock.lock(%s) : %s' % (self.fpath, e))
67             return False
68         return True
69
70     def unlock(self):
71         try:
72             fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
73             self.fd.close()
74         except IOError, e:
75             message('FileLock.unlock(%s) : %s' % (self.fpath, e))
76
77
78 class RefreshPeer(Method):
79     """
80     Fetches site, node, slice, person and key data from the specified peer
81     and caches it locally; also deletes stale entries.
82     Upon successful completion, returns a dict reporting various timers.
83     Faults otherwise.
84     """
85
86     roles = ['admin']
87
88     accepts = [
89         Auth(),
90         Mixed(Peer.fields['peer_id'],
91               Peer.fields['peername']),
92         ]
93
94     returns = Parameter(int, "1 if successful")
95
96     def call(self, auth, peer_id_or_peername):
97         ret_val = None
98         peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
99         file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
100         if not file_lock.lock():
101             raise Exception, "Another instance of RefreshPeer is running."
102         try:
103             ret_val = self.real_call(auth, peer_id_or_peername)
104         except Exception, e:
105             file_lock.unlock()
106             raise Exception, e
107         file_lock.unlock()
108         return ret_val
109
110         
111     def real_call(self, auth, peer_id_or_peername):
112         # Get peer
113         peers = Peers(self.api, [peer_id_or_peername])
114         if not peers:
115             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
116         peer = peers[0]
117         peer_id = peer['peer_id']
118
119         # Connect to peer API
120         peer.connect()
121
122         timers = {}
123
124         # Get peer data
125         start = time.time()
126         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
127         message('Issuing GetPeerData')
128         peer_tables = peer.GetPeerData()
129         # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
130         boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
131                             'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
132         for node in peer_tables['Nodes']:
133             for key in ['nodenetwork_ids','dummybox_id']:
134                 if key in node:
135                     del node[key]
136             if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
137         for slice in peer_tables['Slices']:
138             for key in ['slice_attribute_ids']:
139                 if key in slice:
140                     del slice[key]
141         timers['transport'] = time.time() - start - peer_tables['db_time']
142         timers['peer_db'] = peer_tables['db_time']
143         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
144
145         def sync(objects, peer_objects, classobj):
146             """
147             Synchronizes two dictionaries of objects. objects should
148             be a dictionary of local objects keyed on their foreign
149             identifiers. peer_objects should be a dictionary of
150             foreign objects keyed on their local (i.e., foreign to us)
151             identifiers. Returns a final dictionary of local objects
152             keyed on their foreign identifiers.
153             """
154
155             classname=classobj(self.api).__class__.__name__
156             message_verbose('Entering sync on %s'%classname)
157
158             synced = {}
159
160             # Delete stale objects
161             for peer_object_id, object in objects.iteritems():
162                 if peer_object_id not in peer_objects:
163                     object.delete(commit = commit_mode)
164                     message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
165
166             total = len(peer_objects)
167             count=1
168             # set this to something realistic to trace down a given object(s)
169             trace_type="Node"
170             trace_ids=[]
171             def trace (message):
172                 if classname == trace_type and peer_object_id in trace_ids:
173                     message_verbose('TRACE>>'+message)
174                 
175             # Add/update new/existing objects
176             for peer_object_id, peer_object in peer_objects.iteritems():
177                 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
178                 count += 1
179                 if peer_object_id in synced:
180                     message("Warning: %s Skipping already added %s: %r"%(
181                             peer['peername'], classname, peer_object))
182                     continue
183                 if classname == 'Node':
184                     message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
185                 elif classname == "Slice":
186                     message_verbose ('DBG>> slicename=%s'%peer_object['name'])
187                 if peer_object_id in objects:
188                     # Update existing object
189                     object = objects[peer_object_id]
190
191                     # Replace foreign identifier with existing local
192                     # identifier temporarily for the purposes of
193                     # comparison.
194                     peer_object[object.primary_key] = object[object.primary_key]
195
196                     # Must use __eq__() instead of == since
197                     # peer_object may be a raw dict instead of a Peer
198                     # object.
199                     trace ("in objects : comparing")
200                     if not object.__eq__(peer_object):
201                         # Only update intrinsic fields
202                         trace ("updating")
203                         object.update(object.db_fields(peer_object))
204                         trace ("updated")
205                         sync = True
206                         dbg = "changed"
207                     else:
208                         trace ("intact")
209                         sync = False
210                         dbg = None
211
212                     # Restore foreign identifier
213                     peer_object[object.primary_key] = peer_object_id
214                 else:
215                     trace ("not in objects -- creating")
216                     # Add new object
217                     object = classobj(self.api, peer_object)
218                     trace ("created")
219                     # Replace foreign identifier with new local identifier
220                     del object[object.primary_key]
221                     trace ("forced clean id")
222                     sync = True
223                     dbg = "added"
224
225                 if sync:
226                     message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
227                     try:
228                         object.sync(commit = commit_mode)
229                     except PLCInvalidArgument, err:
230                         # Skip if validation fails
231                         # XXX Log an event instead of printing to logfile
232                         message("Warning: %s Skipping invalid %s %r : %r"%(\
233                                 peer['peername'], classname, peer_object, err))
234                         continue
235
236                 synced[peer_object_id] = object
237
238                 if dbg:
239                     message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
240
241             message_verbose("Exiting sync on %s"%classname)
242
243             return synced
244
245         #
246         # Synchronize foreign sites
247         #
248
249         start = time.time()
250
251         message('Dealing with Sites')
252
253         # Compare only the columns returned by the GetPeerData() call
254         if peer_tables['Sites']:
255             columns = peer_tables['Sites'][0].keys()
256         else:
257             columns = None
258
259         # Keyed on foreign site_id
260         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
261         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
262
263         # Synchronize new set (still keyed on foreign site_id)
264         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
265
266         for peer_site_id, site in peer_sites.iteritems():
267             # Bind any newly cached sites to peer
268             if peer_site_id not in old_peer_sites:
269                 peer.add_site(site, peer_site_id, commit = commit_mode)
270                 site['peer_id'] = peer_id
271                 site['peer_site_id'] = peer_site_id
272
273         timers['site'] = time.time() - start
274
275         #
276         # XXX Synchronize foreign key types
277         #
278
279         message('Dealing with Keys')
280
281         key_types = KeyTypes(self.api).dict()
282
283         #
284         # Synchronize foreign keys
285         #
286
287         start = time.time()
288
289         # Compare only the columns returned by the GetPeerData() call
290         if peer_tables['Keys']:
291             columns = peer_tables['Keys'][0].keys()
292         else:
293             columns = None
294
295         # Keyed on foreign key_id
296         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
297         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
298
299         # Fix up key_type references
300         for peer_key_id, key in keys_at_peer.items():
301             if key['key_type'] not in key_types:
302                 # XXX Log an event instead of printing to logfile
303                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
304                 del keys_at_peer[peer_key_id]
305                 continue
306
307         # Synchronize new set (still keyed on foreign key_id)
308         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
309         for peer_key_id, key in peer_keys.iteritems():
310             # Bind any newly cached keys to peer
311             if peer_key_id not in old_peer_keys:
312                 peer.add_key(key, peer_key_id, commit = commit_mode)
313                 key['peer_id'] = peer_id
314                 key['peer_key_id'] = peer_key_id
315
316         timers['keys'] = time.time() - start
317
318         #
319         # Synchronize foreign users
320         #
321
322         start = time.time()
323
324         message('Dealing with Persons')
325
326         # Compare only the columns returned by the GetPeerData() call
327         if peer_tables['Persons']:
328             columns = peer_tables['Persons'][0].keys()
329         else:
330             columns = None
331
332         # Keyed on foreign person_id
333         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
334
335         # artificially attach the persons returned by GetPeerData to the new peer 
336         # this is because validate_email needs peer_id to be correct when checking for duplicates 
337         for person in peer_tables['Persons']: 
338             person['peer_id']=peer_id
339         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
340                                 for peer_person in peer_tables['Persons']])
341
342         # XXX Do we care about membership in foreign site(s)?
343
344         # Synchronize new set (still keyed on foreign person_id)
345         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
346
347         # transcoder : retrieve a local key_id from a peer_key_id
348         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
349                                   for peer_key_id,key in peer_keys.iteritems()])
350
351         for peer_person_id, person in peer_persons.iteritems():
352             # Bind any newly cached users to peer
353             if peer_person_id not in old_peer_persons:
354                 peer.add_person(person, peer_person_id, commit = commit_mode)
355                 person['peer_id'] = peer_id
356                 person['peer_person_id'] = peer_person_id
357                 person['key_ids'] = []
358
359             # User as viewed by peer
360             peer_person = persons_at_peer[peer_person_id]
361             
362             # Foreign keys currently belonging to the user
363             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
364                                   if key_transcoder[key_id] in peer_keys]
365
366             # Foreign keys that should belong to the user
367             # this is basically peer_person['key_ids'], we just check it makes sense 
368             # (e.g. we might have failed importing it)
369             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
370
371             # Remove stale keys from user
372             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
373                 person.remove_key(peer_keys[key_id], commit = commit_mode)
374                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
375
376             # Add new keys to user
377             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
378                 person.add_key(peer_keys[key_id], commit = commit_mode)
379                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
380
381         timers['persons'] = time.time() - start
382
383         #
384         # XXX Synchronize foreign boot states
385         #
386
387         boot_states = BootStates(self.api).dict()
388
389         #
390         # Synchronize foreign nodes
391         #
392
393         start = time.time()
394
395         message('Dealing with Nodes (1)')
396
397         # Compare only the columns returned by the GetPeerData() call
398         if peer_tables['Nodes']:
399             columns = peer_tables['Nodes'][0].keys()
400         else:
401             # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
402             columns = Node.fields
403             if 'interface_ids' in columns: columns.remove('interface_ids')
404             if 'dummybox_id' in columns: columns.remove('dummybox_id')
405
406         # Keyed on foreign node_id
407         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
408         nodes_at_peer = dict([(node['node_id'], node) \
409                               for node in peer_tables['Nodes']])
410
411         # Fix up site_id and boot_states references
412         for peer_node_id, node in nodes_at_peer.items():
413             errors = []
414             if node['site_id'] not in peer_sites:
415                 errors.append("invalid site %d" % node['site_id'])
416             if node['boot_state'] not in boot_states:
417                 errors.append("invalid boot state %s" % node['boot_state'])
418             if errors:
419                 # XXX Log an event instead of printing to logfile
420                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
421                              + ", ".join(errors))
422                 del nodes_at_peer[peer_node_id]
423                 continue
424             else:
425                 node['site_id'] = peer_sites[node['site_id']]['site_id']
426
427         # Synchronize new set
428         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
429
430         for peer_node_id, node in peer_nodes.iteritems():
431             # Bind any newly cached foreign nodes to peer
432             if peer_node_id not in old_peer_nodes:
433                 peer.add_node(node, peer_node_id, commit = commit_mode)
434                 node['peer_id'] = peer_id
435                 node['peer_node_id'] = peer_node_id
436
437         timers['nodes'] = time.time() - start
438
439         #
440         # Synchronize local nodes
441         #
442
443         start = time.time()
444         message('Dealing with Nodes (2)')
445
446         # Keyed on local node_id
447         local_nodes = Nodes(self.api).dict()
448
449         for node in peer_tables['PeerNodes']:
450             # Foreign identifier for our node as maintained by peer
451             peer_node_id = node['node_id']
452             # Local identifier for our node as cached by peer
453             node_id = node['peer_node_id']
454             if node_id in local_nodes:
455                 # Still a valid local node, add it to the synchronized
456                 # set of local node objects keyed on foreign node_id.
457                 peer_nodes[peer_node_id] = local_nodes[node_id]
458
459         timers['local_nodes'] = time.time() - start
460
461         #
462         # XXX Synchronize foreign slice instantiation states
463         #
464
465         slice_instantiations = SliceInstantiations(self.api).dict()
466
467         #
468         # Synchronize foreign slices
469         #
470
471         start = time.time()
472
473         message('Dealing with Slices (1)')
474
475         # Compare only the columns returned by the GetPeerData() call
476         if peer_tables['Slices']:
477             columns = peer_tables['Slices'][0].keys()
478         else:
479             columns = None
480
481         # Keyed on foreign slice_id
482         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
483         slices_at_peer = dict([(slice['slice_id'], slice) \
484                                for slice in peer_tables['Slices']])
485
486         # Fix up site_id, instantiation, and creator_person_id references
487         for peer_slice_id, slice in slices_at_peer.items():
488             errors = []
489             if slice['site_id'] not in peer_sites:
490                 errors.append("invalid site %d" % slice['site_id'])
491             if slice['instantiation'] not in slice_instantiations:
492                 errors.append("invalid instantiation %s" % slice['instantiation'])
493             if slice['creator_person_id'] not in peer_persons:
494                 # Just NULL it out
495                 slice['creator_person_id'] = None
496             else:
497                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
498             if errors:
499                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
500                             + ", ".join(errors))
501                 del slices_at_peer[peer_slice_id]
502                 continue
503             else:
504                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
505
506         # Synchronize new set
507         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
508
509         message('Dealing with Slices (2)')
510         # transcoder : retrieve a local node_id from a peer_node_id
511         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
512                                    for peer_node_id,node in peer_nodes.iteritems()])
513         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
514                                      for peer_person_id,person in peer_persons.iteritems()])
515
516         for peer_slice_id, slice in peer_slices.iteritems():
517             # Bind any newly cached foreign slices to peer
518             if peer_slice_id not in old_peer_slices:
519                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
520                 slice['peer_id'] = peer_id
521                 slice['peer_slice_id'] = peer_slice_id
522                 slice['node_ids'] = []
523                 slice['person_ids'] = []
524
525             # Slice as viewed by peer
526             peer_slice = slices_at_peer[peer_slice_id]
527
528             # Nodes that are currently part of the slice
529             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
530                                    if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
531
532             # Nodes that should be part of the slice
533             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
534
535             # Remove stale nodes from slice
536             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
537                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
538                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
539
540             # Add new nodes to slice
541             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
542                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
543                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
544
545             # N.B.: Local nodes that may have been added to the slice
546             # by hand, are removed. In other words, don't do this.
547
548             # Foreign users that are currently part of the slice
549             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
550             #                if person_transcoder[person_id] in peer_persons]
551             # An issue occurred with a user who registered on both sites (same email)
552             # So the remote person could not get cached locally
553             # The one-line map/filter style is nicer but ineffective here
554             old_slice_person_ids = []
555             for person_id in slice['person_ids']:
556                 if not person_transcoder.has_key(person_id):
557                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
558                 elif person_transcoder[person_id] not in peer_persons:
559                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
560                 else:
561                     old_slice_person_ids += [person_transcoder[person_id]]
562
563             # Foreign users that should be part of the slice
564             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
565
566             # Remove stale users from slice
567             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
568                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
569                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
570
571             # Add new users to slice
572             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
573                 slice.add_person(peer_persons[person_id], commit = commit_mode)
574                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
575
576             # N.B.: Local users that may have been added to the slice
577             # by hand, are not touched.
578
579         timers['slices'] = time.time() - start
580
581         # Update peer itself and commit
582         peer.sync(commit = True)
583         
584         return timers