2 # Thierry Parmentelat - INRIA
9 from PLC.Debug import log
10 from PLC.Faults import *
11 from PLC.Method import Method
12 from PLC.Parameter import Parameter, Mixed
13 from PLC.Auth import Auth
15 from PLC.Peers import Peer, Peers
16 from PLC.Sites import Site, Sites
17 from PLC.Persons import Person, Persons
18 from PLC.KeyTypes import KeyType, KeyTypes
19 from PLC.Keys import Key, Keys
20 from PLC.BootStates import BootState, BootStates
21 from PLC.Nodes import Node, Nodes
22 from PLC.SliceInstantiations import SliceInstantiations
23 from PLC.Slices import Slice, Slices
25 #################### settings
26 # initial version was doing only one final commit
27 # * set commit_mode to False to get that behaviour
28 # * set commit_mode to True to get everything synced at once
29 # the issue with the 'one-commit-at-the-end' approach is
30 # that the db gets basically totally locked during too long
31 # causing various issues/crashes in the rest of the system
34 # turn this to False only if both ends have the same db schema
35 # compatibility mode is a bit slower but probably safer on the long run
38 #################### debugging
41 # set to a filename for using cached data when debugging
42 # WARNING: does not actually connect to the peer in this case
44 # for debugging specific entries - display detailed info on selected objs
# Debug focus settings: when focus_type names a class (e.g. 'Person'),
# the sync() helper below emits detailed FOCUS traces for the ids listed
# in focus_ids; both remain inert at their defaults.
45 focus_type=None # set to e.g. 'Person'
46 focus_ids=[] # set to a list of ids (e.g. person_ids) - remote or local ids should work
# Uncomment to cache GetPeerData results on disk for offline debugging.
48 #use_cache="/var/log/peers/getpeerdata.pickle"
53 #################### helpers
# NOTE(review): this chunk is a sampled/mangled paste -- the leading numbers
# are the original file's line numbers, and several lines of this function
# (e.g. 56, 58-60) are missing from the excerpt.
# message(): timestamped logger; verbose_only messages are suppressed unless
# a module-level 'verbose' flag (not visible here) is set.
54 def message (to_print=None,verbose_only=False):
# Presumably returns early here when verbose_only and not verbose -- the
# return statement itself (line 56) is not visible; confirm in full source.
55 if verbose_only and not verbose:
# Python 2 print-to-file: prefix with a timestamp, trailing comma keeps
# the cursor on the same line for the payload (printed on a missing line).
57 print >> log, time.strftime("%m-%d-%H-%M-%S:"),
def message_verbose(to_print=None, header='VERBOSE'):
    """Log *to_print* through message(), tagged with *header*, in verbose mode only."""
    tagged = "%s> %r"%(header,to_print)
    message(tagged, verbose_only=True)
65 #################### to avoid several instances running at the same time
# FileLock fragment: the 'class FileLock:' header and the def lines for its
# lock()/unlock() methods fall outside this excerpt (note the gaps in the
# embedded original line numbers). Comments below are hedged accordingly.
# __init__: remember the lock-file path; 'expire' defaults to 2 hours
# (self.expire is read in lock() but its assignment line is missing here).
70 def __init__(self, file_path, expire = 60 * 60 * 2):
72 self.fpath = file_path
# lock() body (header missing): a lock file older than 'expire' seconds is
# treated as stale -- presumably removed on a missing line; TODO confirm.
76 if os.path.exists(self.fpath):
77 if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
# 'e' is bound by an except clause that is not visible in this excerpt.
81 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# Take an exclusive, non-blocking flock on the freshly opened lock file;
# failure is logged via message() in the (partly missing) except branch.
84 self.fd = open(self.fpath, 'w')
85 fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
87 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# unlock() body (header missing): release the flock; errors are only logged.
93 fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
96 message('FileLock.unlock(%s) : %s' % (self.fpath, e))
# RefreshPeer: PLCAPI Method that pulls a federated peer's data (sites,
# keys, persons, nodes, slices) and mirrors it into the local database.
99 class RefreshPeer(Method):
# Docstring fragment -- the opening/closing triple quotes (original lines
# 100/104) are missing from this excerpt.
101 Fetches site, node, slice, person and key data from the specified peer
102 and caches it locally; also deletes stale entries.
103 Upon successful completion, returns a dict reporting various timers.
# 'accepts' fragment: auth plus either a peer_id or a peername (Mixed);
# the surrounding 'accepts = [' lines are not visible here.
111 Mixed(Peer.fields['peer_id'],
112 Peer.fields['peername']),
115 returns = Parameter(int, "1 if successful")
# Per-table field lists excluded from sync()'s equality comparison:
# peer-managed ids, timestamps, and cross-reference id lists, which are
# reconciled separately in real_call rather than copied verbatim.
117 ignore_site_fields=['peer_id', 'peer_site_id','last_updated', 'date_created',
118 'address_ids', 'node_ids', 'person_ids', 'pcu_ids', 'slice_ids' ]
119 ignore_key_fields=['peer_id','peer_key_id', 'person_id']
120 ignore_person_fields=['peer_id','peer_person_id','last_updated','date_created',
121 'roles','role_ids','key_ids','site_ids','slice_ids','person_tag_ids']
122 ignore_node_fields=['peer_id','peer_node_id','last_updated','last_contact','date_created',
123 'node_tag_ids', 'interface_ids', 'slice_ids', 'nodegroup_ids','pcu_ids','ports']
124 ignore_slice_fields=['peer_id','peer_slice_id','created',
125 'person_ids','slice_tag_ids','node_ids',]
# call(): public entry point. Serializes concurrent RefreshPeer runs with a
# per-peer lock file under /tmp, then delegates to real_call(). The
# try/finally scaffolding (and presumably the unlock) sits on lines missing
# from this excerpt -- confirm in full source.
127 def call(self, auth, peer_id_or_peername):
# Resolve the peername first so the lock file is named per peer.
129 peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
130 file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
131 if not file_lock.lock():
132 raise Exception, "Another instance of RefreshPeer is running."
134 ret_val = self.real_call(auth, peer_id_or_peername)
# Failure path: frame the traceback in the log (the except header and any
# re-raise are not visible in this excerpt).
137 message("RefreshPeer caught exception - BEG")
139 traceback.print_exc()
140 message("RefreshPeer caught exception - END")
# real_call(): the actual refresh. Fetches all tables from the peer in one
# GetPeerData() call, then reconciles, in dependency order: Sites, Keys,
# Persons (+ their keys), Nodes (foreign then local), Slices (+ their nodes
# and persons), finally re-syncing the peer row itself. Per-phase wall-clock
# timings are accumulated in 'timers' (returned on a line outside this
# excerpt). NOTE(review): this excerpt is a sampled/mangled paste -- leading
# numbers are original line numbers and many lines (try/except headers,
# else branches, assignments like 'peer = peers[0]') are missing.
146 def real_call(self, auth, peer_id_or_peername):
# Look the peer up; empty result raises below ('peer = peers[0]' and the
# emptiness test are on missing lines).
148 peers = Peers(self.api, [peer_id_or_peername])
150 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
152 peer_id = peer['peer_id']
154 # Connect to peer API
161 message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
163 message('Issuing GetPeerData')
164 peer_tables = peer.GetPeerData()
# Debug path: when use_cache is set, load/save the peer dump via pickle
# instead of (or in addition to) hitting the network.
167 if os.path.isfile(use_cache):
168 message("use_cache: WARNING: using cached getpeerdata")
169 peer_tables=pickle.load(file(use_cache,'rb'))
171 message("use_cache: issuing getpeerdata")
172 peer_tables = peer.GetPeerData()
# NOTE(review): likely bug -- message() takes (to_print, verbose_only), so
# use_cache here lands in verbose_only and the '%s' is never substituted;
# presumably meant "use_cache: saving in cache %s"%use_cache. Confirm.
173 message("use_cache: saving in cache %s",use_cache)
174 pickle.dump(peer_tables,file(use_cache,'wb'))
176 # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
177 boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
178 'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
# Strip 4.2-era fields and map legacy boot states onto current ones.
179 for node in peer_tables['Nodes']:
180 for key in ['nodenetwork_ids','dummybox_id']:
183 if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
184 for slice in peer_tables['Slices']:
185 for key in ['slice_attribute_ids']:
# Transport time = total elapsed minus time the peer spent in its own db.
188 timers['transport'] = time.time() - start - peer_tables['db_time']
189 timers['peer_db'] = peer_tables['db_time']
190 message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
# sync(): nested helper, closes over self/peer/commit_mode and the
# focus_* debug globals; reused for every table below.
192 def sync(objects, peer_objects, classobj, columns):
# Docstring fragment (triple quotes on missing lines):
194 Synchronizes two dictionaries of objects. objects should
195 be a dictionary of local objects keyed on their foreign
196 identifiers. peer_objects should be a dictionary of
197 foreign objects keyed on their local (i.e., foreign to us)
198 identifiers. Returns a final dictionary of local objects
199 keyed on their foreign identifiers.
202 classname=classobj(self.api).__class__.__name__
203 primary_key=getattr(classobj,'primary_key')
204 # display all peer objects of these types while looping
205 secondary_keys={'Node':'hostname','Slice':'name','Person':'email','Site':'login_base'}
207 if classname in secondary_keys: secondary_key=secondary_keys[classname]
209 message_verbose('Entering sync on %s (%s)'%(classname,primary_key))
213 # Delete stale objects
# Local cached objects no longer present at the peer are deleted.
214 for peer_object_id, object in objects.iteritems():
215 if peer_object_id not in peer_objects:
216 object.delete(commit = commit_mode)
217 message("%s %s %s deleted"%(peer['peername'],classname, object[primary_key]))
219 total = len(peer_objects)
# in_focus(): debug predicate (its 'def' line is on a missing line);
# true when the current object matches focus_type/focus_ids.
222 # peer_object_id, peer_object and object are dynamically bound in the loop below...
223 # (local) object might be None if creating a new one
225 if classname != focus_type: return False
226 return peer_object_id in focus_ids or \
227 (object and primary_key in object and object[primary_key] in focus_ids)
# message_focus(): dump peer and local object details when in focus.
229 def message_focus (message):
232 message_verbose("peer_obj : %d [[%r]]"%(peer_object_id,peer_object),
233 header='FOCUS '+message)
234 # show local object if a match was found
235 if object: message_verbose("local_obj : <<%r>>"%(object),
236 header='FOCUS '+message);
239 # the function to compare a local object with its candidate peer obj
240 # xxx probably faster when compatibility is False...
241 def equal_fields (object, peer_object, columns):
242 # fast version: must use __eq__() instead of == since
243 # peer_object may be a raw dict instead of a Peer object.
244 if not compatibility: return object.__eq__(peer_object)
# Compatibility mode: compare column by column (schemas may differ).
246 for column in columns:
247 # if in_focus(): message ('FOCUS comparing column %s'%column)
248 if object[column] != peer_object[column]: return False
# Second per-column pass -- presumably a focus/debug variant; the
# surrounding lines (249-252, 255, 257-259) are missing here.
253 for column in columns:
254 test= object[column] == peer_object[column]
256 if not test: result=False
260 # Add/update new/existing objects
261 for peer_object_id, peer_object in peer_objects.iteritems():
263 if secondary_key: peer_object_name="(%s)"%peer_object[secondary_key]
264 message_verbose ('%s peer_object_id=%d %s (%d/%d)'%(classname,peer_object_id,peer_object_name,count,total))
# Guard against duplicates within a single peer dump.
266 if peer_object_id in synced:
267 message("Warning: %s Skipping already added %s: %r"%(
268 peer['peername'], classname, peer_object))
271 if peer_object_id in objects:
272 # Update existing object
273 object = objects[peer_object_id]
275 # Replace foreign identifier with existing local
276 # identifier temporarily for the purposes of
278 peer_object[primary_key] = object[primary_key]
# Only touch the local row when an intrinsic field changed.
280 if not equal_fields(object,peer_object, columns):
281 # Only update intrinsic fields
282 object.update(object.db_fields(peer_object))
283 message_focus ("DIFFERENCES : updated / syncing")
287 message_focus ("UNCHANGED - left intact / not syncing")
291 # Restore foreign identifier
292 peer_object[primary_key] = peer_object_id
# Creation branch (its 'else' header is on a missing line): build a
# fresh local object and let the db assign a new primary key.
296 object = classobj(self.api, peer_object)
297 # Replace foreign identifier with new local identifier
298 del object[primary_key]
299 message_focus ("NEW -- created with clean id - syncing")
304 message_verbose("syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
306 object.sync(commit = commit_mode)
307 except PLCInvalidArgument, err:
308 # Skip if validation fails
309 # XXX Log an event instead of printing to logfile
310 message("Warning: %s Skipping invalid %s %r : %r"%(\
311 peer['peername'], classname, peer_object, err))
# Record under the foreign id; this dict is sync()'s return value.
314 synced[peer_object_id] = object
317 message("%s: (%d/%d) %s %d %s %s"%(peer['peername'], count,total, classname,
318 object[primary_key], peer_object_name, action))
320 message_verbose("Exiting sync on %s"%classname)
324 ### over time, we've had issues with a given column being
325 ### added on one side and not on the other
326 ### this helper function computes the intersection of two list of fields/columns
327 def intersect (l1,l2):
328 if compatibility: return list (set(l1).intersection(set(l2)))
# ignore() fragment: drop the per-table ignore_* fields from the column
# list (its 'def ignore(l1,l2):' header is on a missing line).
331 # some fields definitely need to be ignored
333 return list (set(l1).difference(set(l2)))
336 # Synchronize foreign sites
341 message('Dealing with Sites')
343 # Compare only the columns returned by the GetPeerData() call
344 if peer_tables['Sites']:
345 columns = peer_tables['Sites'][0].keys()
346 columns = intersect (columns, Site.fields)
350 # Keyed on foreign site_id
351 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
352 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
354 # Synchronize new set (still keyed on foreign site_id)
355 peer_sites = sync(old_peer_sites, sites_at_peer, Site, ignore (columns, RefreshPeer.ignore_site_fields))
357 for peer_site_id, site in peer_sites.iteritems():
358 # Bind any newly cached sites to peer
359 if peer_site_id not in old_peer_sites:
360 peer.add_site(site, peer_site_id, commit = commit_mode)
361 site['peer_id'] = peer_id
362 site['peer_site_id'] = peer_site_id
364 timers['site'] = time.time() - start
367 # XXX Synchronize foreign key types
370 message('Dealing with Keys')
372 key_types = KeyTypes(self.api).dict()
375 # Synchronize foreign keys
380 # Compare only the columns returned by the GetPeerData() call
381 if peer_tables['Keys']:
382 columns = peer_tables['Keys'][0].keys()
383 columns = intersect (columns, Key.fields)
387 # Keyed on foreign key_id
388 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
389 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
391 # Fix up key_type references
# .items() (not iteritems) because entries are deleted during the loop.
392 for peer_key_id, key in keys_at_peer.items():
393 if key['key_type'] not in key_types:
394 # XXX Log an event instead of printing to logfile
395 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
396 del keys_at_peer[peer_key_id]
399 # Synchronize new set (still keyed on foreign key_id)
400 peer_keys = sync(old_peer_keys, keys_at_peer, Key, ignore (columns, RefreshPeer.ignore_key_fields))
401 for peer_key_id, key in peer_keys.iteritems():
402 # Bind any newly cached keys to peer
403 if peer_key_id not in old_peer_keys:
404 peer.add_key(key, peer_key_id, commit = commit_mode)
405 key['peer_id'] = peer_id
406 key['peer_key_id'] = peer_key_id
408 timers['keys'] = time.time() - start
411 # Synchronize foreign users
416 message('Dealing with Persons')
418 # Compare only the columns returned by the GetPeerData() call
419 if peer_tables['Persons']:
420 columns = peer_tables['Persons'][0].keys()
421 columns = intersect (columns, Person.fields)
425 # Keyed on foreign person_id
426 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
428 # artificially attach the persons returned by GetPeerData to the new peer
429 # this is because validate_email needs peer_id to be correct when checking for duplicates
430 for person in peer_tables['Persons']:
431 person['peer_id']=peer_id
432 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
433 for peer_person in peer_tables['Persons']])
435 # XXX Do we care about membership in foreign site(s)?
437 # Synchronize new set (still keyed on foreign person_id)
438 peer_persons = sync(old_peer_persons, persons_at_peer, Person, ignore (columns, RefreshPeer.ignore_person_fields))
440 # transcoder : retrieve a local key_id from a peer_key_id
441 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
442 for peer_key_id,key in peer_keys.iteritems()])
444 for peer_person_id, person in peer_persons.iteritems():
445 # Bind any newly cached users to peer
446 if peer_person_id not in old_peer_persons:
447 peer.add_person(person, peer_person_id, commit = commit_mode)
448 person['peer_id'] = peer_id
449 person['peer_person_id'] = peer_person_id
450 person['key_ids'] = []
452 # User as viewed by peer
453 peer_person = persons_at_peer[peer_person_id]
455 # Foreign keys currently belonging to the user
456 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
457 if key_transcoder[key_id] in peer_keys]
459 # Foreign keys that should belong to the user
460 # this is basically peer_person['key_ids'], we just check it makes sense
461 # (e.g. we might have failed importing it)
462 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
464 # Remove stale keys from user
465 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
466 person.remove_key(peer_keys[key_id], commit = commit_mode)
467 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
469 # Add new keys to user
470 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
471 message ("before add_key, passing person=%r"%person)
472 message ("before add_key, passing key=%r"%peer_keys[key_id])
473 person.add_key(peer_keys[key_id], commit = commit_mode)
474 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
476 timers['persons'] = time.time() - start
479 # XXX Synchronize foreign boot states
482 boot_states = BootStates(self.api).dict()
485 # Synchronize foreign nodes
490 message('Dealing with Nodes (1)')
492 # Compare only the columns returned by the GetPeerData() call
493 if peer_tables['Nodes']:
494 columns = peer_tables['Nodes'][0].keys()
495 columns = intersect (columns, Node.fields)
# Fallback branch ('else:' on a missing line): no nodes returned.
497 columns = Node.fields
499 # Keyed on foreign node_id
500 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
501 nodes_at_peer = dict([(node['node_id'], node) \
502 for node in peer_tables['Nodes']])
504 # Fix up site_id and boot_states references
505 for peer_node_id, node in nodes_at_peer.items():
# 'errors = []' presumably sits on missing line 506 -- confirm.
507 if node['site_id'] not in peer_sites:
508 errors.append("invalid site %d" % node['site_id'])
509 if node['boot_state'] not in boot_states:
510 errors.append("invalid boot state %s" % node['boot_state'])
512 # XXX Log an event instead of printing to logfile
513 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
515 del nodes_at_peer[peer_node_id]
# Valid node: transcode foreign site_id to the local one.
518 node['site_id'] = peer_sites[node['site_id']]['site_id']
520 # Synchronize new set
521 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node, ignore (columns, RefreshPeer.ignore_node_fields))
523 for peer_node_id, node in peer_nodes.iteritems():
524 # Bind any newly cached foreign nodes to peer
525 if peer_node_id not in old_peer_nodes:
526 peer.add_node(node, peer_node_id, commit = commit_mode)
527 node['peer_id'] = peer_id
528 node['peer_node_id'] = peer_node_id
530 timers['nodes'] = time.time() - start
533 # Synchronize local nodes
537 message('Dealing with Nodes (2)')
539 # Keyed on local node_id
540 local_nodes = Nodes(self.api).dict()
# PeerNodes lists OUR nodes as the peer has them cached; fold the valid
# ones into peer_nodes so slice/node reconciliation can see them.
542 for node in peer_tables['PeerNodes']:
543 # Foreign identifier for our node as maintained by peer
544 peer_node_id = node['node_id']
545 # Local identifier for our node as cached by peer
546 node_id = node['peer_node_id']
547 if node_id in local_nodes:
548 # Still a valid local node, add it to the synchronized
549 # set of local node objects keyed on foreign node_id.
550 peer_nodes[peer_node_id] = local_nodes[node_id]
552 timers['local_nodes'] = time.time() - start
555 # XXX Synchronize foreign slice instantiation states
558 slice_instantiations = SliceInstantiations(self.api).dict()
561 # Synchronize foreign slices
566 message('Dealing with Slices (1)')
568 # Compare only the columns returned by the GetPeerData() call
569 if peer_tables['Slices']:
570 columns = peer_tables['Slices'][0].keys()
571 columns = intersect (columns, Slice.fields)
575 # Keyed on foreign slice_id
576 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
577 slices_at_peer = dict([(slice['slice_id'], slice) \
578 for slice in peer_tables['Slices']])
580 # Fix up site_id, instantiation, and creator_person_id references
581 for peer_slice_id, slice in slices_at_peer.items():
583 if slice['site_id'] not in peer_sites:
584 errors.append("invalid site %d" % slice['site_id'])
585 if slice['instantiation'] not in slice_instantiations:
586 errors.append("invalid instantiation %s" % slice['instantiation'])
# Unknown creator is tolerated: null it out rather than skipping.
587 if slice['creator_person_id'] not in peer_persons:
589 slice['creator_person_id'] = None
591 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
593 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
595 del slices_at_peer[peer_slice_id]
598 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
600 # Synchronize new set
601 peer_slices = sync(old_peer_slices, slices_at_peer, Slice, ignore (columns, RefreshPeer.ignore_slice_fields))
603 message('Dealing with Slices (2)')
604 # transcoder : retrieve a local node_id from a peer_node_id
605 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
606 for peer_node_id,node in peer_nodes.iteritems()])
607 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
608 for peer_person_id,person in peer_persons.iteritems()])
610 for peer_slice_id, slice in peer_slices.iteritems():
611 # Bind any newly cached foreign slices to peer
612 if peer_slice_id not in old_peer_slices:
613 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
614 slice['peer_id'] = peer_id
615 slice['peer_slice_id'] = peer_slice_id
616 slice['node_ids'] = []
617 slice['person_ids'] = []
619 # Slice as viewed by peer
620 peer_slice = slices_at_peer[peer_slice_id]
622 # Nodes that are currently part of the slice
623 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
624 if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
626 # Nodes that should be part of the slice
627 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
629 # Remove stale nodes from slice
630 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
631 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
632 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
634 # Add new nodes to slice
635 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
636 slice.add_node(peer_nodes[node_id], commit = commit_mode)
637 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
639 # N.B.: Local nodes that may have been added to the slice
640 # by hand, are removed. In other words, don't do this.
642 # Foreign users that are currently part of the slice
643 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
644 # if person_transcoder[person_id] in peer_persons]
645 # An issue occurred with a user who registered on both sites (same email)
646 # So the remote person could not get cached locally
647 # The one-line map/filter style is nicer but ineffective here
648 old_slice_person_ids = []
649 for person_id in slice['person_ids']:
# has_key is Python-2 only; would need 'person_id in person_transcoder'
# if this file is ever ported to Python 3.
650 if not person_transcoder.has_key(person_id):
651 message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
652 elif person_transcoder[person_id] not in peer_persons:
653 message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
# 'else:' branch header is on a missing line (654).
655 old_slice_person_ids += [person_transcoder[person_id]]
657 # Foreign users that should be part of the slice
658 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
660 # Remove stale users from slice
661 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
662 slice.remove_person(peer_persons[person_id], commit = commit_mode)
663 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
665 # Add new users to slice
666 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
667 slice.add_person(peer_persons[person_id], commit = commit_mode)
668 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
670 # N.B.: Local users that may have been added to the slice
671 # by hand, are not touched.
673 timers['slices'] = time.time() - start
675 # Update peer itself and commit
676 peer.sync(commit = True)