Setting tag plcapi-5.4-2
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3 #
4 import os
5 import sys
6 import fcntl
7 import time
8
9 from PLC.Logger import logger
10 from PLC.Faults import *
11 from PLC.Method import Method
12 from PLC.Parameter import Parameter, Mixed
13 from PLC.Auth import Auth
14
15 from PLC.Peers import Peer, Peers
16 from PLC.Sites import Site, Sites
17 from PLC.Persons import Person, Persons
18 from PLC.KeyTypes import KeyType, KeyTypes
19 from PLC.Keys import Key, Keys
20 from PLC.BootStates import BootState, BootStates
21 from PLC.Nodes import Node, Nodes
22 from PLC.SliceInstantiations import SliceInstantiations
23 from PLC.Slices import Slice, Slices
24 from PLC.Roles import Role, Roles
25
26 # settings
27 # initial version was doing only one final commit
28 # * set commit_mode to False to get that behaviour
29 # * set comit_mode to True to get everything synced at once
30 # the issue with the 'one-commit-at-the-end' approach is
31 # that the db gets basically totally locked during too long
32 # causing various issues/crashes in the rest of the system
33 commit_mode = True
34
35 # turn this to False only if both ends have the same db schema
36 # compatibility mode is a bit slower but probably safer on the long run
37 compatibility = True
38
39 # debugging
40 # for verbose output
41 verbose = False
42 use_cache = None
43 # for debugging specific entries - display detailed info on selected objs
44 focus_type = None  # set to e.g. 'Person'
45 # set to a list of ids (e.g. person_ids) - remote or local ids should work
46 focus_ids = []
47 # DEBUGGING SETTINGS example
48 # set to a filename for using cached data when debugging
49 # WARNING: does *not* actually connect to the peer in this case
50 # or more precisely, connect only if the file is not found
51 # i.e. the first time
52 # use_cache = "/var/log/peers/getpeerdata.json"
53 # verbose = True
54 # focus_type = 'Person'
55 # focus_ids = [621, 1088]
56
57
58 ########## helpers
59
60 def message(to_print=None, verbose_only=False):
61     if verbose_only and not verbose:
62         return
63     logger.info(to_print)
64
65
66 def message_verbose(to_print=None, header='VERBOSE'):
67     message("{}> {}".format(header, to_print), verbose_only=True)
68
69
70 # to avoid several instances running at the same time
71 class FileLock:
72     """
73     Lock/Unlock file
74     """
75
76     def __init__(self, file_path, expire=60 * 60 * 2):
77         self.expire = expire
78         self.fpath = file_path
79         self.fd = None
80
81     def lock(self):
82         if os.path.exists(self.fpath):
83             if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
84                 try:
85                     os.unlink(self.fpath)
86                 except Exception, e:
87                     message('FileLock.lock({}) : {}'.format(self.fpath, e))
88                     return False
89         try:
90             self.fd = open(self.fpath, 'w')
91             fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
92         except IOError, e:
93             message('FileLock.lock({}) : {}'.format(self.fpath, e))
94             return False
95         return True
96
97     def unlock(self):
98         try:
99             fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
100             self.fd.close()
101         except IOError, e:
102             message('FileLock.unlock({}) : {}'.format(self.fpath, e))
103
104
105 class RefreshPeer(Method):
106     """
107     Fetches site, node, slice, person and key data from the specified peer
108     and caches it locally; also deletes stale entries.
109     Upon successful completion, returns a dict reporting various timers.
110     Faults otherwise.
111     """
112
113     roles = ['admin']
114
115     accepts = [
116         Auth(),
117         Mixed(Peer.fields['peer_id'],
118               Peer.fields['peername']),
119     ]
120
121     returns = Parameter(dict, "various timers")
122
123     # the columns ignored in the comparison - it is safe to ignore:
124     # (*) the primary key - obviously this is not expected to match
125     # (*) peer_id and the transcode key, likewise
126     # (*) all identifiers that refer to other objects in the db
127     #     like e.g. Person.site_ids since this is managed separately
128     #     and does not make sense any way
129     # (*) we also ignore things like date_created and last_updated
130     #     that refer to local db creation/update
131     ignore_site_fields = [
132         'site_id', 'peer_id', 'peer_site_id',
133         'address_ids', 'node_ids', 'person_ids', 'pcu_ids', 'slice_ids', 'site_tag_ids',
134         'date_created', 'last_updated',
135     ]
136     ignore_key_fields = [
137         'key_id', 'peer_id', 'peer_key_id',
138         'person_id',
139     ]
140     ignore_person_fields = [
141         'person_id', 'peer_id', 'peer_person_id',
142         'key_ids', 'slice_ids', 'person_tag_ids', 'role_ids', 'roles', 'site_ids',
143         'date_created', 'last_updated',
144     ]
145     ignore_node_fields = [
146         'node_id', 'peer_id', 'peer_node_id', 
147         'node_tag_ids', 'interface_ids', 'slice_ids', 'nodegroup_ids', 'pcu_ids', 'ports',
148         'date_created', 'last_updated',
149         # somehow those won't print in the ple db
150         'last_download', 'last_contact', 'last_pcu_reboot', 'last_boot',
151         'last_time_spent_offline', 'last_time_spent_online', 'last_pcu_confirmation',
152     ]
153     ignore_slice_fields = [
154         'slice_id', 'peer_id', 'peer_slice_id',
155         'person_ids', 'slice_tag_ids', 'node_ids',
156         'created',
157     ]
158
159     def call(self, auth, peer_id_or_peername):
160         ret_val = None
161         peername = Peers(self.api, [peer_id_or_peername], [
162                          'peername'])[0]['peername']
163         file_lock = FileLock("/tmp/refresh-peer-{peername}.lock"
164                              .format(peername=peername))
165         if not file_lock.lock():
166             raise Exception, "Another instance of RefreshPeer is running."
167         try:
168             ret_val = self.real_call(auth, peer_id_or_peername)
169         except Exception, e:
170             file_lock.unlock()
171             logger.exception("RefreshPeer caught exception - BEG")
172             message("RefreshPeer caught exception - END")
173             raise Exception, e
174         file_lock.unlock()
175         return ret_val
176
177     def real_call(self, auth, peer_id_or_peername):
178         # Get peer
179         peers = Peers(self.api, [peer_id_or_peername])
180         if not peers:
181             raise PLCInvalidArgument, "No such peer '{}'".format(unicode(peer_id_or_peername))
182         peer = peers[0]
183         peer_id = peer['peer_id']
184         peername = peer['peername']
185
186         # Connect to peer API
187         peer.connect()
188
189         timers = {}
190
191         # Get peer data
192         start = time.time()
193         message('RefreshPeer starting up (commit_mode={})'.format(commit_mode))
194         if not use_cache:
195             message('Issuing GetPeerData')
196             peer_tables = peer.GetPeerData()
197         else:
198             import json
199             if os.path.isfile(use_cache):
200                 message("use_cache: WARNING: using cached getpeerdata")
201                 with open(use_cache) as storage:
202                     peer_tables = json.load(storage)
203             else:
204                 message("use_cache: issuing GetPeerData")
205                 peer_tables = peer.GetPeerData()
206                 message("use_cache: saving in cache {}".format(use_cache))
207                 with open(use_cache, 'w') as storage:
208                     json.dump(peer_tables, storage)
209
210         # additions in June 2017
211
212         # remove entries not marked as enabled
213         # actually the 'enabled' flag is present on 'Sites' and 'Persons'
214         # however we accept disabled slices as
215         # (*) they don't come and go too often
216         # (*) they may contain vlid nodes, that we would then lose
217         #     if we were to discard those sites
218         # so bottom line, we filter out only disabled persons
219         for cls in ('Persons',) :
220             peer_tables[cls] = [
221                 obj for obj in peer_tables[cls]  if obj['enabled']
222             ]
223
224         # somehow we can see GetPeerData from PLC that contains references
225         # to nodes that are not exposed themselves
226         # which suggests some inconsistency on their end
227         # anyway, it's safer to sanitize the dataset to avoid corruption
228         exposed_peer_node_ids = { n['node_id'] for n in peer_tables['Nodes']}
229         for slice in peer_tables['Slices']:
230             before = len(slice['node_ids'])
231             slice['node_ids'] = [x for x in slice['node_ids'] if x in exposed_peer_node_ids]
232             after = len(slice['node_ids'])
233             if after != before:
234                 message("{peername} slice {slicename} got sanitized - {diff} node entries removed out of {before}"
235                         .format(peername=peername, slicename=slice['name'],
236                                 diff=before-after, before=before))
237
238         # end of additions
239
240         # for smooth federation with 4.2 - ignore fields that are useless
241         # anyway, and rewrite boot_state
242         boot_state_rewrite = {'dbg': 'safeboot', 'diag': 'safeboot', 'disable': 'disabled',
243                               'inst': 'reinstall', 'rins': 'reinstall', 'new': 'reinstall', 'rcnf': 'reinstall'}
244         for node in peer_tables['Nodes']:
245             for key in ['nodenetwork_ids', 'dummybox_id']:
246                 if key in node:
247                     del node[key]
248             if node['boot_state'] in boot_state_rewrite:
249                 node['boot_state'] = boot_state_rewrite[node['boot_state']]
250         for slice in peer_tables['Slices']:
251             for key in ['slice_attribute_ids']:
252                 if key in slice:
253                     del slice[key]
254         timers['transport'] = time.time() - start - peer_tables['db_time']
255         timers['peer_db'] = peer_tables['db_time']
256         message_verbose('GetPeerData returned -> db={} transport={}'
257                         .format(timers['peer_db'], timers['transport']))
258
259         def sync(objects, peer_objects, classobj, columns):
260             """
261             Synchronizes two dictionaries of objects. objects should
262             be a dictionary of local objects keyed on their foreign
263             identifiers. peer_objects should be a dictionary of
264             foreign objects keyed on their local (i.e., foreign to us)
265             identifiers. Returns a final dictionary of local objects
266             keyed on their foreign identifiers.
267             """
268
269             classname = classobj(self.api).__class__.__name__
270             primary_key = getattr(classobj, 'primary_key')
271             # display all peer objects of these types while looping
272             secondary_keys = {'Node': 'hostname', 'Slice': 'name',
273                               'Person': 'email', 'Site': 'login_base'}
274             secondary_key = None
275             if classname in secondary_keys:
276                 secondary_key = secondary_keys[classname]
277
278             message_verbose('Entering sync on {} ({})'
279                             .format(classname, primary_key))
280
281             synced = {}
282
283             # Delete stale objects
284             for peer_object_id, object in objects.iteritems():
285                 if peer_object_id not in peer_objects:
286                     object.delete(commit=commit_mode)
287                     message("{} {} {} deleted"
288                             .format(peername, classname, object[primary_key]))
289
290             total = len(peer_objects)
291             count = 1
292
293             # peer_object_id, peer_object and object are open variables in the loop below...
294             # (local) object might be None if creating a new one
295             def in_focus():
296                 if classname != focus_type:
297                     return False
298                 return (peer_object_id in focus_ids) \
299                     or (object and (primary_key in object)
300                         and (object[primary_key] in focus_ids))
301
302             def message_focus(message):
303                 if in_focus():
304                     # always show remote
305                     message_verbose("peer_obj : {} [[{}]]".format(peer_object_id, peer_object),
306                                     header='FOCUS ' + message)
307                     # show local object if a match was found
308                     if object:
309                         message_verbose("local_obj : <<{}>>".format(object),
310                                         header='FOCUS ' + message)
311
312             # the function to compare a local object with its candidate peer obj
313             # xxx probably faster when compatibility is False...
314             def equal_fields(object, peer_object, columns):
315                 # fast version: must use __eq__() instead of == since
316                 # peer_object may be a raw dict instead of a Peer object.
317                 if not compatibility:
318                     result = object.__eq__(peer_object)
319                     if not result:
320                         message_verbose("fast mode: difference found between {} and {}"
321                                         .format(object, peer_object))
322                     return result
323                 else:
324                     for column in columns:
325                         if object[column] != peer_object[column]:
326                             message_verbose("difference found in column {}".format(column))
327                             message_verbose("our object {}".format(object[column]))
328                             message_verbose("remote object {}".format(peer_object[column]))
329                             return False
330                     return True
331
332             # Add/update new/existing objects
333             for peer_object_id, peer_object in peer_objects.iteritems():
334                 peer_object_name = ""
335                 if secondary_key:
336                     peer_object_name = "({})".format(peer_object[secondary_key])
337                 message_verbose('{} peer_object_id={} {} ({}/{})'
338                                 .format(classname, peer_object_id, peer_object_name, count, total))
339                 count += 1
340                 if peer_object_id in synced:
341                     message("Warning: {peername} Skipping already added {classname}: {obj}"
342                             .format(peername=peername,
343                                     classname=classname, obj=peer_object))
344                     continue
345
346                 if peer_object_id in objects:
347                     # Update existing object
348                     object = objects[peer_object_id]
349
350                     # Replace foreign identifier with existing local
351                     # identifier temporarily for the purposes of
352                     # comparison.
353                     peer_object[primary_key] = object[primary_key]
354
355                     if not equal_fields(object, peer_object, columns):
356                         # Only update intrinsic fields
357                         object.update(object.db_fields(peer_object))
358                         message_focus("DIFFERENCES : updated / syncing")
359                         sync = True
360                         action = "changed"
361                     else:
362                         message_focus("UNCHANGED - left intact / not syncing")
363                         sync = False
364                         action = None
365
366                     # Restore foreign identifier
367                     peer_object[primary_key] = peer_object_id
368                 else:
369                     object = None
370                     # Add new object
371                     object = classobj(self.api, peer_object)
372                     # Replace foreign identifier with new local identifier
373                     del object[primary_key]
374                     message_focus("NEW -- created with clean id - syncing")
375                     sync = True
376                     action = "added"
377
378                 if sync:
379                     message_verbose("syncing {classname} {id} - commit_mode={mode}"
380                                     .format(classname=classname,
381                                             id=peer_object_id, mode=commit_mode))
382                     try:
383                         object.sync(commit=commit_mode)
384                     except PLCInvalidArgument, err:
385                         # XXX Log an event instead of printing to logfile
386                         # skip if validation fails
387                         message("Warning: {peername} Skipping invalid {classname} ({err})\n{object}"
388                                 .format(peername=peername, classname=classname,
389                                         object=peer_object, err=err))
390                         continue
391
392                 synced[peer_object_id] = object
393
394                 if action:
395                     message("{peername}: ({count}/{total}) {classname} {primary} {name} {action}"
396                             .format(peername=peername,
397                                     count=count, total=total,
398                                     classname=classname, primary=object[primary_key],
399                                     name=peer_object_name, action=action))
400
401             message_verbose("Exiting sync on {}".format(classname))
402
403             return synced
404
405         # over time, we've had issues with a given column being
406         # added on one side and not on the other
407         # this helper function computes the intersection of two list of
408         # fields/columns
409         def intersect(l1, l2):
410             if compatibility:
411                 return list(set(l1).intersection(set(l2)))
412             else:
413                 return l1
414
415         # some fields definitely need to be ignored
416         def ignore(l1, l2):
417             return list(set(l1).difference(set(l2)))
418
419         #
420         # Synchronize foreign sites
421         #
422
423         start = time.time()
424
425         message('(1) Dealing with Sites')
426
427         # Compare only the columns returned by the GetPeerData() call
428         if peer_tables['Sites']:
429             columns = peer_tables['Sites'][0].keys()
430             columns = intersect(columns, Site.fields)
431         else:
432             columns = None
433
434         # Keyed on foreign site_id
435         old_peer_sites = Sites(
436             self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
437         sites_at_peer = dict([(site['site_id'], site)
438                               for site in peer_tables['Sites']])
439
440         # Synchronize new set (still keyed on foreign site_id)
441         peer_sites = sync(old_peer_sites, sites_at_peer, Site,
442                           ignore(columns, RefreshPeer.ignore_site_fields))
443
444         for peer_site_id, site in peer_sites.iteritems():
445             # Bind any newly cached sites to peer
446             if peer_site_id not in old_peer_sites:
447                 peer.add_site(site, peer_site_id, commit=commit_mode)
448                 site['peer_id'] = peer_id
449                 site['peer_site_id'] = peer_site_id
450
451         timers['site'] = time.time() - start
452
453         #
454         # XXX Synchronize foreign key types
455         #
456
457         message('(2) Dealing with Keys')
458
459         key_types = KeyTypes(self.api).dict()
460
461         #
462         # Synchronize foreign keys
463         #
464
465         start = time.time()
466
467         # Compare only the columns returned by the GetPeerData() call
468         if peer_tables['Keys']:
469             columns = peer_tables['Keys'][0].keys()
470             columns = intersect(columns, Key.fields)
471         else:
472             columns = None
473
474         # Keyed on foreign key_id
475         old_peer_keys = Keys(
476             self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
477         keys_at_peer = dict([(key['key_id'], key)
478                              for key in peer_tables['Keys']])
479
480         # Fix up key_type references
481         for peer_key_id, key in keys_at_peer.items():
482             if key['key_type'] not in key_types:
483                 # XXX Log an event instead of printing to logfile
484                 message("Warning: Skipping invalid {peername} key {key}"
485                         .format(peername=peername, key=key))
486                 del keys_at_peer[peer_key_id]
487                 continue
488
489         # Synchronize new set (still keyed on foreign key_id)
490         peer_keys = sync(old_peer_keys, keys_at_peer, Key,
491                          ignore(columns, RefreshPeer.ignore_key_fields))
492         for peer_key_id, key in peer_keys.iteritems():
493             # Bind any newly cached keys to peer
494             if peer_key_id not in old_peer_keys:
495                 peer.add_key(key, peer_key_id, commit=commit_mode)
496                 key['peer_id'] = peer_id
497                 key['peer_key_id'] = peer_key_id
498
499         timers['keys'] = time.time() - start
500
501         #
502         # Synchronize foreign users
503         #
504
505         start = time.time()
506
507         message('(3) Dealing with Persons')
508
509         # Compare only the columns returned by the GetPeerData() call
510         if peer_tables['Persons']:
511             columns = peer_tables['Persons'][0].keys()
512             columns = intersect(columns, Person.fields)
513         else:
514             columns = None
515
516         # Keyed on foreign person_id
517         old_peer_persons = Persons(
518             self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
519
520         # artificially attach the persons returned by GetPeerData to the new peer
521         # this is because validate_email needs peer_id to be correct when
522         # checking for duplicates
523         for person in peer_tables['Persons']:
524             person['peer_id'] = peer_id
525         persons_at_peer = dict([(peer_person['person_id'], peer_person)
526                                 for peer_person in peer_tables['Persons']])
527
528         # XXX Do we care about membership in foreign site(s)?
529
530         # Synchronize new set (still keyed on foreign person_id)
531         peer_persons = sync(old_peer_persons, persons_at_peer, Person,
532                             ignore(columns, RefreshPeer.ignore_person_fields))
533
534         # transcoder : retrieve a local key_id from a peer_key_id
535         key_transcoder = dict([(key['key_id'], peer_key_id)
536                                for peer_key_id, key in peer_keys.iteritems()])
537
538         for peer_person_id, person in peer_persons.iteritems():
539             # Bind any newly cached users to peer
540             if peer_person_id not in old_peer_persons:
541                 peer.add_person(person, peer_person_id, commit=commit_mode)
542                 person['peer_id'] = peer_id
543                 person['peer_person_id'] = peer_person_id
544                 person['key_ids'] = []
545
546             # User as viewed by peer
547             peer_person = persons_at_peer[peer_person_id]
548
549             # Foreign keys currently belonging to the user
550             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids']
551                                   if key_transcoder[key_id] in peer_keys]
552
553             # Foreign keys that should belong to the user
554             # this is basically peer_person['key_ids'], we just check it makes sense
555             # (e.g. we might have failed importing it)
556             person_key_ids = [key_id for key_id in peer_person[
557                 'key_ids'] if key_id in peer_keys]
558
559             # Remove stale keys from user
560             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
561                 person.remove_key(peer_keys[key_id], commit=commit_mode)
562                 message("{peername} Key {key_id} removed from person {email}"
563                         .format(peername=peername,
564                                 key_id=key_id, email=person['email']))
565
566             # Add new keys to user
567             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
568                 #message("before add_key, passing person={}".format(person))
569                 #message("before add_key, passing key={}".format(peer_keys[key_id]))
570                 person.add_key(peer_keys[key_id], commit=commit_mode)
571                 message("{} Key {} added into person {}"
572                         .format(peername, key_id, person['email']))
573
574         timers['persons'] = time.time() - start
575
576         #
577         # XXX Synchronize foreign boot states
578         #
579
580         boot_states = BootStates(self.api).dict()
581
582         #
583         # Synchronize foreign nodes
584         #
585
586         start = time.time()
587
588         # NOTE: we do import disabled sites
589         message('(4) Dealing with Nodes (1)')
590
591         # Compare only the columns returned by the GetPeerData() call
592         if peer_tables['Nodes']:
593             columns = peer_tables['Nodes'][0].keys()
594             columns = intersect(columns, Node.fields)
595         else:
596             columns = Node.fields
597
598         # Keyed on foreign node_id
599         old_peer_nodes = Nodes(
600             self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
601         nodes_at_peer = dict([(node['node_id'], node)
602                               for node in peer_tables['Nodes']])
603
604         # Fix up site_id and boot_states references
605         for peer_node_id, node in nodes_at_peer.items():
606             errors = []
607             if node['site_id'] not in peer_sites:
608                 errors.append("invalid (or disabled) site {}".format(node['site_id']))
609             if node['boot_state'] not in boot_states:
610                 errors.append("invalid boot state {}".format(node['boot_state']))
611             if errors:
612                 # XXX Log an event instead of printing to logfile
613                 message("Warning: Skipping invalid {peername} node {hostname} - {errors}"
614                         .format(peername=peername,
615                                 hostname=node['hostname'], errors=", ".join(errors)))
616                 del nodes_at_peer[peer_node_id]
617                 continue
618             else:
619                 node['site_id'] = peer_sites[node['site_id']]['site_id']
620
621         # Synchronize new set
622         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node,
623                           ignore(columns, RefreshPeer.ignore_node_fields))
624
625         for peer_node_id, node in peer_nodes.iteritems():
626             # Bind any newly cached foreign nodes to peer
627             if peer_node_id not in old_peer_nodes:
628                 peer.add_node(node, peer_node_id, commit=commit_mode)
629                 node['peer_id'] = peer_id
630                 node['peer_node_id'] = peer_node_id
631
632         timers['nodes'] = time.time() - start
633
634         #
635         # Synchronize local nodes
636         #
637
638         start = time.time()
639         message('(5) Dealing with Nodes (2)')
640
641         # Keyed on local node_id
642         local_nodes = Nodes(self.api).dict()
643
644         for node in peer_tables['PeerNodes']:
645             # Foreign identifier for our node as maintained by peer
646             peer_node_id = node['node_id']
647             # Local identifier for our node as cached by peer
648             node_id = node['peer_node_id']
649             if node_id in local_nodes:
650                 # Still a valid local node, add it to the synchronized
651                 # set of local node objects keyed on foreign node_id.
652                 peer_nodes[peer_node_id] = local_nodes[node_id]
653
654         timers['local_nodes'] = time.time() - start
655
656         #
657         # XXX Synchronize foreign slice instantiation states
658         #
659
660         slice_instantiations = SliceInstantiations(self.api).dict()
661
662         #
663         # Synchronize foreign slices
664         #
665
666         start = time.time()
667
668         message('(6) Dealing with Slices')
669
670         # Compare only the columns returned by the GetPeerData() call
671         if peer_tables['Slices']:
672             columns = peer_tables['Slices'][0].keys()
673             columns = intersect(columns, Slice.fields)
674         else:
675             columns = None
676
677         # Keyed on foreign slice_id
678         old_peer_slices = Slices(
679             self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
680         slices_at_peer = dict([(slice['slice_id'], slice)
681                                for slice in peer_tables['Slices']])
682
683         # Fix up site_id, instantiation, and creator_person_id references
684         for peer_slice_id, slice in slices_at_peer.items():
685             errors = []
686             if slice['site_id'] not in peer_sites:
687                 errors.append("invalid site {}".format(slice['site_id']))
688             if slice['instantiation'] not in slice_instantiations:
689                 errors.append("invalid instantiation {}"
690                               .format(slice['instantiation']))
691             if slice['creator_person_id'] not in peer_persons:
692                 # Just NULL it out
693                 slice['creator_person_id'] = None
694             else:
695                 slice['creator_person_id'] = peer_persons[
696                     slice['creator_person_id']]['person_id']
697             if errors:
698                 message("Warning: Skipping invalid {peername} slice {slice} : {errors}"
699                         .format(peername=peername,
700                                 slice=slice, errors=", ".join(errors)))
701                 del slices_at_peer[peer_slice_id]
702                 continue
703             else:
704                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
705
706         # Synchronize new set 
707         peer_slices = sync(old_peer_slices, slices_at_peer, Slice,
708                            ignore(columns, RefreshPeer.ignore_slice_fields))
709
710         message('(7) Dealing with Nodes in Slices')
711         # transcoder : retrieve a local node_id from a peer_node_id
712         node_transcoder = dict([(node['node_id'], peer_node_id)
713                                 for peer_node_id, node in peer_nodes.iteritems()])
714         person_transcoder = dict([(person['person_id'], peer_person_id)
715                                   for peer_person_id, person in peer_persons.iteritems()])
716
717         for peer_slice_id, slice in peer_slices.iteritems():
718             # Bind any newly cached foreign slices to peer
719             if peer_slice_id not in old_peer_slices:
720                 peer.add_slice(slice, peer_slice_id, commit=commit_mode)
721                 slice['peer_id'] = peer_id
722                 slice['peer_slice_id'] = peer_slice_id
723                 slice['node_ids'] = []
724                 slice['person_ids'] = []
725
726             # Slice as viewed by peer
727             peer_slice = slices_at_peer[peer_slice_id]
728
729             # Nodes that are currently part of the slice
730             old_slice_node_ids = [node_transcoder[node_id] for node_id in slice['node_ids']
731                                   if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
732
733             # Nodes that should be part of the slice
734             slice_node_ids = [node_id for node_id in peer_slice['node_ids']
735                               if node_id in peer_nodes]
736
737             # Remove stale nodes from slice
738             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
739                 slice.remove_node(peer_nodes[node_id], commit=commit_mode)
740                 message("{peername} node {hostname} (id {node_id}) removed from slice {slicename} (id {slice_id})"
741                         .format(peername=peername,
742                                 hostname=peer_nodes[node_id]['hostname'], node_id=peer_nodes[node_id]['node_id'],
743                                 slicename=slice['name'], slice_id=slice['slice_id']))
744
745             # Add new nodes to slice
746             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
747                 slice.add_node(peer_nodes[node_id], commit=commit_mode)
748                 message("{peername} node {hostname} (id {node_id}) added into slice {slicename} (id {slice_id})"
749                         .format(peername=peername,
750                                 hostname=peer_nodes[node_id]['hostname'], node_id=peer_nodes[node_id]['node_id'],
751                                 slicename=slice['name'], slice_id=slice['slice_id']))
752
753             if slice['slice_id'] == 225:
754                 return
755
756             # N.B.: Local nodes that may have been added to the slice
757             # by hand, are removed. In other words, don't do this.
758
759             # Foreign users that are currently part of the slice
760             # old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
761             #                if person_transcoder[person_id] in peer_persons]
762             # An issue occurred with a user who registered on both sites (same email)
763             # So the remote person could not get cached locally
764             # The one-line map/filter style is nicer but ineffective here
765             old_slice_person_ids = []
766             for person_id in slice['person_ids']:
767                 if not person_transcoder.has_key(person_id):
768                     message('WARNING : person_id {person_id} in {slicename} not transcodable (1) - skipped'
769                             .format(person_id=person_id, slicename=slice['name']))
770                 elif person_transcoder[person_id] not in peer_persons:
771                     message('WARNING : person_id {person_id} in {slicename} not transcodable (2) - skipped'
772                             .format(person_id=person_id, slicename=slice['name']))
773                 else:
774                     old_slice_person_ids += [person_transcoder[person_id]]
775
776             # Foreign users that should be part of the slice
777             slice_person_ids = [person_id for person_id in peer_slice[
778                 'person_ids'] if person_id in peer_persons]
779
780             # Remove stale users from slice
781             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
782                 slice.remove_person(
783                     peer_persons[person_id], commit=commit_mode)
784                 message("{peername} user {email} removed from slice {slicename}"
785                         .format(peername=peername,
786                                 email=peer_persons[person_id]['email'],
787                                 slicename=slice['name']))
788
789             # Add new users to slice
790             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
791                 slice.add_person(peer_persons[person_id], commit=commit_mode)
792                 message("{peername} user {email} added into slice {slicename}"
793                         .format(peername=peername,
794                                 email=peer_persons[person_id]['email'],
795                                 slicename=slice['name']))
796
797             # N.B.: Local users that may have been added to the slice
798             # by hand, are not touched.
799
800         timers['slices'] = time.time() - start
801
802         #
803         # Persons x Sites
804         #
805         start = time.time()
806
807         message('(8) Dealing with Persons in Sites')
808
809         for peer_site_id, site in peer_sites.iteritems():
810             # Site as viewed by peer
811             peer_site = sites_at_peer[peer_site_id]
812
813             # Persons that are currently part of the site
814             old_site_person_ids = [person_transcoder[person_id] for person_id in site['person_ids']
815                                    if person_id in person_transcoder and person_transcoder[person_id] in peer_persons]
816
817             # Perons that should be part of the site
818             site_person_ids = [person_id for person_id in peer_site[
819                 'person_ids'] if person_id in peer_persons]
820
821             # Remove stale persons from site
822             for person_id in (set(old_site_person_ids) - set(site_person_ids)):
823                 site.remove_person(peer_persons[person_id], commit=commit_mode)
824                 message("{peername} person {email} removed from site {login_base}"
825                         .format(peername=peername,
826                                 email=peer_persons[person_id]['email'],
827                                 login_base=site['login_base']))
828
829             # Add new persons to site
830             for person_id in (set(site_person_ids) - set(old_site_person_ids)):
831                 site.add_person(peer_persons[person_id], commit=commit_mode)
832                 message("{peername} person {email} added into site {login_base}"
833                         .format(peername=peername,
834                                 email=peer_persons[person_id]['email'],
835                                 login_base=site['login_base']))
836
837         timers['sites-persons'] = time.time() - start
838
839         #
840         # Persons x Roles
841         #
842         start = time.time()
843
844         message('(9) Dealing with Roles for Persons')
845
846         roles = Roles(self.api)
847         roles_dict = dict([(role['role_id'], role) for role in roles])
848         for peer_person_id, person in peer_persons.iteritems():
849             # Person as viewed by peer
850             peer_person = persons_at_peer[peer_person_id]
851
852             # Roles that are currently attributed for the person
853             old_person_role_ids = [role_id for role_id in person['role_ids']]
854
855             # Roles that should be attributed to the person
856             person_role_ids = [role_id for role_id in peer_person['role_ids']]
857
858             # Remove stale roles
859             for role_id in (set(old_person_role_ids) - set(person_role_ids)):
860                 person.remove_role(roles_dict[role_id], commit=commit_mode)
861                 message("{peername} role {rolename} removed from person {email}"
862                         .format(peername=peername,
863                                 rolename=roles_dict[role_id]['name'],
864                                 email=person['email']))
865
866             # Add new roles to person
867             for role_id in (set(person_role_ids) - set(old_person_role_ids)):
868                 person.add_role(roles_dict[role_id], commit=commit_mode)
869                 message("{peername} role {rolename} added from person {email}"
870                         .format(peername=peername,
871                                 rolename=roles_dict[role_id]['name'],
872                                 email=person['email']))
873
874         timers['persons-roles'] = time.time() - start
875
876         # Update peer itself and commit
877         peer.sync(commit=True)
878
879         return timers