compatibility mode, support for 2 ends running different api releases
authorThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 15 Sep 2010 13:46:59 +0000 (15:46 +0200)
committerThierry Parmentelat <thierry.parmentelat@sophia.inria.fr>
Wed, 15 Sep 2010 13:46:59 +0000 (15:46 +0200)
tweaked verbose mode and focus for debugging specific entries

PLC/Methods/RefreshPeer.py

index 4a5a6cc..f8c81b4 100644 (file)
@@ -1,9 +1,6 @@
 #
 # Thierry Parmentelat - INRIA
 #
-# $Id$
-# $URL$
-
 import os
 import sys
 import fcntl
@@ -25,13 +22,29 @@ from PLC.Nodes import Node, Nodes
 from PLC.SliceInstantiations import SliceInstantiations
 from PLC.Slices import Slice, Slices
 
-verbose=False
-
+#################### settings
 # initial version was doing only one final commit
 # * set commit_mode to False to get that behaviour
 # * set comit_mode to True to get everything synced at once
+# the issue with the 'one-commit-at-the-end' approach is 
+# that the db gets basically totally locked during too long
+# causing various issues/crashes in the rest of the system
 commit_mode=True
 
+# turn this to False only if both ends have the same db schema
+# compatibility mode is a bit slower but probably safer on the long run
+compatibility=True
+
+# for verbose output
+verbose=False
+# for debugging specific entries - display detailed info on selected objs 
+focus_type=None # set to e.g. 'Person'
+focus_ids=[]    # set to a list of ids (e.g. person_ids) - remote or local ids should work
+#verbose=True
+#focus_type='Person'
+#focus_ids=[29103,239578,28825]
+
+#################### helpers
 def message (to_print=None,verbose_only=False):
     if verbose_only and not verbose:
         return
@@ -39,10 +52,11 @@ def message (to_print=None,verbose_only=False):
     if to_print:
         print >>log, to_print
 
-def message_verbose(to_print=None):
-    message(to_print,verbose_only=True)
+def message_verbose(to_print=None, header='VERBOSE'):
+    message("%s> %r"%(header,to_print),verbose_only=True)
 
 
+#################### to avoid several instances running at the same time
 class FileLock:
     """
     Lock/Unlock file
@@ -104,6 +118,10 @@ class RefreshPeer(Method):
             ret_val = self.real_call(auth, peer_id_or_peername)
         except Exception, e:
             file_lock.unlock()
+            message("RefreshPeer caught exception - BEG")
+            import traceback
+            traceback.print_exc()
+            message("RefreshPeer caught exception - END")
             raise Exception, e
         file_lock.unlock()
         return ret_val
@@ -143,7 +161,7 @@ class RefreshPeer(Method):
         timers['peer_db'] = peer_tables['db_time']
         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
 
-        def sync(objects, peer_objects, classobj):
+        def sync(objects, peer_objects, classobj, columns):
             """
             Synchronizes two dictionaries of objects. objects should
             be a dictionary of local objects keyed on their foreign
@@ -154,7 +172,13 @@ class RefreshPeer(Method):
             """
 
             classname=classobj(self.api).__class__.__name__
-            message_verbose('Entering sync on %s'%classname)
+            primary_key=getattr(classobj,'primary_key')
+            # display all peer objects of these types while looping
+            secondary_keys={'Node':'hostname','Slice':'name','Person':'email','Site':'login_base'}
+            secondary_key=None
+            if classname in secondary_keys: secondary_key=secondary_keys[classname]
+
+            message_verbose('Entering sync on %s (%s)'%(classname,primary_key))
 
             synced = {}
 
@@ -162,29 +186,46 @@ class RefreshPeer(Method):
             for peer_object_id, object in objects.iteritems():
                 if peer_object_id not in peer_objects:
                     object.delete(commit = commit_mode)
-                    message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
+                    message("%s %s %s deleted"%(peer['peername'],classname, object[primary_key]))
 
             total = len(peer_objects)
             count=1
-            # set this to something realistic to trace down a given object(s)
-            trace_type="Node"
-            trace_ids=[]
-            def trace (message):
-                if classname == trace_type and peer_object_id in trace_ids:
-                    message_verbose('TRACE>>'+message)
+
+            # peer_object_id, peer_object and object are dynamically bound in the loop below...
+            # (local) object might be None if creating a new one
+            def message_focus (message):
+                if classname != focus_type: return
+                if peer_object_id in focus_ids or \
+                        (object and primary_key in object and object[primary_key] in focus_ids):
+                    # always show remote
+                    message_verbose("peer_obj : %d [[%r]]"%(peer_object_id,peer_object),
+                                    header='FOCUS '+message)
+                    # show local object if a match was found
+                    if object: message_verbose("local_obj : <<%r>>"%(object),
+                                               header='FOCUS '+message);
+
+            # the function to compare a local object with its cadidate peer obj
+            # xxx probably faster when compatibility is False...
+            def equal_fields (object, peer_object, columns):
+                # fast version: must use __eq__() instead of == since
+                # peer_object may be a raw dict instead of a Peer object.
+                if not compatibility: return object.__eq__(peer_object)
+                else:
+                    for column in columns: 
+                        if object[column] != peer_object[column]: return False
+                    return True
 
             # Add/update new/existing objects
             for peer_object_id, peer_object in peer_objects.iteritems():
-                message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
+                peer_object_name=""
+                if secondary_key: peer_object_name="(%s)"%peer_object[secondary_key]
+                message_verbose ('%s peer_object_id=%d %s (%d/%d)'%(classname,peer_object_id,peer_object_name,count,total))
                 count += 1
                 if peer_object_id in synced:
                     message("Warning: %s Skipping already added %s: %r"%(
                             peer['peername'], classname, peer_object))
                     continue
-                if classname == 'Node':
-                    message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
-                elif classname == "Slice":
-                    message_verbose ('DBG>> slicename=%s'%peer_object['name'])
+
                 if peer_object_id in objects:
                     # Update existing object
                     object = objects[peer_object_id]
@@ -192,39 +233,33 @@ class RefreshPeer(Method):
                     # Replace foreign identifier with existing local
                     # identifier temporarily for the purposes of
                     # comparison.
-                    peer_object[object.primary_key] = object[object.primary_key]
+                    peer_object[primary_key] = object[primary_key]
 
-                    # Must use __eq__() instead of == since
-                    # peer_object may be a raw dict instead of a Peer
-                    # object.
-                    trace ("in objects : comparing")
-                    if not object.__eq__(peer_object):
+                    if equal_fields(object,peer_object, columns):
                         # Only update intrinsic fields
-                        trace ("updating")
                         object.update(object.db_fields(peer_object))
-                        trace ("updated")
+                        message_focus ("DIFFERENCES : updated / syncing")
                         sync = True
-                        dbg = "changed"
+                        action = "changed"
                     else:
-                        trace ("intact")
+                        message_focus ("UNCHANGED - left intact / not syncing")
                         sync = False
-                        dbg = None
+                        action = None
 
                     # Restore foreign identifier
-                    peer_object[object.primary_key] = peer_object_id
+                    peer_object[primary_key] = peer_object_id
                 else:
-                    trace ("not in objects -- creating")
+                    object=None
                     # Add new object
                     object = classobj(self.api, peer_object)
-                    trace ("created")
                     # Replace foreign identifier with new local identifier
-                    del object[object.primary_key]
-                    trace ("forced clean id")
+                    del object[primary_key]
+                    message_focus ("NEW -- created with clean id - syncing")
                     sync = True
-                    dbg = "added"
+                    action = "added"
 
                 if sync:
-                    message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
+                    message_verbose("syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
                     try:
                         object.sync(commit = commit_mode)
                     except PLCInvalidArgument, err:
@@ -236,13 +271,20 @@ class RefreshPeer(Method):
 
                 synced[peer_object_id] = object
 
-                if dbg:
-                    message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
+                if action:
+                    message("%s: %s %d %s %s"%(peer['peername'], classname, object[primary_key], peer_object_name, action))
 
             message_verbose("Exiting sync on %s"%classname)
 
             return synced
 
+        ### over time, we've had issues with a given column being
+        ### added on one side and not on the other
+        ### this helper function computes the intersection of two list of fields/columns
+        def intersect (l1,l2): 
+            if compatibility: return list (set(l1).intersection(set(l2))) 
+            else: return l1
+
         #
         # Synchronize foreign sites
         #
@@ -254,6 +296,7 @@ class RefreshPeer(Method):
         # Compare only the columns returned by the GetPeerData() call
         if peer_tables['Sites']:
             columns = peer_tables['Sites'][0].keys()
+            columns = intersect (columns, Site.fields)
         else:
             columns = None
 
@@ -262,7 +305,7 @@ class RefreshPeer(Method):
         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
 
         # Synchronize new set (still keyed on foreign site_id)
-        peer_sites = sync(old_peer_sites, sites_at_peer, Site)
+        peer_sites = sync(old_peer_sites, sites_at_peer, Site, columns)
 
         for peer_site_id, site in peer_sites.iteritems():
             # Bind any newly cached sites to peer
@@ -290,6 +333,7 @@ class RefreshPeer(Method):
         # Compare only the columns returned by the GetPeerData() call
         if peer_tables['Keys']:
             columns = peer_tables['Keys'][0].keys()
+            columns = intersect (columns, Key.fields)
         else:
             columns = None
 
@@ -306,7 +350,7 @@ class RefreshPeer(Method):
                 continue
 
         # Synchronize new set (still keyed on foreign key_id)
-        peer_keys = sync(old_peer_keys, keys_at_peer, Key)
+        peer_keys = sync(old_peer_keys, keys_at_peer, Key, columns)
         for peer_key_id, key in peer_keys.iteritems():
             # Bind any newly cached keys to peer
             if peer_key_id not in old_peer_keys:
@@ -327,6 +371,7 @@ class RefreshPeer(Method):
         # Compare only the columns returned by the GetPeerData() call
         if peer_tables['Persons']:
             columns = peer_tables['Persons'][0].keys()
+            columns = intersect (columns, Person.fields)
         else:
             columns = None
 
@@ -343,7 +388,7 @@ class RefreshPeer(Method):
         # XXX Do we care about membership in foreign site(s)?
 
         # Synchronize new set (still keyed on foreign person_id)
-        peer_persons = sync(old_peer_persons, persons_at_peer, Person)
+        peer_persons = sync(old_peer_persons, persons_at_peer, Person, columns)
 
         # transcoder : retrieve a local key_id from a peer_key_id
         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
@@ -376,6 +421,8 @@ class RefreshPeer(Method):
 
             # Add new keys to user
             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
+                message ("before add_key, passing person=%r"%person)
+                message ("before add_key, passing key=%r"%peer_keys[key_id])
                 person.add_key(peer_keys[key_id], commit = commit_mode)
                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
 
@@ -398,11 +445,9 @@ class RefreshPeer(Method):
         # Compare only the columns returned by the GetPeerData() call
         if peer_tables['Nodes']:
             columns = peer_tables['Nodes'][0].keys()
+            columns = intersect (columns, Node.fields)
         else:
-            # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
             columns = Node.fields
-            if 'interface_ids' in columns: columns.remove('interface_ids')
-            if 'dummybox_id' in columns: columns.remove('dummybox_id')
 
         # Keyed on foreign node_id
         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
@@ -426,7 +471,7 @@ class RefreshPeer(Method):
                 node['site_id'] = peer_sites[node['site_id']]['site_id']
 
         # Synchronize new set
-        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
+        peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node, columns)
 
         for peer_node_id, node in peer_nodes.iteritems():
             # Bind any newly cached foreign nodes to peer
@@ -476,6 +521,7 @@ class RefreshPeer(Method):
         # Compare only the columns returned by the GetPeerData() call
         if peer_tables['Slices']:
             columns = peer_tables['Slices'][0].keys()
+            columns = intersect (columns, Slice.fields)
         else:
             columns = None
 
@@ -505,7 +551,7 @@ class RefreshPeer(Method):
                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
 
         # Synchronize new set
-        peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
+        peer_slices = sync(old_peer_slices, slices_at_peer, Slice, columns)
 
         message('Dealing with Slices (2)')
         # transcoder : retrieve a local node_id from a peer_node_id