compatibility mode, support for 2 ends running different api releases
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3 #
4 import os
5 import sys
6 import fcntl
7 import time
8
9 from PLC.Debug import log
10 from PLC.Faults import *
11 from PLC.Method import Method
12 from PLC.Parameter import Parameter, Mixed
13 from PLC.Auth import Auth
14
15 from PLC.Peers import Peer, Peers
16 from PLC.Sites import Site, Sites
17 from PLC.Persons import Person, Persons
18 from PLC.KeyTypes import KeyType, KeyTypes
19 from PLC.Keys import Key, Keys
20 from PLC.BootStates import BootState, BootStates
21 from PLC.Nodes import Node, Nodes
22 from PLC.SliceInstantiations import SliceInstantiations
23 from PLC.Slices import Slice, Slices
24
25 #################### settings
26 # initial version was doing only one final commit
27 # * set commit_mode to False to get that behaviour
28 # * set comit_mode to True to get everything synced at once
29 # the issue with the 'one-commit-at-the-end' approach is 
30 # that the db gets basically totally locked during too long
31 # causing various issues/crashes in the rest of the system
32 commit_mode=True
33
34 # turn this to False only if both ends have the same db schema
35 # compatibility mode is a bit slower but probably safer on the long run
36 compatibility=True
37
38 # for verbose output
39 verbose=False
40 # for debugging specific entries - display detailed info on selected objs 
41 focus_type=None # set to e.g. 'Person'
42 focus_ids=[]    # set to a list of ids (e.g. person_ids) - remote or local ids should work
43 #verbose=True
44 #focus_type='Person'
45 #focus_ids=[29103,239578,28825]
46
47 #################### helpers
48 def message (to_print=None,verbose_only=False):
49     if verbose_only and not verbose:
50         return
51     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
52     if to_print:
53         print >>log, to_print
54
55 def message_verbose(to_print=None, header='VERBOSE'):
56     message("%s> %r"%(header,to_print),verbose_only=True)
57
58
59 #################### to avoid several instances running at the same time
60 class FileLock:
61     """
62     Lock/Unlock file
63     """
64     def __init__(self, file_path, expire = 60 * 60 * 2):
65         self.expire = expire
66         self.fpath = file_path
67         self.fd = None
68
69     def lock(self):
70         if os.path.exists(self.fpath):
71             if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
72                 try:
73                     os.unlink(self.fpath)
74                 except Exception, e:
75                     message('FileLock.lock(%s) : %s' % (self.fpath, e))
76                     return False
77         try:
78             self.fd = open(self.fpath, 'w')
79             fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
80         except IOError, e:
81             message('FileLock.lock(%s) : %s' % (self.fpath, e))
82             return False
83         return True
84
85     def unlock(self):
86         try:
87             fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
88             self.fd.close()
89         except IOError, e:
90             message('FileLock.unlock(%s) : %s' % (self.fpath, e))
91
92
93 class RefreshPeer(Method):
94     """
95     Fetches site, node, slice, person and key data from the specified peer
96     and caches it locally; also deletes stale entries.
97     Upon successful completion, returns a dict reporting various timers.
98     Faults otherwise.
99     """
100
101     roles = ['admin']
102
103     accepts = [
104         Auth(),
105         Mixed(Peer.fields['peer_id'],
106               Peer.fields['peername']),
107         ]
108
109     returns = Parameter(int, "1 if successful")
110
111     def call(self, auth, peer_id_or_peername):
112         ret_val = None
113         peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
114         file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
115         if not file_lock.lock():
116             raise Exception, "Another instance of RefreshPeer is running."
117         try:
118             ret_val = self.real_call(auth, peer_id_or_peername)
119         except Exception, e:
120             file_lock.unlock()
121             message("RefreshPeer caught exception - BEG")
122             import traceback
123             traceback.print_exc()
124             message("RefreshPeer caught exception - END")
125             raise Exception, e
126         file_lock.unlock()
127         return ret_val
128
129
130     def real_call(self, auth, peer_id_or_peername):
131         # Get peer
132         peers = Peers(self.api, [peer_id_or_peername])
133         if not peers:
134             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
135         peer = peers[0]
136         peer_id = peer['peer_id']
137
138         # Connect to peer API
139         peer.connect()
140
141         timers = {}
142
143         # Get peer data
144         start = time.time()
145         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
146         message('Issuing GetPeerData')
147         peer_tables = peer.GetPeerData()
148         # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
149         boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
150                             'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
151         for node in peer_tables['Nodes']:
152             for key in ['nodenetwork_ids','dummybox_id']:
153                 if key in node:
154                     del node[key]
155             if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
156         for slice in peer_tables['Slices']:
157             for key in ['slice_attribute_ids']:
158                 if key in slice:
159                     del slice[key]
160         timers['transport'] = time.time() - start - peer_tables['db_time']
161         timers['peer_db'] = peer_tables['db_time']
162         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
163
164         def sync(objects, peer_objects, classobj, columns):
165             """
166             Synchronizes two dictionaries of objects. objects should
167             be a dictionary of local objects keyed on their foreign
168             identifiers. peer_objects should be a dictionary of
169             foreign objects keyed on their local (i.e., foreign to us)
170             identifiers. Returns a final dictionary of local objects
171             keyed on their foreign identifiers.
172             """
173
174             classname=classobj(self.api).__class__.__name__
175             primary_key=getattr(classobj,'primary_key')
176             # display all peer objects of these types while looping
177             secondary_keys={'Node':'hostname','Slice':'name','Person':'email','Site':'login_base'}
178             secondary_key=None
179             if classname in secondary_keys: secondary_key=secondary_keys[classname]
180
181             message_verbose('Entering sync on %s (%s)'%(classname,primary_key))
182
183             synced = {}
184
185             # Delete stale objects
186             for peer_object_id, object in objects.iteritems():
187                 if peer_object_id not in peer_objects:
188                     object.delete(commit = commit_mode)
189                     message("%s %s %s deleted"%(peer['peername'],classname, object[primary_key]))
190
191             total = len(peer_objects)
192             count=1
193
194             # peer_object_id, peer_object and object are dynamically bound in the loop below...
195             # (local) object might be None if creating a new one
196             def message_focus (message):
197                 if classname != focus_type: return
198                 if peer_object_id in focus_ids or \
199                         (object and primary_key in object and object[primary_key] in focus_ids):
200                     # always show remote
201                     message_verbose("peer_obj : %d [[%r]]"%(peer_object_id,peer_object),
202                                     header='FOCUS '+message)
203                     # show local object if a match was found
204                     if object: message_verbose("local_obj : <<%r>>"%(object),
205                                                header='FOCUS '+message);
206
207             # the function to compare a local object with its cadidate peer obj
208             # xxx probably faster when compatibility is False...
209             def equal_fields (object, peer_object, columns):
210                 # fast version: must use __eq__() instead of == since
211                 # peer_object may be a raw dict instead of a Peer object.
212                 if not compatibility: return object.__eq__(peer_object)
213                 else:
214                     for column in columns: 
215                         if object[column] != peer_object[column]: return False
216                     return True
217
218             # Add/update new/existing objects
219             for peer_object_id, peer_object in peer_objects.iteritems():
220                 peer_object_name=""
221                 if secondary_key: peer_object_name="(%s)"%peer_object[secondary_key]
222                 message_verbose ('%s peer_object_id=%d %s (%d/%d)'%(classname,peer_object_id,peer_object_name,count,total))
223                 count += 1
224                 if peer_object_id in synced:
225                     message("Warning: %s Skipping already added %s: %r"%(
226                             peer['peername'], classname, peer_object))
227                     continue
228
229                 if peer_object_id in objects:
230                     # Update existing object
231                     object = objects[peer_object_id]
232
233                     # Replace foreign identifier with existing local
234                     # identifier temporarily for the purposes of
235                     # comparison.
236                     peer_object[primary_key] = object[primary_key]
237
238                     if equal_fields(object,peer_object, columns):
239                         # Only update intrinsic fields
240                         object.update(object.db_fields(peer_object))
241                         message_focus ("DIFFERENCES : updated / syncing")
242                         sync = True
243                         action = "changed"
244                     else:
245                         message_focus ("UNCHANGED - left intact / not syncing")
246                         sync = False
247                         action = None
248
249                     # Restore foreign identifier
250                     peer_object[primary_key] = peer_object_id
251                 else:
252                     object=None
253                     # Add new object
254                     object = classobj(self.api, peer_object)
255                     # Replace foreign identifier with new local identifier
256                     del object[primary_key]
257                     message_focus ("NEW -- created with clean id - syncing")
258                     sync = True
259                     action = "added"
260
261                 if sync:
262                     message_verbose("syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
263                     try:
264                         object.sync(commit = commit_mode)
265                     except PLCInvalidArgument, err:
266                         # Skip if validation fails
267                         # XXX Log an event instead of printing to logfile
268                         message("Warning: %s Skipping invalid %s %r : %r"%(\
269                                 peer['peername'], classname, peer_object, err))
270                         continue
271
272                 synced[peer_object_id] = object
273
274                 if action:
275                     message("%s: %s %d %s %s"%(peer['peername'], classname, object[primary_key], peer_object_name, action))
276
277             message_verbose("Exiting sync on %s"%classname)
278
279             return synced
280
281         ### over time, we've had issues with a given column being
282         ### added on one side and not on the other
283         ### this helper function computes the intersection of two list of fields/columns
284         def intersect (l1,l2): 
285             if compatibility: return list (set(l1).intersection(set(l2))) 
286             else: return l1
287
288         #
289         # Synchronize foreign sites
290         #
291
292         start = time.time()
293
294         message('Dealing with Sites')
295
296         # Compare only the columns returned by the GetPeerData() call
297         if peer_tables['Sites']:
298             columns = peer_tables['Sites'][0].keys()
299             columns = intersect (columns, Site.fields)
300         else:
301             columns = None
302
303         # Keyed on foreign site_id
304         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
305         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
306
307         # Synchronize new set (still keyed on foreign site_id)
308         peer_sites = sync(old_peer_sites, sites_at_peer, Site, columns)
309
310         for peer_site_id, site in peer_sites.iteritems():
311             # Bind any newly cached sites to peer
312             if peer_site_id not in old_peer_sites:
313                 peer.add_site(site, peer_site_id, commit = commit_mode)
314                 site['peer_id'] = peer_id
315                 site['peer_site_id'] = peer_site_id
316
317         timers['site'] = time.time() - start
318
319         #
320         # XXX Synchronize foreign key types
321         #
322
323         message('Dealing with Keys')
324
325         key_types = KeyTypes(self.api).dict()
326
327         #
328         # Synchronize foreign keys
329         #
330
331         start = time.time()
332
333         # Compare only the columns returned by the GetPeerData() call
334         if peer_tables['Keys']:
335             columns = peer_tables['Keys'][0].keys()
336             columns = intersect (columns, Key.fields)
337         else:
338             columns = None
339
340         # Keyed on foreign key_id
341         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
342         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
343
344         # Fix up key_type references
345         for peer_key_id, key in keys_at_peer.items():
346             if key['key_type'] not in key_types:
347                 # XXX Log an event instead of printing to logfile
348                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
349                 del keys_at_peer[peer_key_id]
350                 continue
351
352         # Synchronize new set (still keyed on foreign key_id)
353         peer_keys = sync(old_peer_keys, keys_at_peer, Key, columns)
354         for peer_key_id, key in peer_keys.iteritems():
355             # Bind any newly cached keys to peer
356             if peer_key_id not in old_peer_keys:
357                 peer.add_key(key, peer_key_id, commit = commit_mode)
358                 key['peer_id'] = peer_id
359                 key['peer_key_id'] = peer_key_id
360
361         timers['keys'] = time.time() - start
362
363         #
364         # Synchronize foreign users
365         #
366
367         start = time.time()
368
369         message('Dealing with Persons')
370
371         # Compare only the columns returned by the GetPeerData() call
372         if peer_tables['Persons']:
373             columns = peer_tables['Persons'][0].keys()
374             columns = intersect (columns, Person.fields)
375         else:
376             columns = None
377
378         # Keyed on foreign person_id
379         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
380
381         # artificially attach the persons returned by GetPeerData to the new peer
382         # this is because validate_email needs peer_id to be correct when checking for duplicates
383         for person in peer_tables['Persons']:
384             person['peer_id']=peer_id
385         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
386                                 for peer_person in peer_tables['Persons']])
387
388         # XXX Do we care about membership in foreign site(s)?
389
390         # Synchronize new set (still keyed on foreign person_id)
391         peer_persons = sync(old_peer_persons, persons_at_peer, Person, columns)
392
393         # transcoder : retrieve a local key_id from a peer_key_id
394         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
395                                   for peer_key_id,key in peer_keys.iteritems()])
396
397         for peer_person_id, person in peer_persons.iteritems():
398             # Bind any newly cached users to peer
399             if peer_person_id not in old_peer_persons:
400                 peer.add_person(person, peer_person_id, commit = commit_mode)
401                 person['peer_id'] = peer_id
402                 person['peer_person_id'] = peer_person_id
403                 person['key_ids'] = []
404
405             # User as viewed by peer
406             peer_person = persons_at_peer[peer_person_id]
407
408             # Foreign keys currently belonging to the user
409             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
410                                   if key_transcoder[key_id] in peer_keys]
411
412             # Foreign keys that should belong to the user
413             # this is basically peer_person['key_ids'], we just check it makes sense
414             # (e.g. we might have failed importing it)
415             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
416
417             # Remove stale keys from user
418             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
419                 person.remove_key(peer_keys[key_id], commit = commit_mode)
420                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
421
422             # Add new keys to user
423             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
424                 message ("before add_key, passing person=%r"%person)
425                 message ("before add_key, passing key=%r"%peer_keys[key_id])
426                 person.add_key(peer_keys[key_id], commit = commit_mode)
427                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
428
429         timers['persons'] = time.time() - start
430
431         #
432         # XXX Synchronize foreign boot states
433         #
434
435         boot_states = BootStates(self.api).dict()
436
437         #
438         # Synchronize foreign nodes
439         #
440
441         start = time.time()
442
443         message('Dealing with Nodes (1)')
444
445         # Compare only the columns returned by the GetPeerData() call
446         if peer_tables['Nodes']:
447             columns = peer_tables['Nodes'][0].keys()
448             columns = intersect (columns, Node.fields)
449         else:
450             columns = Node.fields
451
452         # Keyed on foreign node_id
453         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
454         nodes_at_peer = dict([(node['node_id'], node) \
455                               for node in peer_tables['Nodes']])
456
457         # Fix up site_id and boot_states references
458         for peer_node_id, node in nodes_at_peer.items():
459             errors = []
460             if node['site_id'] not in peer_sites:
461                 errors.append("invalid site %d" % node['site_id'])
462             if node['boot_state'] not in boot_states:
463                 errors.append("invalid boot state %s" % node['boot_state'])
464             if errors:
465                 # XXX Log an event instead of printing to logfile
466                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
467                              + ", ".join(errors))
468                 del nodes_at_peer[peer_node_id]
469                 continue
470             else:
471                 node['site_id'] = peer_sites[node['site_id']]['site_id']
472
473         # Synchronize new set
474         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node, columns)
475
476         for peer_node_id, node in peer_nodes.iteritems():
477             # Bind any newly cached foreign nodes to peer
478             if peer_node_id not in old_peer_nodes:
479                 peer.add_node(node, peer_node_id, commit = commit_mode)
480                 node['peer_id'] = peer_id
481                 node['peer_node_id'] = peer_node_id
482
483         timers['nodes'] = time.time() - start
484
485         #
486         # Synchronize local nodes
487         #
488
489         start = time.time()
490         message('Dealing with Nodes (2)')
491
492         # Keyed on local node_id
493         local_nodes = Nodes(self.api).dict()
494
495         for node in peer_tables['PeerNodes']:
496             # Foreign identifier for our node as maintained by peer
497             peer_node_id = node['node_id']
498             # Local identifier for our node as cached by peer
499             node_id = node['peer_node_id']
500             if node_id in local_nodes:
501                 # Still a valid local node, add it to the synchronized
502                 # set of local node objects keyed on foreign node_id.
503                 peer_nodes[peer_node_id] = local_nodes[node_id]
504
505         timers['local_nodes'] = time.time() - start
506
507         #
508         # XXX Synchronize foreign slice instantiation states
509         #
510
511         slice_instantiations = SliceInstantiations(self.api).dict()
512
513         #
514         # Synchronize foreign slices
515         #
516
517         start = time.time()
518
519         message('Dealing with Slices (1)')
520
521         # Compare only the columns returned by the GetPeerData() call
522         if peer_tables['Slices']:
523             columns = peer_tables['Slices'][0].keys()
524             columns = intersect (columns, Slice.fields)
525         else:
526             columns = None
527
528         # Keyed on foreign slice_id
529         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
530         slices_at_peer = dict([(slice['slice_id'], slice) \
531                                for slice in peer_tables['Slices']])
532
533         # Fix up site_id, instantiation, and creator_person_id references
534         for peer_slice_id, slice in slices_at_peer.items():
535             errors = []
536             if slice['site_id'] not in peer_sites:
537                 errors.append("invalid site %d" % slice['site_id'])
538             if slice['instantiation'] not in slice_instantiations:
539                 errors.append("invalid instantiation %s" % slice['instantiation'])
540             if slice['creator_person_id'] not in peer_persons:
541                 # Just NULL it out
542                 slice['creator_person_id'] = None
543             else:
544                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
545             if errors:
546                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
547                             + ", ".join(errors))
548                 del slices_at_peer[peer_slice_id]
549                 continue
550             else:
551                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
552
553         # Synchronize new set
554         peer_slices = sync(old_peer_slices, slices_at_peer, Slice, columns)
555
556         message('Dealing with Slices (2)')
557         # transcoder : retrieve a local node_id from a peer_node_id
558         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
559                                    for peer_node_id,node in peer_nodes.iteritems()])
560         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
561                                      for peer_person_id,person in peer_persons.iteritems()])
562
563         for peer_slice_id, slice in peer_slices.iteritems():
564             # Bind any newly cached foreign slices to peer
565             if peer_slice_id not in old_peer_slices:
566                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
567                 slice['peer_id'] = peer_id
568                 slice['peer_slice_id'] = peer_slice_id
569                 slice['node_ids'] = []
570                 slice['person_ids'] = []
571
572             # Slice as viewed by peer
573             peer_slice = slices_at_peer[peer_slice_id]
574
575             # Nodes that are currently part of the slice
576             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
577                                    if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
578
579             # Nodes that should be part of the slice
580             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
581
582             # Remove stale nodes from slice
583             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
584                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
585                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
586
587             # Add new nodes to slice
588             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
589                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
590                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
591
592             # N.B.: Local nodes that may have been added to the slice
593             # by hand, are removed. In other words, don't do this.
594
595             # Foreign users that are currently part of the slice
596             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
597             #                if person_transcoder[person_id] in peer_persons]
598             # An issue occurred with a user who registered on both sites (same email)
599             # So the remote person could not get cached locally
600             # The one-line map/filter style is nicer but ineffective here
601             old_slice_person_ids = []
602             for person_id in slice['person_ids']:
603                 if not person_transcoder.has_key(person_id):
604                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
605                 elif person_transcoder[person_id] not in peer_persons:
606                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
607                 else:
608                     old_slice_person_ids += [person_transcoder[person_id]]
609
610             # Foreign users that should be part of the slice
611             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
612
613             # Remove stale users from slice
614             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
615                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
616                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
617
618             # Add new users to slice
619             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
620                 slice.add_person(peer_persons[person_id], commit = commit_mode)
621                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
622
623             # N.B.: Local users that may have been added to the slice
624             # by hand, are not touched.
625
626         timers['slices'] = time.time() - start
627
628         # Update peer itself and commit
629         peer.sync(commit = True)
630
631         return timers