# Thierry Parmentelat - INRIA

import os
import time
import fcntl
import json

from PLC.Logger import logger
from PLC.Faults import *
from PLC.Method import Method
from PLC.Parameter import Parameter, Mixed
from PLC.Auth import Auth

from PLC.Peers import Peer, Peers
from PLC.Sites import Site, Sites
from PLC.Persons import Person, Persons
from PLC.KeyTypes import KeyType, KeyTypes
from PLC.Keys import Key, Keys
from PLC.BootStates import BootState, BootStates
from PLC.Nodes import Node, Nodes
from PLC.SliceInstantiations import SliceInstantiations
from PLC.Slices import Slice, Slices
from PLC.Roles import Role, Roles

# the initial version was doing only one final commit
# * set commit_mode to False to get that behaviour
# * set commit_mode to True to get everything synced at once
# the issue with the 'one-commit-at-the-end' approach is
# that the db basically stays locked for too long,
# causing various issues/crashes in the rest of the system
commit_mode = True

# turn this to False only if both ends have the same db schema
# compatibility mode is a bit slower but probably safer in the long run
compatibility = True

verbose = False

# for debugging specific entries - display detailed info on selected objs
focus_type = None    # set to e.g. 'Person'
# set to a list of ids (e.g. person_ids) - remote or local ids should work
focus_ids = []

# set to a filename for using cached data when debugging
# WARNING: does *not* actually connect to the peer in this case
# or more precisely, connects only if the file is not found
use_cache = None

# DEBUGGING SETTINGS example
# use_cache = "/var/log/peers/getpeerdata.json"
# focus_type = 'Person'
# focus_ids = [621, 1088]


def message(to_print=None, verbose_only=False):
    if verbose_only and not verbose:
        return
    logger.info(to_print)


def message_verbose(to_print=None, header='VERBOSE'):
    message("{}> {}".format(header, to_print), verbose_only=True)
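
# Illustration only: with verbose set to True,
#   message_verbose("Entering sync on Site")
# ends up logging "VERBOSE> Entering sync on Site", while plain message() calls
# are logged regardless of the verbose setting.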


# to avoid several instances running at the same time
class FileLock:

    def __init__(self, file_path, expire=60 * 60 * 2):
        self.expire = expire
        self.fpath = file_path
        self.fd = None

    def lock(self):
        if os.path.exists(self.fpath):
            if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
                try:
                    os.unlink(self.fpath)
                except Exception as e:
                    message('FileLock.lock({}) : {}'.format(self.fpath, e))
                    return False
        try:
            self.fd = open(self.fpath, 'w')
            fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except Exception as e:
            message('FileLock.lock({}) : {}'.format(self.fpath, e))
            return False
        return True

    def unlock(self):
        try:
            fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
        except Exception as e:
            message('FileLock.unlock({}) : {}'.format(self.fpath, e))
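
# Illustration only (not part of the original logic): FileLock is meant to guard
# a critical section the way call() uses it below, along these lines:
#
#   file_lock = FileLock("/tmp/refresh-peer-<peername>.lock")
#   if not file_lock.lock():
#       raise Exception("Another instance of RefreshPeer is running.")
#   try:
#       ...   # the actual refresh work
#   finally:
#       file_lock.unlock()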


class RefreshPeer(Method):
    """
    Fetches site, node, slice, person and key data from the specified peer
    and caches it locally; also deletes stale entries.
    Upon successful completion, returns a dict reporting various timers.
    """

    accepts = [
        Auth(),
        Mixed(Peer.fields['peer_id'],
              Peer.fields['peername']),
    ]

    returns = Parameter(dict, "various timers")
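
    # Illustration only: the timers reported on success map step names to elapsed
    # seconds, roughly along these lines (values are made up):
    #   {'transport': 12.3, 'peer_db': 4.5, 'site': 0.8, 'keys': 1.2,
    #    'persons': 5.6, 'nodes': 3.4, 'local_nodes': 0.1, 'slices': 7.8,
    #    'sites-persons': 0.9, 'persons-roles': 0.4}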

    # the columns ignored in the comparison - it is safe to ignore:
    # (*) the primary key - obviously this is not expected to match
    # (*) peer_id and the transcode key, likewise
    # (*) all identifiers that refer to other objects in the db,
    #     like e.g. Person.site_ids, since these are managed separately
    #     and do not make sense here anyway
    # (*) we also ignore things like date_created and last_updated
    #     that refer to local db creation/update
    ignore_site_fields = [
        'site_id', 'peer_id', 'peer_site_id',
        'address_ids', 'node_ids', 'person_ids', 'pcu_ids', 'slice_ids', 'site_tag_ids',
        'date_created', 'last_updated',
    ]
    ignore_key_fields = [
        'key_id', 'peer_id', 'peer_key_id',
    ]
    ignore_person_fields = [
        'person_id', 'peer_id', 'peer_person_id',
        'key_ids', 'slice_ids', 'person_tag_ids', 'role_ids', 'roles', 'site_ids',
        'date_created', 'last_updated',
    ]
    ignore_node_fields = [
        'node_id', 'peer_id', 'peer_node_id',
        'node_tag_ids', 'interface_ids', 'slice_ids', 'nodegroup_ids', 'pcu_ids', 'ports',
        'date_created', 'last_updated',
        # somehow those won't print in the PLE db
        'last_download', 'last_contact', 'last_pcu_reboot', 'last_boot',
        'last_time_spent_offline', 'last_time_spent_online', 'last_pcu_confirmation',
    ]
    ignore_slice_fields = [
        'slice_id', 'peer_id', 'peer_slice_id',
        'person_ids', 'slice_tag_ids', 'node_ids',
    ]

    def call(self, auth, peer_id_or_peername):
        peername = Peers(self.api, [peer_id_or_peername], [
            'peername'])[0]['peername']
        file_lock = FileLock("/tmp/refresh-peer-{peername}.lock"
                             .format(peername=peername))
        if not file_lock.lock():
            raise Exception("Another instance of RefreshPeer is running.")
        try:
            ret_val = self.real_call(auth, peer_id_or_peername)
        except Exception as e:
            file_lock.unlock()
            logger.exception("RefreshPeer caught exception - BEG")
            message("RefreshPeer caught exception - END")
            raise
        file_lock.unlock()
        return ret_val

    def real_call(self, auth, peer_id_or_peername):
        # Get peer
        peers = Peers(self.api, [peer_id_or_peername])
        if not peers:
            raise PLCInvalidArgument(
                "No such peer '{}'".format(str(peer_id_or_peername)))
        peer = peers[0]
        peer_id = peer['peer_id']
        peername = peer['peername']

        # Connect to peer API
        peer.connect()

        timers = {}

        # Get peer data
        start = time.time()
        message('RefreshPeer starting up (commit_mode={})'.format(commit_mode))
        if not use_cache:
            message('Issuing GetPeerData')
            peer_tables = peer.GetPeerData()
        else:
            if os.path.isfile(use_cache):
                message("use_cache: WARNING: using cached getpeerdata")
                with open(use_cache) as storage:
                    peer_tables = json.load(storage)
            else:
                message("use_cache: issuing GetPeerData")
                peer_tables = peer.GetPeerData()
                message("use_cache: saving in cache {}".format(use_cache))
                with open(use_cache, 'w') as storage:
                    json.dump(peer_tables, storage)
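
        # Illustration only: as consumed below, peer_tables is a dict of lists keyed
        # by table name, plus a timing entry, roughly:
        #   {'Sites': [...], 'Keys': [...], 'Persons': [...], 'Nodes': [...],
        #    'PeerNodes': [...], 'Slices': [...], 'db_time': <seconds spent in the peer db>}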

        # additions in June 2017

        # remove entries not marked as enabled
        # actually the 'enabled' flag is present on 'Sites' and 'Persons'
        # however we accept disabled sites as
        # (*) they don't come and go too often
        # (*) they may contain valid nodes, that we would then lose
        #     if we were to discard those sites
        # so bottom line, we filter out only disabled persons
        for cls in ('Persons',):
            peer_tables[cls] = [
                obj for obj in peer_tables[cls] if obj['enabled']
            ]

        # somehow GetPeerData from PLC can contain references
        # to nodes that are not exposed themselves,
        # which suggests some inconsistency on their end
        # anyway, it's safer to sanitize the dataset to avoid corruption
        exposed_peer_node_ids = {n['node_id'] for n in peer_tables['Nodes']}
        for slice in peer_tables['Slices']:
            before = len(slice['node_ids'])
            slice['node_ids'] = [x for x in slice['node_ids']
                                 if x in exposed_peer_node_ids]
            after = len(slice['node_ids'])
            if after != before:
                message("{peername} slice {slicename} got sanitized - {diff} node entries removed out of {before}"
                        .format(peername=peername, slicename=slice['name'],
                                diff=before-after, before=before))

        # for smooth federation with 4.2 - ignore fields that are useless
        # anyway, and rewrite boot_state
        boot_state_rewrite = {'dbg': 'safeboot', 'diag': 'safeboot', 'disable': 'disabled',
                              'inst': 'reinstall', 'rins': 'reinstall', 'new': 'reinstall',
                              'rcnf': 'reinstall'}
        for node in peer_tables['Nodes']:
            for key in ['nodenetwork_ids', 'dummybox_id']:
                if key in node:
                    del node[key]
            if node['boot_state'] in boot_state_rewrite:
                node['boot_state'] = boot_state_rewrite[node['boot_state']]
        for slice in peer_tables['Slices']:
            for key in ['slice_attribute_ids']:
                if key in slice:
                    del slice[key]

        timers['transport'] = time.time() - start - peer_tables['db_time']
        timers['peer_db'] = peer_tables['db_time']
        message_verbose('GetPeerData returned -> db={} transport={}'
                        .format(timers['peer_db'], timers['transport']))

        def sync(objects, peer_objects, classobj, columns):
            """
            Synchronizes two dictionaries of objects. objects should
            be a dictionary of local objects keyed on their foreign
            identifiers. peer_objects should be a dictionary of
            foreign objects keyed on their local (i.e., foreign to us)
            identifiers. Returns a final dictionary of local objects
            keyed on their foreign identifiers.
            """
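
            # Illustration only: for Sites this gets invoked further down as
            #   sync(old_peer_sites, sites_at_peer, Site,
            #        ignore(columns, RefreshPeer.ignore_site_fields))
            # where old_peer_sites holds our cached copies keyed on the peer's site_id,
            # and sites_at_peer holds the peer's raw dicts keyed on that same id, e.g.
            #   old_peer_sites = {17: <locally cached Site 1203>, ...}
            #   sites_at_peer  = {17: {'site_id': 17, 'login_base': 'inria', ...}, ...}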

            classname = classobj(self.api).__class__.__name__
            primary_key = getattr(classobj, 'primary_key')
            # display all peer objects of these types while looping
            secondary_keys = {'Node': 'hostname', 'Slice': 'name',
                              'Person': 'email', 'Site': 'login_base'}
            secondary_key = None
            if classname in secondary_keys:
                secondary_key = secondary_keys[classname]

            message_verbose('Entering sync on {} ({})'
                            .format(classname, primary_key))

            synced = {}

            # Delete stale objects
            for peer_object_id, object in objects.items():
                if peer_object_id not in peer_objects:
                    object.delete(commit=commit_mode)
                    message("{} {} {} deleted"
                            .format(peername, classname, object[primary_key]))

            total = len(peer_objects)
            count = 1

            # peer_object_id, peer_object and object are open variables in the loop below...
            # (local) object might be None if creating a new one
            def in_focus():
                if classname != focus_type:
                    return False
                return (peer_object_id in focus_ids) \
                    or (object and (primary_key in object)
                        and (object[primary_key] in focus_ids))

            def message_focus(message):
                if in_focus():
                    message_verbose("peer_obj : {} [[{}]]".format(peer_object_id, peer_object),
                                    header='FOCUS ' + message)
                    # show local object if a match was found
                    if object:
                        message_verbose("local_obj : <<{}>>".format(object),
                                        header='FOCUS ' + message)

            # the function to compare a local object with its candidate peer obj
            # xxx probably faster when compatibility is False...
            def equal_fields(object, peer_object, columns):
                # fast version: must use __eq__() instead of == since
                # peer_object may be a raw dict instead of a Peer object.
                if not compatibility:
                    result = object.__eq__(peer_object)
                    if not result:
                        message_verbose("fast mode: difference found between {} and {}"
                                        .format(object, peer_object))
                    return result
                else:
                    for column in columns:
                        if object[column] != peer_object[column]:
                            message_verbose("difference found in column {}".format(column))
                            message_verbose("our object {}".format(object[column]))
                            message_verbose("remote object {}".format(peer_object[column]))
                            return False
                    return True

            # Add/update new/existing objects
            for peer_object_id, peer_object in peer_objects.items():
                peer_object_name = ""
                if secondary_key:
                    peer_object_name = "({})".format(peer_object[secondary_key])
                message_verbose('{} peer_object_id={} {} ({}/{})'
                                .format(classname, peer_object_id, peer_object_name, count, total))
                if peer_object_id in synced:
                    message("Warning: {peername} Skipping already added {classname}: {obj}"
                            .format(peername=peername,
                                    classname=classname, obj=peer_object))
                    continue

                if peer_object_id in objects:
                    # Update existing object
                    object = objects[peer_object_id]

                    # Replace foreign identifier with existing local
                    # identifier temporarily for the purposes of
                    # comparison
                    peer_object[primary_key] = object[primary_key]

                    if not equal_fields(object, peer_object, columns):
                        # Only update intrinsic fields
                        object.update(object.db_fields(peer_object))
                        message_focus("DIFFERENCES : updated / syncing")
                        action = "updated"
                    else:
                        message_focus("UNCHANGED - left intact / not syncing")
                        action = None

                    # Restore foreign identifier
                    peer_object[primary_key] = peer_object_id
                else:
                    # Add new object
                    object = classobj(self.api, peer_object)
                    # Replace foreign identifier with new local identifier
                    del object[primary_key]
                    message_focus("NEW -- created with clean id - syncing")
                    action = "added"

                if action:
                    message_verbose("syncing {classname} {id} - commit_mode={mode}"
                                    .format(classname=classname,
                                            id=peer_object_id, mode=commit_mode))
                    try:
                        object.sync(commit=commit_mode)
                    except PLCInvalidArgument as err:
                        # XXX Log an event instead of printing to logfile
                        # skip if validation fails
                        message("Warning: {peername} Skipping invalid {classname} ({err})\n{object}"
                                .format(peername=peername, classname=classname,
                                        object=peer_object, err=err))
                        continue

                synced[peer_object_id] = object

                if action:
                    message("{peername}: ({count}/{total}) {classname} {primary} {name} {action}"
                            .format(peername=peername,
                                    count=count, total=total,
                                    classname=classname, primary=object[primary_key],
                                    name=peer_object_name, action=action))
                count += 1

            message_verbose("Exiting sync on {}".format(classname))

            return synced

        # over time, we've had issues with a given column being
        # added on one side and not on the other
        # this helper function computes the intersection of two lists of
        # columns
        def intersect(l1, l2):
            return list(set(l1).intersection(set(l2)))

        # some fields definitely need to be ignored
        def ignore(l1, l2):
            return list(set(l1).difference(set(l2)))
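
        # Illustration only:
        #   intersect(['site_id', 'name', 'some_new_column'], Site.fields)
        # keeps only the columns known on both sides, and
        #   ignore(columns, RefreshPeer.ignore_site_fields)
        # then drops the fields deliberately excluded from comparison;
        # both return their result in arbitrary (set) order.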

        # Synchronize foreign sites

        start = time.time()

        message('(1) Dealing with Sites')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Sites']:
            columns = list(peer_tables['Sites'][0].keys())
            columns = intersect(columns, Site.fields)
        else:
            columns = Site.fields

        # Keyed on foreign site_id
        old_peer_sites = Sites(
            self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
        sites_at_peer = dict([(site['site_id'], site)
                              for site in peer_tables['Sites']])

        # Synchronize new set (still keyed on foreign site_id)
        peer_sites = sync(old_peer_sites, sites_at_peer, Site,
                          ignore(columns, RefreshPeer.ignore_site_fields))

        for peer_site_id, site in peer_sites.items():
            # Bind any newly cached sites to peer
            if peer_site_id not in old_peer_sites:
                peer.add_site(site, peer_site_id, commit=commit_mode)
                site['peer_id'] = peer_id
                site['peer_site_id'] = peer_site_id

        timers['site'] = time.time() - start

        # XXX Synchronize foreign key types

        message('(2) Dealing with Keys')

        key_types = KeyTypes(self.api).dict()

        # Synchronize foreign keys

        start = time.time()

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Keys']:
            columns = list(peer_tables['Keys'][0].keys())
            columns = intersect(columns, Key.fields)
        else:
            columns = Key.fields

        # Keyed on foreign key_id
        old_peer_keys = Keys(
            self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
        keys_at_peer = dict([(key['key_id'], key)
                             for key in peer_tables['Keys']])

        # Fix up key_type references
        for peer_key_id, key in list(keys_at_peer.items()):
            if key['key_type'] not in key_types:
                # XXX Log an event instead of printing to logfile
                message("Warning: Skipping invalid {peername} key {key}"
                        .format(peername=peername, key=key))
                del keys_at_peer[peer_key_id]

        # Synchronize new set (still keyed on foreign key_id)
        peer_keys = sync(old_peer_keys, keys_at_peer, Key,
                         ignore(columns, RefreshPeer.ignore_key_fields))
        for peer_key_id, key in peer_keys.items():
            # Bind any newly cached keys to peer
            if peer_key_id not in old_peer_keys:
                peer.add_key(key, peer_key_id, commit=commit_mode)
                key['peer_id'] = peer_id
                key['peer_key_id'] = peer_key_id

        timers['keys'] = time.time() - start

        # Synchronize foreign users

        start = time.time()

        message('(3) Dealing with Persons')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Persons']:
            columns = list(peer_tables['Persons'][0].keys())
            columns = intersect(columns, Person.fields)
        else:
            columns = Person.fields

        # Keyed on foreign person_id
        old_peer_persons = Persons(
            self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')

        # artificially attach the persons returned by GetPeerData to the new peer
        # this is because validate_email needs peer_id to be correct when
        # checking for duplicates
        for person in peer_tables['Persons']:
            person['peer_id'] = peer_id
        persons_at_peer = dict([(peer_person['person_id'], peer_person)
                                for peer_person in peer_tables['Persons']])

        # XXX Do we care about membership in foreign site(s)?

        # Synchronize new set (still keyed on foreign person_id)
        peer_persons = sync(old_peer_persons, persons_at_peer, Person,
                            ignore(columns, RefreshPeer.ignore_person_fields))

        # transcoder : map a local key_id to the peer's key_id
        key_transcoder = dict([(key['key_id'], peer_key_id)
                               for peer_key_id, key in peer_keys.items()])
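
        # Illustration only: key_transcoder goes from our local key_id to the peer's
        # key_id, e.g. {4501: 37, 4502: 38, ...}, so a locally cached person['key_ids']
        # entry can be compared with the peer_person['key_ids'] values further down.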

        for peer_person_id, person in peer_persons.items():
            # Bind any newly cached users to peer
            if peer_person_id not in old_peer_persons:
                peer.add_person(person, peer_person_id, commit=commit_mode)
                person['peer_id'] = peer_id
                person['peer_person_id'] = peer_person_id
                person['key_ids'] = []

            # User as viewed by peer
            peer_person = persons_at_peer[peer_person_id]

            # Foreign keys currently belonging to the user
            old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids']
                                  if key_transcoder[key_id] in peer_keys]

            # Foreign keys that should belong to the user
            # this is basically peer_person['key_ids'], we just check it makes sense
            # (e.g. we might have failed importing it)
            person_key_ids = [key_id for key_id in peer_person['key_ids']
                              if key_id in peer_keys]

            # Remove stale keys from user
            for key_id in (set(old_person_key_ids) - set(person_key_ids)):
                person.remove_key(peer_keys[key_id], commit=commit_mode)
                message("{peername} Key {key_id} removed from person {email}"
                        .format(peername=peername,
                                key_id=key_id, email=person['email']))

            # Add new keys to user
            for key_id in (set(person_key_ids) - set(old_person_key_ids)):
                # message("before add_key, passing person={}".format(person))
                # message("before add_key, passing key={}".format(peer_keys[key_id]))
                person.add_key(peer_keys[key_id], commit=commit_mode)
                message("{} Key {} added into person {}"
                        .format(peername, key_id, person['email']))

        timers['persons'] = time.time() - start

        # XXX Synchronize foreign boot states
        boot_states = BootStates(self.api).dict()

        # Synchronize foreign nodes

        start = time.time()

        # NOTE: we do import disabled sites
        message('(4) Dealing with Nodes (1)')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Nodes']:
            columns = list(peer_tables['Nodes'][0].keys())
            columns = intersect(columns, Node.fields)
        else:
            columns = Node.fields

        # Keyed on foreign node_id
        old_peer_nodes = Nodes(
            self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
        nodes_at_peer = dict([(node['node_id'], node)
                              for node in peer_tables['Nodes']])

        # Fix up site_id and boot_states references
        for peer_node_id, node in list(nodes_at_peer.items()):
            errors = []
            if node['site_id'] not in peer_sites:
                errors.append("invalid (or disabled) site {}".format(node['site_id']))
            if node['boot_state'] not in boot_states:
                errors.append("invalid boot state {}".format(node['boot_state']))
            if errors:
                # XXX Log an event instead of printing to logfile
                message("Warning: Skipping invalid {peername} node {hostname} - {errors}"
                        .format(peername=peername,
                                hostname=node['hostname'], errors=", ".join(errors)))
                del nodes_at_peer[peer_node_id]
            else:
                node['site_id'] = peer_sites[node['site_id']]['site_id']

        # Synchronize new set
        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node,
                          ignore(columns, RefreshPeer.ignore_node_fields))

        for peer_node_id, node in peer_nodes.items():
            # Bind any newly cached foreign nodes to peer
            if peer_node_id not in old_peer_nodes:
                peer.add_node(node, peer_node_id, commit=commit_mode)
                node['peer_id'] = peer_id
                node['peer_node_id'] = peer_node_id

        timers['nodes'] = time.time() - start

        # Synchronize local nodes

        start = time.time()

        message('(5) Dealing with Nodes (2)')

        # Keyed on local node_id
        local_nodes = Nodes(self.api).dict()

        for node in peer_tables['PeerNodes']:
            # Foreign identifier for our node as maintained by peer
            peer_node_id = node['node_id']
            # Local identifier for our node as cached by peer
            node_id = node['peer_node_id']
            if node_id in local_nodes:
                # Still a valid local node, add it to the synchronized
                # set of local node objects keyed on foreign node_id.
                peer_nodes[peer_node_id] = local_nodes[node_id]

        timers['local_nodes'] = time.time() - start

        # XXX Synchronize foreign slice instantiation states
        slice_instantiations = SliceInstantiations(self.api).dict()

        # Synchronize foreign slices

        start = time.time()

        message('(6) Dealing with Slices')

        # Compare only the columns returned by the GetPeerData() call
        if peer_tables['Slices']:
            columns = list(peer_tables['Slices'][0].keys())
            columns = intersect(columns, Slice.fields)
        else:
            columns = Slice.fields

        # Keyed on foreign slice_id
        old_peer_slices = Slices(
            self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
        slices_at_peer = dict([(slice['slice_id'], slice)
                               for slice in peer_tables['Slices']])

        # Fix up site_id, instantiation, and creator_person_id references
        for peer_slice_id, slice in list(slices_at_peer.items()):
            errors = []
            if slice['site_id'] not in peer_sites:
                errors.append("invalid site {}".format(slice['site_id']))
            if slice['instantiation'] not in slice_instantiations:
                errors.append("invalid instantiation {}"
                              .format(slice['instantiation']))
            if slice['creator_person_id'] not in peer_persons:
                # an unknown creator is not fatal - just cache the slice without one
                slice['creator_person_id'] = None
            else:
                slice['creator_person_id'] = peer_persons[
                    slice['creator_person_id']]['person_id']
            if errors:
                message("Warning: Skipping invalid {peername} slice {slice} : {errors}"
                        .format(peername=peername,
                                slice=slice, errors=", ".join(errors)))
                del slices_at_peer[peer_slice_id]
            else:
                slice['site_id'] = peer_sites[slice['site_id']]['site_id']

        # Synchronize new set
        peer_slices = sync(old_peer_slices, slices_at_peer, Slice,
                           ignore(columns, RefreshPeer.ignore_slice_fields))

        message('(7) Dealing with Nodes in Slices')

        # transcoders : map a local node_id (resp. person_id) to the peer's id
        node_transcoder = dict([(node['node_id'], peer_node_id)
                                for peer_node_id, node in peer_nodes.items()])
        person_transcoder = dict([(person['person_id'], peer_person_id)
                                  for peer_person_id, person in peer_persons.items()])

        for peer_slice_id, slice in peer_slices.items():
            # Bind any newly cached foreign slices to peer
            if peer_slice_id not in old_peer_slices:
                peer.add_slice(slice, peer_slice_id, commit=commit_mode)
                slice['peer_id'] = peer_id
                slice['peer_slice_id'] = peer_slice_id
                slice['node_ids'] = []
                slice['person_ids'] = []

            # Slice as viewed by peer
            peer_slice = slices_at_peer[peer_slice_id]

            # Nodes that are currently part of the slice
            old_slice_node_ids = [node_transcoder[node_id] for node_id in slice['node_ids']
                                  if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]

            # Nodes that should be part of the slice
            slice_node_ids = [node_id for node_id in peer_slice['node_ids']
                              if node_id in peer_nodes]

            # Remove stale nodes from slice
            for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
                slice.remove_node(peer_nodes[node_id], commit=commit_mode)
                message("{peername} node {hostname} (id {node_id}) removed from slice {slicename} (id {slice_id})"
                        .format(peername=peername,
                                hostname=peer_nodes[node_id]['hostname'],
                                node_id=peer_nodes[node_id]['node_id'],
                                slicename=slice['name'], slice_id=slice['slice_id']))

            # Add new nodes to slice
            for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
                slice.add_node(peer_nodes[node_id], commit=commit_mode)
                message("{peername} node {hostname} (id {node_id}) added into slice {slicename} (id {slice_id})"
                        .format(peername=peername,
                                hostname=peer_nodes[node_id]['hostname'],
                                node_id=peer_nodes[node_id]['node_id'],
                                slicename=slice['name'], slice_id=slice['slice_id']))

            # if slice['slice_id'] == 225:

            # N.B.: Local nodes that may have been added to the slice
            # by hand, are removed. In other words, don't do this.

            # Foreign users that are currently part of the slice
            # old_slice_person_ids = [person_transcoder[person_id] for person_id in slice['person_ids']
            #                         if person_transcoder[person_id] in peer_persons]
            # An issue occurred with a user who had registered on both sites (same email),
            # so the remote person could not get cached locally.
            # The one-line map/filter style is nicer but not practical here.
            old_slice_person_ids = []
            for person_id in slice['person_ids']:
                if person_id not in person_transcoder:
                    message('WARNING : person_id {person_id} in {slicename} not transcodable (1) - skipped'
                            .format(person_id=person_id, slicename=slice['name']))
                elif person_transcoder[person_id] not in peer_persons:
                    message('WARNING : person_id {person_id} in {slicename} not transcodable (2) - skipped'
                            .format(person_id=person_id, slicename=slice['name']))
                else:
                    old_slice_person_ids += [person_transcoder[person_id]]

            # Foreign users that should be part of the slice
            slice_person_ids = [person_id for person_id in peer_slice['person_ids']
                                if person_id in peer_persons]

            # Remove stale users from slice
            for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
                slice.remove_person(
                    peer_persons[person_id], commit=commit_mode)
                message("{peername} user {email} removed from slice {slicename}"
                        .format(peername=peername,
                                email=peer_persons[person_id]['email'],
                                slicename=slice['name']))

            # Add new users to slice
            for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
                slice.add_person(peer_persons[person_id], commit=commit_mode)
                message("{peername} user {email} added into slice {slicename}"
                        .format(peername=peername,
                                email=peer_persons[person_id]['email'],
                                slicename=slice['name']))

            # N.B.: Local users that may have been added to the slice
            # by hand, are not touched.

        timers['slices'] = time.time() - start

        start = time.time()

        message('(8) Dealing with Persons in Sites')

        for peer_site_id, site in peer_sites.items():
            # Site as viewed by peer
            peer_site = sites_at_peer[peer_site_id]

            # Persons that are currently part of the site
            old_site_person_ids = [person_transcoder[person_id] for person_id in site['person_ids']
                                   if person_id in person_transcoder and person_transcoder[person_id] in peer_persons]

            # Persons that should be part of the site
            site_person_ids = [person_id for person_id in peer_site['person_ids']
                               if person_id in peer_persons]

            # Remove stale persons from site
            for person_id in (set(old_site_person_ids) - set(site_person_ids)):
                site.remove_person(peer_persons[person_id], commit=commit_mode)
                message("{peername} person {email} removed from site {login_base}"
                        .format(peername=peername,
                                email=peer_persons[person_id]['email'],
                                login_base=site['login_base']))

            # Add new persons to site
            for person_id in (set(site_person_ids) - set(old_site_person_ids)):
                site.add_person(peer_persons[person_id], commit=commit_mode)
                message("{peername} person {email} added into site {login_base}"
                        .format(peername=peername,
                                email=peer_persons[person_id]['email'],
                                login_base=site['login_base']))

        timers['sites-persons'] = time.time() - start

        start = time.time()

        message('(9) Dealing with Roles for Persons')

        roles = Roles(self.api)
        roles_dict = dict([(role['role_id'], role) for role in roles])
        for peer_person_id, person in peer_persons.items():
            # Person as viewed by peer
            peer_person = persons_at_peer[peer_person_id]

            # Roles currently attributed to the person
            old_person_role_ids = [role_id for role_id in person['role_ids']]

            # Roles that should be attributed to the person
            person_role_ids = [role_id for role_id in peer_person['role_ids']]

            # Remove stale roles from person
            for role_id in (set(old_person_role_ids) - set(person_role_ids)):
                person.remove_role(roles_dict[role_id], commit=commit_mode)
                message("{peername} role {rolename} removed from person {email}"
                        .format(peername=peername,
                                rolename=roles_dict[role_id]['name'],
                                email=person['email']))

            # Add new roles to person
            for role_id in (set(person_role_ids) - set(old_person_role_ids)):
                person.add_role(roles_dict[role_id], commit=commit_mode)
                message("{peername} role {rolename} added to person {email}"
                        .format(peername=peername,
                                rolename=roles_dict[role_id]['name'],
                                email=person['email']))

        timers['persons-roles'] = time.time() - start

        # Update peer itself and commit
        peer.sync(commit=True)

        return timers