90a0b702ff60d6181d191ede3c396b2b13e71ab2
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3 #
4 import os
5 import sys
6 import fcntl
7 import time
8
9 from PLC.Debug import log
10 from PLC.Faults import *
11 from PLC.Method import Method
12 from PLC.Parameter import Parameter, Mixed
13 from PLC.Auth import Auth
14
15 from PLC.Peers import Peer, Peers
16 from PLC.Sites import Site, Sites
17 from PLC.Persons import Person, Persons
18 from PLC.KeyTypes import KeyType, KeyTypes
19 from PLC.Keys import Key, Keys
20 from PLC.BootStates import BootState, BootStates
21 from PLC.Nodes import Node, Nodes
22 from PLC.SliceInstantiations import SliceInstantiations
23 from PLC.Slices import Slice, Slices
24 from PLC.Roles import Role, Roles
25
26 #################### settings
27 # initial version was doing only one final commit
28 # * set commit_mode to False to get that behaviour
29 # * set comit_mode to True to get everything synced at once
30 # the issue with the 'one-commit-at-the-end' approach is 
31 # that the db gets basically totally locked during too long
32 # causing various issues/crashes in the rest of the system
33 commit_mode=True
34
35 # turn this to False only if both ends have the same db schema
36 # compatibility mode is a bit slower but probably safer on the long run
37 compatibility=True
38
39 #################### debugging
40 # for verbose output
41 verbose=False
42 # set to a filename for using cached data when debugging
43 # WARNING: does not actually connect to the peer in this case
44 use_cache=None
45 # for debugging specific entries - display detailed info on selected objs 
46 focus_type=None # set to e.g. 'Person'
47 focus_ids=[]    # set to a list of ids (e.g. person_ids) - remote or local ids should work
48 #### example
49 #use_cache="/var/log/peers/getpeerdata.pickle"
50 #verbose=True
51 #focus_type='Person'
52 #focus_ids=[621,1088]
53
54 #################### helpers
55 def message (to_print=None,verbose_only=False):
56     if verbose_only and not verbose:
57         return
58     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
59     if to_print:
60         print >>log, to_print
61
62 def message_verbose(to_print=None, header='VERBOSE'):
63     message("%s> %r"%(header,to_print),verbose_only=True)
64
65
66 #################### to avoid several instances running at the same time
67 class FileLock:
68     """
69     Lock/Unlock file
70     """
71     def __init__(self, file_path, expire = 60 * 60 * 2):
72         self.expire = expire
73         self.fpath = file_path
74         self.fd = None
75
76     def lock(self):
77         if os.path.exists(self.fpath):
78             if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
79                 try:
80                     os.unlink(self.fpath)
81                 except Exception, e:
82                     message('FileLock.lock(%s) : %s' % (self.fpath, e))
83                     return False
84         try:
85             self.fd = open(self.fpath, 'w')
86             fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
87         except IOError, e:
88             message('FileLock.lock(%s) : %s' % (self.fpath, e))
89             return False
90         return True
91
92     def unlock(self):
93         try:
94             fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
95             self.fd.close()
96         except IOError, e:
97             message('FileLock.unlock(%s) : %s' % (self.fpath, e))
98
99
100 class RefreshPeer(Method):
101     """
102     Fetches site, node, slice, person and key data from the specified peer
103     and caches it locally; also deletes stale entries.
104     Upon successful completion, returns a dict reporting various timers.
105     Faults otherwise.
106     """
107
108     roles = ['admin']
109
110     accepts = [
111         Auth(),
112         Mixed(Peer.fields['peer_id'],
113               Peer.fields['peername']),
114         ]
115
116     returns = Parameter(int, "1 if successful")
117
118     ignore_site_fields=['peer_id', 'peer_site_id','last_updated', 'date_created',
119                         'address_ids', 'node_ids', 'person_ids', 'pcu_ids', 'slice_ids' ]
120     ignore_key_fields=['peer_id','peer_key_id', 'person_id']
121     ignore_person_fields=['peer_id','peer_person_id','last_updated','date_created',
122                           'key_ids','slice_ids','person_tag_ids']
123     ignore_node_fields=['peer_id','peer_node_id','last_updated','last_contact','date_created',
124                         'node_tag_ids', 'interface_ids', 'slice_ids', 'nodegroup_ids','pcu_ids','ports']
125     ignore_slice_fields=['peer_id','peer_slice_id','created',
126                          'person_ids','slice_tag_ids','node_ids',]
127
128     def call(self, auth, peer_id_or_peername):
129         ret_val = None
130         peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
131         file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
132         if not file_lock.lock():
133             raise Exception, "Another instance of RefreshPeer is running."
134         try:
135             ret_val = self.real_call(auth, peer_id_or_peername)
136         except Exception, e:
137             file_lock.unlock()
138             message("RefreshPeer caught exception - BEG")
139             import traceback
140             traceback.print_exc(file=log)
141             message("RefreshPeer caught exception - END")
142             raise Exception, e
143         file_lock.unlock()
144         return ret_val
145
146
147     def real_call(self, auth, peer_id_or_peername):
148         # Get peer
149         peers = Peers(self.api, [peer_id_or_peername])
150         if not peers:
151             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
152         peer = peers[0]
153         peer_id = peer['peer_id']
154
155         # Connect to peer API
156         peer.connect()
157
158         timers = {}
159
160         # Get peer data
161         start = time.time()
162         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
163         if not use_cache:
164             message('Issuing GetPeerData')
165             peer_tables = peer.GetPeerData()
166         else:
167             import pickle
168             if os.path.isfile(use_cache):
169                 message("use_cache: WARNING: using cached getpeerdata")
170                 peer_tables=pickle.load(file(use_cache,'rb'))
171             else:
172                 message("use_cache: issuing getpeerdata")
173                 peer_tables = peer.GetPeerData()
174                 message("use_cache: saving in cache %s",use_cache)
175                 pickle.dump(peer_tables,file(use_cache,'wb'))
176                 
177         # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
178         boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
179                             'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
180         for node in peer_tables['Nodes']:
181             for key in ['nodenetwork_ids','dummybox_id']:
182                 if key in node:
183                     del node[key]
184             if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
185         for slice in peer_tables['Slices']:
186             for key in ['slice_attribute_ids']:
187                 if key in slice:
188                     del slice[key]
189         timers['transport'] = time.time() - start - peer_tables['db_time']
190         timers['peer_db'] = peer_tables['db_time']
191         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
192
193         def sync(objects, peer_objects, classobj, columns):
194             """
195             Synchronizes two dictionaries of objects. objects should
196             be a dictionary of local objects keyed on their foreign
197             identifiers. peer_objects should be a dictionary of
198             foreign objects keyed on their local (i.e., foreign to us)
199             identifiers. Returns a final dictionary of local objects
200             keyed on their foreign identifiers.
201             """
202
203             classname=classobj(self.api).__class__.__name__
204             primary_key=getattr(classobj,'primary_key')
205             # display all peer objects of these types while looping
206             secondary_keys={'Node':'hostname','Slice':'name','Person':'email','Site':'login_base'}
207             secondary_key=None
208             if classname in secondary_keys: secondary_key=secondary_keys[classname]
209
210             message_verbose('Entering sync on %s (%s)'%(classname,primary_key))
211
212             synced = {}
213
214             # Delete stale objects
215             for peer_object_id, object in objects.iteritems():
216                 if peer_object_id not in peer_objects:
217                     object.delete(commit = commit_mode)
218                     message("%s %s %s deleted"%(peer['peername'],classname, object[primary_key]))
219
220             total = len(peer_objects)
221             count=1
222
223             # peer_object_id, peer_object and object are dynamically bound in the loop below...
224             # (local) object might be None if creating a new one
225             def in_focus():
226                 if classname != focus_type: return False
227                 return peer_object_id in focus_ids or \
228                     (object and primary_key in object and object[primary_key] in focus_ids)
229
230             def message_focus (message):
231                 if in_focus():
232                     # always show remote
233                     message_verbose("peer_obj : %d [[%r]]"%(peer_object_id,peer_object),
234                                     header='FOCUS '+message)
235                     # show local object if a match was found
236                     if object: message_verbose("local_obj : <<%r>>"%(object),
237                                                header='FOCUS '+message);
238
239
240             # the function to compare a local object with its cadidate peer obj
241             # xxx probably faster when compatibility is False...
242             def equal_fields (object, peer_object, columns):
243                 # fast version: must use __eq__() instead of == since
244                 # peer_object may be a raw dict instead of a Peer object.
245                 if not compatibility: return object.__eq__(peer_object)
246                 elif not verbose:
247                     for column in columns:
248 #                        if in_focus(): message ('FOCUS comparing column %s'%column)
249                         if object[column] != peer_object[column]: return False
250                     return True
251                 else:
252                     result=True
253 #                    print >> log, 'COMPARING ',
254                     for column in columns:
255                         test= object[column] == peer_object[column]
256 #                        print >> log, column,test,
257                         if not test: result=False
258 #                    print >> log, '=>',result
259                     return result
260
261             # Add/update new/existing objects
262             for peer_object_id, peer_object in peer_objects.iteritems():
263                 peer_object_name=""
264                 if secondary_key: peer_object_name="(%s)"%peer_object[secondary_key]
265                 message_verbose ('%s peer_object_id=%d %s (%d/%d)'%(classname,peer_object_id,peer_object_name,count,total))
266                 count += 1
267                 if peer_object_id in synced:
268                     message("Warning: %s Skipping already added %s: %r"%(
269                             peer['peername'], classname, peer_object))
270                     continue
271
272                 if peer_object_id in objects:
273                     # Update existing object
274                     object = objects[peer_object_id]
275
276                     # Replace foreign identifier with existing local
277                     # identifier temporarily for the purposes of
278                     # comparison.
279                     peer_object[primary_key] = object[primary_key]
280
281                     if not equal_fields(object,peer_object, columns):
282                         # Only update intrinsic fields
283                         object.update(object.db_fields(peer_object))
284                         message_focus ("DIFFERENCES : updated / syncing")
285                         sync = True
286                         action = "changed"
287                     else:
288                         message_focus ("UNCHANGED - left intact / not syncing")
289                         sync = False
290                         action = None
291
292                     # Restore foreign identifier
293                     peer_object[primary_key] = peer_object_id
294                 else:
295                     object=None
296                     # Add new object
297                     object = classobj(self.api, peer_object)
298                     # Replace foreign identifier with new local identifier
299                     del object[primary_key]
300                     message_focus ("NEW -- created with clean id - syncing")
301                     sync = True
302                     action = "added"
303
304                 if sync:
305                     message_verbose("syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
306                     try:
307                         object.sync(commit = commit_mode)
308                     except PLCInvalidArgument, err:
309                         # Skip if validation fails
310                         # XXX Log an event instead of printing to logfile
311                         message("Warning: %s Skipping invalid %s %r : %r"%(\
312                                 peer['peername'], classname, peer_object, err))
313                         continue
314
315                 synced[peer_object_id] = object
316
317                 if action:
318                     message("%s: (%d/%d) %s %d %s %s"%(peer['peername'], count,total, classname, 
319                                                        object[primary_key], peer_object_name, action))
320
321             message_verbose("Exiting sync on %s"%classname)
322
323             return synced
324
325         ### over time, we've had issues with a given column being
326         ### added on one side and not on the other
327         ### this helper function computes the intersection of two list of fields/columns
328         def intersect (l1,l2): 
329             if compatibility: return list (set(l1).intersection(set(l2))) 
330             else: return l1
331
332         # some fields definitely need to be ignored
333         def ignore (l1,l2):
334             return list (set(l1).difference(set(l2)))
335
336         #
337         # Synchronize foreign sites
338         #
339
340         start = time.time()
341
342         message('Dealing with Sites')
343
344         # Compare only the columns returned by the GetPeerData() call
345         if peer_tables['Sites']:
346             columns = peer_tables['Sites'][0].keys()
347             columns = intersect (columns, Site.fields)
348         else:
349             columns = None
350
351         # Keyed on foreign site_id
352         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
353         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
354
355         # Synchronize new set (still keyed on foreign site_id)
356         peer_sites = sync(old_peer_sites, sites_at_peer, Site, ignore (columns, RefreshPeer.ignore_site_fields))
357
358         for peer_site_id, site in peer_sites.iteritems():
359             # Bind any newly cached sites to peer
360             if peer_site_id not in old_peer_sites:
361                 peer.add_site(site, peer_site_id, commit = commit_mode)
362                 site['peer_id'] = peer_id
363                 site['peer_site_id'] = peer_site_id
364
365         timers['site'] = time.time() - start
366
367         #
368         # XXX Synchronize foreign key types
369         #
370
371         message('Dealing with Keys')
372
373         key_types = KeyTypes(self.api).dict()
374
375         #
376         # Synchronize foreign keys
377         #
378
379         start = time.time()
380
381         # Compare only the columns returned by the GetPeerData() call
382         if peer_tables['Keys']:
383             columns = peer_tables['Keys'][0].keys()
384             columns = intersect (columns, Key.fields)
385         else:
386             columns = None
387
388         # Keyed on foreign key_id
389         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
390         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
391
392         # Fix up key_type references
393         for peer_key_id, key in keys_at_peer.items():
394             if key['key_type'] not in key_types:
395                 # XXX Log an event instead of printing to logfile
396                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
397                 del keys_at_peer[peer_key_id]
398                 continue
399
400         # Synchronize new set (still keyed on foreign key_id)
401         peer_keys = sync(old_peer_keys, keys_at_peer, Key, ignore (columns, RefreshPeer.ignore_key_fields))
402         for peer_key_id, key in peer_keys.iteritems():
403             # Bind any newly cached keys to peer
404             if peer_key_id not in old_peer_keys:
405                 peer.add_key(key, peer_key_id, commit = commit_mode)
406                 key['peer_id'] = peer_id
407                 key['peer_key_id'] = peer_key_id
408
409         timers['keys'] = time.time() - start
410
411         #
412         # Synchronize foreign users
413         #
414
415         start = time.time()
416
417         message('Dealing with Persons')
418
419         # Compare only the columns returned by the GetPeerData() call
420         if peer_tables['Persons']:
421             columns = peer_tables['Persons'][0].keys()
422             columns = intersect (columns, Person.fields)
423         else:
424             columns = None
425
426         # Keyed on foreign person_id
427         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
428
429         # artificially attach the persons returned by GetPeerData to the new peer
430         # this is because validate_email needs peer_id to be correct when checking for duplicates
431         for person in peer_tables['Persons']:
432             person['peer_id']=peer_id
433         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
434                                 for peer_person in peer_tables['Persons']])
435
436         # XXX Do we care about membership in foreign site(s)?
437
438         # Synchronize new set (still keyed on foreign person_id)
439         peer_persons = sync(old_peer_persons, persons_at_peer, Person, ignore (columns, RefreshPeer.ignore_person_fields))
440
441         # transcoder : retrieve a local key_id from a peer_key_id
442         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
443                                   for peer_key_id,key in peer_keys.iteritems()])
444
445         for peer_person_id, person in peer_persons.iteritems():
446             # Bind any newly cached users to peer
447             if peer_person_id not in old_peer_persons:
448                 peer.add_person(person, peer_person_id, commit = commit_mode)
449                 person['peer_id'] = peer_id
450                 person['peer_person_id'] = peer_person_id
451                 person['key_ids'] = []
452      
453
454             # User as viewed by peer
455             peer_person = persons_at_peer[peer_person_id]
456
457             # Foreign keys currently belonging to the user
458             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
459                                   if key_transcoder[key_id] in peer_keys]
460
461             # Foreign keys that should belong to the user
462             # this is basically peer_person['key_ids'], we just check it makes sense
463             # (e.g. we might have failed importing it)
464             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
465
466             # Remove stale keys from user
467             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
468                 person.remove_key(peer_keys[key_id], commit = commit_mode)
469                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
470
471             # Add new keys to user
472             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
473                 message ("before add_key, passing person=%r"%person)
474                 message ("before add_key, passing key=%r"%peer_keys[key_id])
475                 person.add_key(peer_keys[key_id], commit = commit_mode)
476                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
477
478         timers['persons'] = time.time() - start
479
480         #
481         # XXX Synchronize foreign boot states
482         #
483
484         boot_states = BootStates(self.api).dict()
485
486         #
487         # Synchronize foreign nodes
488         #
489
490         start = time.time()
491
492         message('Dealing with Nodes (1)')
493
494         # Compare only the columns returned by the GetPeerData() call
495         if peer_tables['Nodes']:
496             columns = peer_tables['Nodes'][0].keys()
497             columns = intersect (columns, Node.fields)
498         else:
499             columns = Node.fields
500
501         # Keyed on foreign node_id
502         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
503         nodes_at_peer = dict([(node['node_id'], node) \
504                               for node in peer_tables['Nodes']])
505
506         # Fix up site_id and boot_states references
507         for peer_node_id, node in nodes_at_peer.items():
508             errors = []
509             if node['site_id'] not in peer_sites:
510                 errors.append("invalid site %d" % node['site_id'])
511             if node['boot_state'] not in boot_states:
512                 errors.append("invalid boot state %s" % node['boot_state'])
513             if errors:
514                 # XXX Log an event instead of printing to logfile
515                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
516                              + ", ".join(errors))
517                 del nodes_at_peer[peer_node_id]
518                 continue
519             else:
520                 node['site_id'] = peer_sites[node['site_id']]['site_id']
521
522         # Synchronize new set
523         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node, ignore (columns, RefreshPeer.ignore_node_fields))
524
525         for peer_node_id, node in peer_nodes.iteritems():
526             # Bind any newly cached foreign nodes to peer
527             if peer_node_id not in old_peer_nodes:
528                 peer.add_node(node, peer_node_id, commit = commit_mode)
529                 node['peer_id'] = peer_id
530                 node['peer_node_id'] = peer_node_id
531
532         timers['nodes'] = time.time() - start
533
534         #
535         # Synchronize local nodes
536         #
537
538         start = time.time()
539         message('Dealing with Nodes (2)')
540
541         # Keyed on local node_id
542         local_nodes = Nodes(self.api).dict()
543
544         for node in peer_tables['PeerNodes']:
545             # Foreign identifier for our node as maintained by peer
546             peer_node_id = node['node_id']
547             # Local identifier for our node as cached by peer
548             node_id = node['peer_node_id']
549             if node_id in local_nodes:
550                 # Still a valid local node, add it to the synchronized
551                 # set of local node objects keyed on foreign node_id.
552                 peer_nodes[peer_node_id] = local_nodes[node_id]
553
554         timers['local_nodes'] = time.time() - start
555
556         #
557         # XXX Synchronize foreign slice instantiation states
558         #
559
560         slice_instantiations = SliceInstantiations(self.api).dict()
561
562         #
563         # Synchronize foreign slices
564         #
565
566         start = time.time()
567
568         message('Dealing with Slices (1)')
569
570         # Compare only the columns returned by the GetPeerData() call
571         if peer_tables['Slices']:
572             columns = peer_tables['Slices'][0].keys()
573             columns = intersect (columns, Slice.fields)
574         else:
575             columns = None
576
577         # Keyed on foreign slice_id
578         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
579         slices_at_peer = dict([(slice['slice_id'], slice) \
580                                for slice in peer_tables['Slices']])
581
582         # Fix up site_id, instantiation, and creator_person_id references
583         for peer_slice_id, slice in slices_at_peer.items():
584             errors = []
585             if slice['site_id'] not in peer_sites:
586                 errors.append("invalid site %d" % slice['site_id'])
587             if slice['instantiation'] not in slice_instantiations:
588                 errors.append("invalid instantiation %s" % slice['instantiation'])
589             if slice['creator_person_id'] not in peer_persons:
590                 # Just NULL it out
591                 slice['creator_person_id'] = None
592             else:
593                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
594             if errors:
595                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
596                             + ", ".join(errors))
597                 del slices_at_peer[peer_slice_id]
598                 continue
599             else:
600                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
601
602         # Synchronize new set
603         peer_slices = sync(old_peer_slices, slices_at_peer, Slice, ignore (columns, RefreshPeer.ignore_slice_fields))
604
605         message('Dealing with Slices (2)')
606         # transcoder : retrieve a local node_id from a peer_node_id
607         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
608                                    for peer_node_id,node in peer_nodes.iteritems()])
609         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
610                                      for peer_person_id,person in peer_persons.iteritems()])
611
612         for peer_slice_id, slice in peer_slices.iteritems():
613             # Bind any newly cached foreign slices to peer
614             if peer_slice_id not in old_peer_slices:
615                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
616                 slice['peer_id'] = peer_id
617                 slice['peer_slice_id'] = peer_slice_id
618                 slice['node_ids'] = []
619                 slice['person_ids'] = []
620
621             # Slice as viewed by peer
622             peer_slice = slices_at_peer[peer_slice_id]
623
624             # Nodes that are currently part of the slice
625             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
626                                    if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
627
628             # Nodes that should be part of the slice
629             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
630
631             # Remove stale nodes from slice
632             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
633                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
634                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
635
636             # Add new nodes to slice
637             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
638                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
639                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
640
641             # N.B.: Local nodes that may have been added to the slice
642             # by hand, are removed. In other words, don't do this.
643
644             # Foreign users that are currently part of the slice
645             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
646             #                if person_transcoder[person_id] in peer_persons]
647             # An issue occurred with a user who registered on both sites (same email)
648             # So the remote person could not get cached locally
649             # The one-line map/filter style is nicer but ineffective here
650             old_slice_person_ids = []
651             for person_id in slice['person_ids']:
652                 if not person_transcoder.has_key(person_id):
653                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
654                 elif person_transcoder[person_id] not in peer_persons:
655                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
656                 else:
657                     old_slice_person_ids += [person_transcoder[person_id]]
658
659             # Foreign users that should be part of the slice
660             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
661
662             # Remove stale users from slice
663             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
664                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
665                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
666
667             # Add new users to slice
668             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
669                 slice.add_person(peer_persons[person_id], commit = commit_mode)
670                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
671
672             # N.B.: Local users that may have been added to the slice
673             # by hand, are not touched.
674
675         timers['slices'] = time.time() - start
676
677
678         #
679         # Persons x Sites
680         #
681         start = time.time()
682
683         message('Dealing Sites X Persons relationship')
684
685         for peer_site_id, site in peer_sites.iteritems():
686             # Site as viewed by peer
687             peer_site = sites_at_peer[peer_site_id]
688
689             # Persons that are currently part of the site
690             old_site_person_ids = [ person_transcoder[person_id] for person_id in site['person_ids'] \
691                                    if person_id in person_transcoder and person_transcoder[person_id] in peer_persons]
692
693             # Perons that should be part of the site
694             site_person_ids = [ person_id for person_id in peer_site['person_ids'] if person_id in peer_persons]
695
696             # Remove stale persons from site
697             for person_id in (set(old_site_person_ids) - set(site_person_ids)):
698                 site.remove_person(peer_persons[person_id], commit = commit_mode)
699                 message ("%s person %s removed from site %s"%(peer['peername'], peer_persons[person_id]['email'], site['login_base']))
700
701             # Add new persons to site
702             for person_id in (set(site_person_ids) - set(old_site_person_ids)):
703                 site.add_person(peer_persons[person_id], commit = commit_mode)
704                 message ("%s person %s added into site %s"%(peer['peername'], peer_persons[person_id]['email'], site['login_base']))
705
706         timers['sites-persons'] = time.time() - start
707
708
709         #
710         # Persons x Roles
711         #
712         start = time.time()
713
714         message('Dealing with Persons Roles relationship')
715         
716         roles = Roles(self.api)
717         roles_dict = dict([(role['role_id'], role) for role in roles])
718         for peer_person_id, person in peer_persons.iteritems():
719             # Person as viewed by peer
720             peer_person = persons_at_peer[peer_person_id]
721
722             # Roles that are currently attributed for the person
723             old_person_role_ids = [ role_id for role_id in person['role_ids'] ]
724
725             # Roles that should be attributed to the person
726             person_role_ids = [ role_id for role_id in peer_person['role_ids'] ]
727
728             # Remove stale roles
729             for role_id in (set(old_person_role_ids) - set(person_role_ids)):
730                 person.remove_role(roles_dict[role_id], commit = commit_mode)
731                 message ("%s role %s removed from person %s"%(peer['peername'], roles_dict[role_id]['name'], person['email']))
732
733             # Add new roles to person
734             for role_id in (set(person_role_ids) - set(old_person_role_ids)):
735                 person.add_role(roles_dict[role_id], commit = commit_mode)
736                 message ("%s role %s added from person %s"%(peer['peername'], roles_dict[role_id]['name'], person['email']))
737
738         timers['persons-roles'] = time.time() - start
739
740         # Update peer itself and commit
741         peer.sync(commit = True)
742
743         return timers