X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;ds=sidebyside;f=PLC%2FMethods%2FRefreshPeer.py;h=40fb1e28980a750c03d91e2c37966e90c1db829d;hb=a74854dd38cb742b8fdc0d0cda7fff738a95312c;hp=ee8f896e3e30222f4606b00e08948aa177c0c2ef;hpb=2fe12af6440fcc9fba6c086c8db883b4fcb20583;p=plcapi.git diff --git a/PLC/Methods/RefreshPeer.py b/PLC/Methods/RefreshPeer.py index ee8f896..40fb1e2 100644 --- a/PLC/Methods/RefreshPeer.py +++ b/PLC/Methods/RefreshPeer.py @@ -1,6 +1,7 @@ # # Thierry Parmentelat - INRIA # +# $Id$ import time @@ -20,11 +21,14 @@ from PLC.Nodes import Node, Nodes from PLC.SliceInstantiations import SliceInstantiations from PLC.Slices import Slice, Slices +verbose=False + class RefreshPeer(Method): """ - Fetches node and slice data from the specified peer and caches it - locally; also deletes stale entries. Returns 1 if successful, - faults otherwise. + Fetches site, node, slice, person and key data from the specified peer + and caches it locally; also deletes stale entries. + Upon successful completion, returns a dict reporting various timers. + Faults otherwise. """ roles = ['admin'] @@ -52,9 +56,12 @@ class RefreshPeer(Method): # Get peer data start = time.time() + print >>log, 'Issuing GetPeerData' peer_tables = peer.GetPeerData() timers['transport'] = time.time() - start - peer_tables['db_time'] timers['peer_db'] = peer_tables['db_time'] + if verbose: + print >>log, 'GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']) def sync(objects, peer_objects, classobj): """ @@ -66,13 +73,16 @@ class RefreshPeer(Method): keyed on their foreign identifiers. """ + if verbose: + print >>log, 'Entering sync on',classobj(self.api).__class__.__name__ + synced = {} # Delete stale objects for peer_object_id, object in objects.iteritems(): if peer_object_id not in peer_objects: object.delete(commit = False) - print classobj, "object %d deleted" % object[object.primary_key] + print >> log, peer['peername'],classobj(self.api).__class__.__name__, object[object.primary_key],"deleted" # Add/update new/existing objects for peer_object_id, peer_object in peer_objects.iteritems(): @@ -123,6 +133,9 @@ class RefreshPeer(Method): if dbg: print >> log, peer['peername'], classobj(self.api).__class__.__name__, object[object.primary_key], dbg + if verbose: + print >>log, 'Exiting sync on',classobj(self.api).__class__.__name__ + return synced # @@ -131,6 +144,8 @@ class RefreshPeer(Method): start = time.time() + print >>log, 'Dealing with Sites' + # Compare only the columns returned by the GetPeerData() call if peer_tables['Sites']: columns = peer_tables['Sites'][0].keys() @@ -157,6 +172,8 @@ class RefreshPeer(Method): # XXX Synchronize foreign key types # + print >>log, 'Dealing with Keys' + key_types = KeyTypes(self.api).dict() # @@ -201,6 +218,8 @@ class RefreshPeer(Method): start = time.time() + print >>log, 'Dealing with Persons' + # Compare only the columns returned by the GetPeerData() call if peer_tables['Persons']: columns = peer_tables['Persons'][0].keys() @@ -209,6 +228,11 @@ class RefreshPeer(Method): # Keyed on foreign person_id old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id') + + # artificially attach the persons returned by GetPeerData to the new peer + # this is because validate_email needs peer_id to be correct when checking for duplicates + for person in peer_tables['Persons']: + person['peer_id']=peer_id persons_at_peer = dict([(peer_person['person_id'], peer_person) \ for peer_person in peer_tables['Persons']]) @@ -217,6 +241,10 @@ class RefreshPeer(Method): # Synchronize new set (still keyed on foreign person_id) peer_persons = sync(old_peer_persons, persons_at_peer, Person) + # transcoder : retrieve a local key_id from a peer_key_id + key_transcoder = dict ( [ (key['key_id'],peer_key_id) \ + for peer_key_id,key in peer_keys.iteritems()]) + for peer_person_id, person in peer_persons.iteritems(): # Bind any newly cached users to peer if peer_person_id not in old_peer_persons: @@ -229,22 +257,23 @@ class RefreshPeer(Method): peer_person = persons_at_peer[peer_person_id] # Foreign keys currently belonging to the user - old_person_keys = dict(filter(lambda (peer_key_id, key): \ - key['key_id'] in person['key_ids'], - peer_keys.items())) + old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \ + if key_transcoder[key_id] in peer_keys] # Foreign keys that should belong to the user - person_keys = dict(filter(lambda (peer_key_id, key): \ - peer_key_id in peer_person['key_ids'], - peer_keys.items())) + # this is basically peer_person['key_ids'], we just check it makes sense + # (e.g. we might have failed importing it) + person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys] # Remove stale keys from user - for peer_key_id in (set(old_person_keys.keys()) - set(person_keys.keys())): - person.remove_key(old_person_keys[peer_key_id], commit = False) + for key_id in (set(old_person_key_ids) - set(person_key_ids)): + person.remove_key(peer_keys[key_id], commit = False) + print >> log, peer['peername'], 'Key', key_id, 'removed from', person['email'] # Add new keys to user - for peer_key_id in (set(person_keys.keys()) - set(old_person_keys.keys())): - person.add_key(person_keys[peer_key_id], commit = False) + for key_id in (set(person_key_ids) - set(old_person_key_ids)): + person.add_key(peer_keys[key_id], commit = False) + print >> log, peer['peername'], 'Key', key_id, 'added into', person['email'] timers['persons'] = time.time() - start @@ -260,6 +289,8 @@ class RefreshPeer(Method): start = time.time() + print >>log, 'Dealing with Nodes' + # Compare only the columns returned by the GetPeerData() call if peer_tables['Nodes']: columns = peer_tables['Nodes'][0].keys() @@ -273,10 +304,15 @@ class RefreshPeer(Method): # Fix up site_id and boot_states references for peer_node_id, node in nodes_at_peer.items(): - if node['site_id'] not in peer_sites or \ - node['boot_state'] not in boot_states: + errors = [] + if node['site_id'] not in peer_sites: + errors.append("invalid site %d" % node['site_id']) + if node['boot_state'] not in boot_states: + errors.append("invalid boot state %s" % node['boot_state']) + if errors: # XXX Log an event instead of printing to logfile - print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], node + print >> log, "Warning: Skipping invalid %s node:" % peer['peername'], \ + node, ":", ", ".join(errors) del nodes_at_peer[peer_node_id] continue else: @@ -327,6 +363,8 @@ class RefreshPeer(Method): start = time.time() + print >>log, 'Dealing with Slices' + # Compare only the columns returned by the GetPeerData() call if peer_tables['Slices']: columns = peer_tables['Slices'][0].keys() @@ -340,20 +378,33 @@ class RefreshPeer(Method): # Fix up site_id, instantiation, and creator_person_id references for peer_slice_id, slice in slices_at_peer.items(): - if slice['site_id'] not in peer_sites or \ - slice['instantiation'] not in slice_instantiations or \ - slice['creator_person_id'] not in peer_persons: - # XXX Log an event instead of printing to logfile - print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], slice + errors = [] + if slice['site_id'] not in peer_sites: + errors.append("invalid site %d" % slice['site_id']) + if slice['instantiation'] not in slice_instantiations: + errors.append("invalid instantiation %s" % slice['instantiation']) + if slice['creator_person_id'] not in peer_persons: + # Just NULL it out + slice['creator_person_id'] = None + else: + slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id'] + if errors: + print >> log, "Warning: Skipping invalid %s slice:" % peer['peername'], \ + slice, ":", ", ".join(errors) del slices_at_peer[peer_slice_id] continue else: slice['site_id'] = peer_sites[slice['site_id']]['site_id'] - slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id'] # Synchronize new set peer_slices = sync(old_peer_slices, slices_at_peer, Slice) + # transcoder : retrieve a local node_id from a peer_node_id + node_transcoder = dict ( [ (node['node_id'],peer_node_id) \ + for peer_node_id,node in peer_nodes.iteritems()]) + person_transcoder = dict ( [ (person['person_id'],peer_person_id) \ + for peer_person_id,person in peer_persons.iteritems()]) + for peer_slice_id, slice in peer_slices.iteritems(): # Bind any newly cached foreign slices to peer if peer_slice_id not in old_peer_slices: @@ -367,43 +418,52 @@ class RefreshPeer(Method): peer_slice = slices_at_peer[peer_slice_id] # Nodes that are currently part of the slice - old_slice_nodes = dict(filter(lambda (peer_node_id, node): \ - node['node_id'] in slice['node_ids'], - peer_nodes.items())) + old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \ + if node_transcoder[node_id] in peer_nodes] # Nodes that should be part of the slice - slice_nodes = dict(filter(lambda (peer_node_id, node): \ - peer_node_id in peer_slice['node_ids'], - peer_nodes.items())) + slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes] # Remove stale nodes from slice - for node_id in (set(old_slice_nodes.keys()) - set(slice_nodes.keys())): - slice.remove_node(old_slice_nodes[node_id], commit = False) + for node_id in (set(old_slice_node_ids) - set(slice_node_ids)): + slice.remove_node(peer_nodes[node_id], commit = False) + print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'removed from', slice['name'] # Add new nodes to slice - for node_id in (set(slice_nodes.keys()) - set(old_slice_nodes.keys())): - slice.add_node(slice_nodes[node_id], commit = False) + for node_id in (set(slice_node_ids) - set(old_slice_node_ids)): + slice.add_node(peer_nodes[node_id], commit = False) + print >> log, peer['peername'], 'Node', peer_nodes[node_id]['hostname'], 'added into', slice['name'] # N.B.: Local nodes that may have been added to the slice # by hand, are removed. In other words, don't do this. # Foreign users that are currently part of the slice - old_slice_persons = dict(filter(lambda (peer_person_id, person): \ - person['person_id'] in slice['person_ids'], - peer_persons.items())) + #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \ + # if person_transcoder[person_id] in peer_persons] + # An issue occurred with a user who registered on both sites (same email) + # So the remote person could not get cached locally + # The one-line map/filter style is nicer but ineffective here + old_slice_person_ids = [] + for person_id in slice['person_ids']: + if not person_transcoder.has_key(person_id): + print >> log, 'WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']) + elif person_transcoder[person_id] not in peer_persons: + print >> log, 'WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']) + else: + old_slice_person_ids += [person_transcoder[person_id]] # Foreign users that should be part of the slice - slice_persons = dict(filter(lambda (peer_person_id, person): \ - peer_person_id in peer_slice['person_ids'], - peer_persons.items())) + slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ] # Remove stale users from slice - for peer_person_id in (set(old_slice_persons.keys()) - set(slice_persons.keys())): - slice.remove_person(old_slice_persons[peer_person_id], commit = False) + for person_id in (set(old_slice_person_ids) - set(slice_person_ids)): + slice.remove_person(peer_persons[person_id], commit = False) + print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'removed from', slice['name'] # Add new users to slice - for peer_person_id in (set(slice_persons.keys()) - set(old_slice_persons.keys())): - slice.add_person(slice_persons[peer_person_id], commit = False) + for person_id in (set(slice_person_ids) - set(old_slice_person_ids)): + slice.add_person(peer_persons[person_id], commit = False) + print >> log, peer['peername'], 'User', peer_persons[person_id]['email'], 'added into', slice['name'] # N.B.: Local users that may have been added to the slice # by hand, are not touched.