# Thierry Parmentelat - INRIA

import os
import time
import fcntl
import traceback

from PLC.Debug import log
from PLC.Faults import *
from PLC.Method import Method
from PLC.Parameter import Parameter, Mixed
from PLC.Auth import Auth

from PLC.Peers import Peer, Peers
from PLC.Sites import Site, Sites
from PLC.Persons import Person, Persons
from PLC.KeyTypes import KeyType, KeyTypes
from PLC.Keys import Key, Keys
from PLC.BootStates import BootState, BootStates
from PLC.Nodes import Node, Nodes
from PLC.SliceInstantiations import SliceInstantiations
from PLC.Slices import Slice, Slices

#################### settings
# initial version was doing only one final commit
# * set commit_mode to False to get that behaviour
# * set commit_mode to True to get everything synced at once
# the issue with the 'one-commit-at-the-end' approach is
# that the db gets basically totally locked for too long,
# causing various issues/crashes in the rest of the system
commit_mode=True       # assumed default - flip to False to revert to a single final commit

# turn this to False only if both ends have the same db schema
# compatibility mode is a bit slower but probably safer in the long run
compatibility=True     # assumed default

# set to True to get more detailed tracing in the log
verbose=False

# for debugging specific entries - display detailed info on selected objs
focus_type=None   # set to e.g. 'Person'
focus_ids=[]      # set to a list of ids (e.g. person_ids) - remote or local ids should work
#focus_ids=[29103,239578,28825]

#################### helpers
def message (to_print=None,verbose_only=False):
    if verbose_only and not verbose:
        return
    print >> log, time.strftime("%m-%d-%H-%M-%S:"),
    if to_print:
        print >> log, to_print

def message_verbose(to_print=None, header='VERBOSE'):
    message("%s> %r"%(header,to_print),verbose_only=True)

#################### to avoid several instances running at the same time
class FileLock:
    def __init__(self, file_path, expire = 60 * 60 * 2):
        self.expire = expire
        self.fpath = file_path
        self.fd = None

    def lock(self):
        # discard a stale lock file left over by a previous run
        if os.path.exists(self.fpath):
            if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
                try: os.unlink(self.fpath)
                except Exception, e:
                    message('FileLock.lock(%s) : %s' % (self.fpath, e))
                    return False
        try:
            self.fd = open(self.fpath, 'w')
            fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError, e:
            message('FileLock.lock(%s) : %s' % (self.fpath, e))
            return False
        return True

    def unlock(self):
        try:
            fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
            self.fd.close()
            os.unlink(self.fpath)
        except Exception, e:
            message('FileLock.unlock(%s) : %s' % (self.fpath, e))
        return True
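
# a minimal usage sketch (the lock file name here is just an example):
#   lock = FileLock("/tmp/refresh-peer-example.lock")
#   if lock.lock():
#       try:     ... do the refresh ...
#       finally: lock.unlock()
# call() below follows this pattern, with one lock file per peer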

class RefreshPeer(Method):
    """
    Fetches site, node, slice, person and key data from the specified peer
    and caches it locally; also deletes stale entries.
    Upon successful completion, returns a dict reporting various timers.
    """

    accepts = [
        Auth(),
        Mixed(Peer.fields['peer_id'],
              Peer.fields['peername']),
        ]

    returns = Parameter(int, "1 if successful")

    def call(self, auth, peer_id_or_peername):
        ret_val = None
        peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
        file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
        if not file_lock.lock():
            raise Exception, "Another instance of RefreshPeer is running."
        try:
            ret_val = self.real_call(auth, peer_id_or_peername)
        except:
            file_lock.unlock()
            message("RefreshPeer caught exception - BEG")
            traceback.print_exc()
            message("RefreshPeer caught exception - END")
            raise

        file_lock.unlock()
        return ret_val

    def real_call(self, auth, peer_id_or_peername):
        # Get peer
        peers = Peers(self.api, [peer_id_or_peername])
        if not peers:
            raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
        peer = peers[0]
        peer_id = peer['peer_id']

        # Connect to peer API
        peer.connect()

        timers = {}

        # Get peer data
        start = time.time()
        message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
        message('Issuing GetPeerData')
        peer_tables = peer.GetPeerData()
        # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
        boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
                            'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
        for node in peer_tables['Nodes']:
            for key in ['nodenetwork_ids','dummybox_id']:
                if key in node: del node[key]
            if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
        for slice in peer_tables['Slices']:
            for key in ['slice_attribute_ids']:
                if key in slice: del slice[key]
        timers['transport'] = time.time() - start - peer_tables['db_time']
        timers['peer_db'] = peer_tables['db_time']
        message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
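
        # peer_tables is a dict of lists as returned by the peer's GetPeerData();
        # the entries used below are 'Sites', 'Keys', 'Persons', 'Nodes', 'PeerNodes'
        # and 'Slices', plus 'db_time' for the timing bookkeeping above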

        def sync(objects, peer_objects, classobj, columns):
            """
            Synchronizes two dictionaries of objects. objects should
            be a dictionary of local objects keyed on their foreign
            identifiers. peer_objects should be a dictionary of
            foreign objects keyed on their local (i.e., foreign to us)
            identifiers. Returns a final dictionary of local objects
            keyed on their foreign identifiers.
            """
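            # illustration with hypothetical ids: for sites this gets called as
            #   sync({4: <locally cached Site>}, {4: {...}, 7: {...}}, Site, columns)
            # where 4 and 7 are the peer's own site_ids, and it returns local Site
            # objects keyed on those same peer ids (7 denoting a newly created one)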

            classname=classobj(self.api).__class__.__name__
            primary_key=getattr(classobj,'primary_key')
            # display all peer objects of these types while looping
            secondary_keys={'Node':'hostname','Slice':'name','Person':'email','Site':'login_base'}
            secondary_key=None
            if classname in secondary_keys: secondary_key=secondary_keys[classname]

            message_verbose('Entering sync on %s (%s)'%(classname,primary_key))

            synced = {}

            # Delete stale objects
            for peer_object_id, object in objects.iteritems():
                if peer_object_id not in peer_objects:
                    object.delete(commit = commit_mode)
                    message("%s %s %s deleted"%(peer['peername'],classname, object[primary_key]))

            total = len(peer_objects)
            count = 1

            # peer_object_id, peer_object and object are dynamically bound in the loop below...
            # (local) object might be None if creating a new one
            def message_focus (message):
                if classname != focus_type: return
                if peer_object_id in focus_ids or \
                   (object and primary_key in object and object[primary_key] in focus_ids):
                    # always show the remote object
                    message_verbose("peer_obj : %d [[%r]]"%(peer_object_id,peer_object),
                                    header='FOCUS '+message)
                    # show local object if a match was found
                    if object: message_verbose("local_obj : <<%r>>"%(object),
                                               header='FOCUS '+message)

            # the function to compare a local object with its candidate peer obj
            # xxx probably faster when compatibility is False...
            def equal_fields (object, peer_object, columns):
                # fast version: must use __eq__() instead of == since
                # peer_object may be a raw dict instead of a Peer object.
                if not compatibility: return object.__eq__(peer_object)
                for column in columns:
                    if object[column] != peer_object[column]: return False
                return True

            # Add/update new/existing objects
            for peer_object_id, peer_object in peer_objects.iteritems():
                peer_object_name=""
                if secondary_key: peer_object_name="(%s)"%peer_object[secondary_key]
                message_verbose ('%s peer_object_id=%d %s (%d/%d)'%(classname,peer_object_id,peer_object_name,count,total))
                count += 1
                if peer_object_id in synced:
                    message("Warning: %s Skipping already added %s: %r"%(
                        peer['peername'], classname, peer_object))
                    continue

                if peer_object_id in objects:
                    # Update existing object
                    object = objects[peer_object_id]

                    # Replace foreign identifier with existing local
                    # identifier temporarily for the purposes of
                    # comparison
                    peer_object[primary_key] = object[primary_key]

                    if not equal_fields(object, peer_object, columns):
                        # Only update intrinsic fields
                        object.update(object.db_fields(peer_object))
                        message_focus ("DIFFERENCES : updated / syncing")
                        needs_sync = True
                        action = "changed"
                    else:
                        message_focus ("UNCHANGED - left intact / not syncing")
                        needs_sync = False
                        action = None

                    # Restore foreign identifier
                    peer_object[primary_key] = peer_object_id
                else:
                    # Add new object
                    object = classobj(self.api, peer_object)
                    # Replace foreign identifier with new local identifier
                    del object[primary_key]
                    message_focus ("NEW -- created with clean id - syncing")
                    needs_sync = True
                    action = "added"

                if needs_sync:
                    message_verbose("syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
                    try:
                        object.sync(commit = commit_mode)
                    except PLCInvalidArgument, err:
                        # Skip if validation fails
                        # XXX Log an event instead of printing to logfile
                        message("Warning: %s Skipping invalid %s %r : %r"%(
                            peer['peername'], classname, peer_object, err))
                        continue

                synced[peer_object_id] = object

                if action:
                    message("%s: %s %d %s %s"%(peer['peername'], classname, object[primary_key], peer_object_name, action))

            message_verbose("Exiting sync on %s"%classname)

            return synced

        ### over time, we've had issues with a given column being
        ### added on one side and not on the other
        ### this helper function computes the intersection of two lists of fields/columns
        def intersect (l1,l2):
            if compatibility: return list (set(l1).intersection(set(l2)))
            else: return l1
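
        # each per-table section below follows the same pattern:
        #   - restrict the comparison to the columns actually returned by GetPeerData()
        #   - load the locally cached copies, keyed on the peer's own ids
        #   - run sync() and bind any newly created objects to the peer
        #   - record the time spent in the 'timers' dict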

        # Synchronize foreign sites

        start = time.time()

        message('Dealing with Sites')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Sites']:
            columns = peer_tables['Sites'][0].keys()
            columns = intersect (columns, Site.fields)
        else:
            columns = Site.fields

        # Keyed on foreign site_id
        old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
        sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])

        # Synchronize new set (still keyed on foreign site_id)
        peer_sites = sync(old_peer_sites, sites_at_peer, Site, columns)

        for peer_site_id, site in peer_sites.iteritems():
            # Bind any newly cached sites to peer
            if peer_site_id not in old_peer_sites:
                peer.add_site(site, peer_site_id, commit = commit_mode)
                site['peer_id'] = peer_id
                site['peer_site_id'] = peer_site_id

        timers['site'] = time.time() - start

        # XXX Synchronize foreign key types
        message('Dealing with Keys')

        key_types = KeyTypes(self.api).dict()

        # Synchronize foreign keys

        start = time.time()

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Keys']:
            columns = peer_tables['Keys'][0].keys()
            columns = intersect (columns, Key.fields)
        else:
            columns = Key.fields

        # Keyed on foreign key_id
        old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
        keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])

        # Fix up key_type references
        for peer_key_id, key in keys_at_peer.items():
            if key['key_type'] not in key_types:
                # XXX Log an event instead of printing to logfile
                message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
                del keys_at_peer[peer_key_id]
                continue

        # Synchronize new set (still keyed on foreign key_id)
        peer_keys = sync(old_peer_keys, keys_at_peer, Key, columns)
        for peer_key_id, key in peer_keys.iteritems():
            # Bind any newly cached keys to peer
            if peer_key_id not in old_peer_keys:
                peer.add_key(key, peer_key_id, commit = commit_mode)
                key['peer_id'] = peer_id
                key['peer_key_id'] = peer_key_id

        timers['keys'] = time.time() - start

        # Synchronize foreign users

        start = time.time()

        message('Dealing with Persons')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Persons']:
            columns = peer_tables['Persons'][0].keys()
            columns = intersect (columns, Person.fields)
        else:
            columns = Person.fields

        # Keyed on foreign person_id
        old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')

        # artificially attach the persons returned by GetPeerData to the new peer
        # this is because validate_email needs peer_id to be correct when checking for duplicates
        for person in peer_tables['Persons']:
            person['peer_id']=peer_id
        persons_at_peer = dict([(peer_person['person_id'], peer_person) \
                                for peer_person in peer_tables['Persons']])

        # XXX Do we care about membership in foreign site(s)?

        # Synchronize new set (still keyed on foreign person_id)
        peer_persons = sync(old_peer_persons, persons_at_peer, Person, columns)

        # transcoder : retrieve a peer_key_id from a local key_id
        key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
                                  for peer_key_id,key in peer_keys.iteritems()])
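        # for illustration, with hypothetical ids: key_transcoder[12] == 345 means that
        # local key 12 is our cached copy of the peer's key 345; only keys that were
        # successfully cached (i.e. present in peer_keys) get transcoded below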

        for peer_person_id, person in peer_persons.iteritems():
            # Bind any newly cached users to peer
            if peer_person_id not in old_peer_persons:
                peer.add_person(person, peer_person_id, commit = commit_mode)
                person['peer_id'] = peer_id
                person['peer_person_id'] = peer_person_id
                person['key_ids'] = []

            # User as viewed by peer
            peer_person = persons_at_peer[peer_person_id]

            # Foreign keys currently belonging to the user
            old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
                                  if key_transcoder[key_id] in peer_keys]

            # Foreign keys that should belong to the user
            # this is basically peer_person['key_ids'], we just check that it makes sense
            # (e.g. we might have failed to import it)
            person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]

            # Remove stale keys from user
            for key_id in (set(old_person_key_ids) - set(person_key_ids)):
                person.remove_key(peer_keys[key_id], commit = commit_mode)
                message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))

            # Add new keys to user
            for key_id in (set(person_key_ids) - set(old_person_key_ids)):
                message ("before add_key, passing person=%r"%person)
                message ("before add_key, passing key=%r"%peer_keys[key_id])
                person.add_key(peer_keys[key_id], commit = commit_mode)
                message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))

        timers['persons'] = time.time() - start

        # XXX Synchronize foreign boot states
        boot_states = BootStates(self.api).dict()

        # Synchronize foreign nodes

        start = time.time()

        message('Dealing with Nodes (1)')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Nodes']:
            columns = peer_tables['Nodes'][0].keys()
            columns = intersect (columns, Node.fields)
        else:
            columns = Node.fields

        # Keyed on foreign node_id
        old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
        nodes_at_peer = dict([(node['node_id'], node) \
                              for node in peer_tables['Nodes']])

        # Fix up site_id and boot_states references
        for peer_node_id, node in nodes_at_peer.items():
            errors = []
            if node['site_id'] not in peer_sites:
                errors.append("invalid site %d" % node['site_id'])
            if node['boot_state'] not in boot_states:
                errors.append("invalid boot state %s" % node['boot_state'])
            if errors:
                # XXX Log an event instead of printing to logfile
                message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node) \
                         + ", ".join(errors))
                del nodes_at_peer[peer_node_id]
                continue
            else:
                node['site_id'] = peer_sites[node['site_id']]['site_id']

        # Synchronize new set
        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node, columns)

        for peer_node_id, node in peer_nodes.iteritems():
            # Bind any newly cached foreign nodes to peer
            if peer_node_id not in old_peer_nodes:
                peer.add_node(node, peer_node_id, commit = commit_mode)
                node['peer_id'] = peer_id
                node['peer_node_id'] = peer_node_id

        timers['nodes'] = time.time() - start

        # Synchronize local nodes

        start = time.time()

        message('Dealing with Nodes (2)')

        # Keyed on local node_id
        local_nodes = Nodes(self.api).dict()

        for node in peer_tables['PeerNodes']:
            # Foreign identifier for our node as maintained by peer
            peer_node_id = node['node_id']
            # Local identifier for our node as cached by peer
            node_id = node['peer_node_id']
            if node_id in local_nodes:
                # Still a valid local node, add it to the synchronized
                # set of local node objects keyed on foreign node_id.
                peer_nodes[peer_node_id] = local_nodes[node_id]

        timers['local_nodes'] = time.time() - start
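
        # note: our own nodes, as cached by the peer, are entered into peer_nodes above,
        # presumably so that foreign slices spanning nodes on this side can still have
        # their node_ids transcoded in the Slices sections below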

        # XXX Synchronize foreign slice instantiation states
        slice_instantiations = SliceInstantiations(self.api).dict()

        # Synchronize foreign slices

        start = time.time()

        message('Dealing with Slices (1)')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Slices']:
            columns = peer_tables['Slices'][0].keys()
            columns = intersect (columns, Slice.fields)
        else:
            columns = Slice.fields

        # Keyed on foreign slice_id
        old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
        slices_at_peer = dict([(slice['slice_id'], slice) \
                               for slice in peer_tables['Slices']])

        # Fix up site_id, instantiation, and creator_person_id references
        for peer_slice_id, slice in slices_at_peer.items():
            errors = []
            if slice['site_id'] not in peer_sites:
                errors.append("invalid site %d" % slice['site_id'])
            if slice['instantiation'] not in slice_instantiations:
                errors.append("invalid instantiation %s" % slice['instantiation'])
            if slice['creator_person_id'] not in peer_persons:
                # unknown creator: just NULL it out
                slice['creator_person_id'] = None
            else:
                slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
            if errors:
                message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
                        + ", ".join(errors))
                del slices_at_peer[peer_slice_id]
                continue
            else:
                slice['site_id'] = peer_sites[slice['site_id']]['site_id']

        # Synchronize new set
        peer_slices = sync(old_peer_slices, slices_at_peer, Slice, columns)

        message('Dealing with Slices (2)')
        # transcoders : retrieve a peer_node_id from a local node_id (and likewise for persons)
        node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
                                   for peer_node_id,node in peer_nodes.iteritems()])
        person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
                                     for peer_person_id,person in peer_persons.iteritems()])

        for peer_slice_id, slice in peer_slices.iteritems():
            # Bind any newly cached foreign slices to peer
            if peer_slice_id not in old_peer_slices:
                peer.add_slice(slice, peer_slice_id, commit = commit_mode)
                slice['peer_id'] = peer_id
                slice['peer_slice_id'] = peer_slice_id
                slice['node_ids'] = []
                slice['person_ids'] = []

            # Slice as viewed by peer
            peer_slice = slices_at_peer[peer_slice_id]

            # Nodes that are currently part of the slice
            old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
                                   if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]

            # Nodes that should be part of the slice
            slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]

            # Remove stale nodes from slice
            for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
                slice.remove_node(peer_nodes[node_id], commit = commit_mode)
                message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))

            # Add new nodes to slice
            for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
                slice.add_node(peer_nodes[node_id], commit = commit_mode)
                message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))

            # N.B.: Local nodes that may have been added to the slice
            # by hand are removed here. In other words, don't do this.

            # Foreign users that are currently part of the slice
            #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
            #                         if person_transcoder[person_id] in peer_persons]
            # An issue occurred with a user who had registered on both sides (same email),
            # so the remote person could not get cached locally.
            # The one-line map/filter style is nicer but cannot cope with such missing
            # entries, hence the explicit loop below.
            old_slice_person_ids = []
            for person_id in slice['person_ids']:
                if not person_transcoder.has_key(person_id):
                    message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
                elif person_transcoder[person_id] not in peer_persons:
                    message ('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
                else:
                    old_slice_person_ids += [person_transcoder[person_id]]

            # Foreign users that should be part of the slice
            slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]

            # Remove stale users from slice
            for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
                slice.remove_person(peer_persons[person_id], commit = commit_mode)
                message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))

            # Add new users to slice
            for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
                slice.add_person(peer_persons[person_id], commit = commit_mode)
                message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))

            # N.B.: Local users that may have been added to the slice
            # by hand are not touched.

        timers['slices'] = time.time() - start

        # Update peer itself and commit
        peer.sync(commit = True)

        return timers