- call DeleteSlice
[plcapi.git] / PLC / Cache.py
index ff9f180..21b8950 100644 (file)
@@ -1,3 +1,5 @@
+import time
+
 from PLC.Faults import *
 from PLC.Parameter import Parameter
 from PLC.Filter import Filter
@@ -28,18 +30,40 @@ def class_attributes (classname):
 
 class Cache:
 
-    # an attempt to provide genericity in the caching algorithm
+    """
+    This class is the core of the RefreshPeer method's implementation,
+    that basically calls Cache:refresh_peer
+
+    It manages caching on a table-by-table basis, and performs required
+    transcoding from remote ids to local ids - that's the job of the
+    Transcoder class
+
+    For the tables (classes) that it handles, it uses the following
+    attributes
+    (*) primary_key is the name of the field where db indexes are stored
+    (*) class_key is used to implement equality on objects
+    (*) foreign_fields is the list of fields that are copied verbatim from
+        foreign objects
+    (*) foreign_xrefs is the list of columns that are subject to transcoding
+        (.) when obj[field] is an int, the transcoded value is directly
+        inserted in the table
+        (.) if it's a list, it is assumed that the relationship is maintained
+            in an external 2-column table. That's where the XrefTable class comes in
+
+    The relationship between slices, slice attribute types and slice attributes being
+    more complex, it is handled in a specifically customized version of update_table
+
+    Of course the order in which the tables are managed is important, with different
+    orders several iterations might be needed for the update process to converge.
+
+    Of course the timers field was introduced for optimization and could safely be removed
+    """
     
-    # the Peer object we are syncing with
-    def __init__ (self, api, peer, peer_server, auth):
-
-       import PLC.Peers
+    def __init__ (self, api, peer_id, peer_server):
 
        self.api = api
-        assert isinstance(peer,PLC.Peers.Peer)
-        self.peer = peer
+        self.peer_id = peer_id
        self.peer_server = peer_server
-       self.auth = auth
         
     class Transcoder:
 
@@ -63,18 +87,19 @@ class Cache:
        def transcode (self, alien_id):
            """ transforms an alien id into a local one """
            # locate alien obj from alien_id
-           verbose ('entering transcode with alien_id',alien_id,)
+           #verbose ('.entering transcode with alien_id',alien_id,)
            alien_object=self.alien_objects_byid[alien_id]
-           verbose ('located alien_obj',)
+           #verbose ('..located alien_obj',)
            name = alien_object [self.class_key]
-           verbose ('got name',name,)
+           #verbose ('...got name',name,)
            local_object=self.local_objects_byname[name]
-           verbose ('found local obj')
+           #verbose ('....found local obj')
            local_id=local_object[self.primary_key]
-           verbose ('and local_id',local_id)
+           #verbose ('.....and local_id',local_id)
            return local_id
            
 
+    # for handling simple n-to-n relation tables, like e.g. slice_node
     class XrefTable: 
 
        def __init__ (self, api, tablename, class1, class2):
@@ -93,12 +118,22 @@ class Cache:
                self.api.db.do (sql)
 
        def insert_new_items (self, id1, id2_set):
-        ### xxx needs to be optimized
-        ### tried to figure a way to use a single sql statement
-        ### like: insert into table (x,y) values (1,2),(3,4);
-        ### but apparently this is not supported under postgresql
-           for id2 in id2_set:
-               sql = "INSERT INTO %s VALUES (%d,%d)"%(self.tablename,id1,id2)
+            ### xxx needs to be optimized
+            ### tried to figure a way to use a single sql statement
+            ### like: insert into table (x,y) values (1,2),(3,4);
+            ### but apparently this is not supported under postgresql
+            for id2 in id2_set:
+                sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
+                      (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
+
+# below is Tony's code but it's badly broken. I'm not sure we care much in fact.
+#          if id2_set:
+#              sql = "INSERT INTO %s select %d, %d " % \
+#                      self.tablename, id1, id2[0] 
+#              for id2 in id2_set[1:]:
+#                      sql += " UNION ALL SELECT %d, %d " % \
+#                      (id1,id2)
+
                self.api.db.do (sql)
 
        def update_item (self, id1, old_id2s, new_id2s):
@@ -113,16 +148,15 @@ class Cache:
     # classname: the type of objects we are talking about;       e.g. 'Slice'
     # peer_object_list list of objects at a given peer -         e.g. peer.GetSlices()
     # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
-    # his must match the keys in xref_specs
-    # lambda_ignore : the alien objects are ignored if this returns true
+    #    we need an entry for each class mentioned in the class's foreign_xrefs
     def update_table (self,
                       classname,
                       alien_object_list,
                      alien_xref_objs_dict = {},
-                      lambda_ignore=lambda x:False):
+                      report_name_conflicts = True):
         
-        peer = self.peer
-        peer_id = peer['peer_id']
+        verbose ("============================== entering update_table on",classname)
+        peer_id=self.peer_id
 
        attrs = class_attributes (classname)
        row_class = attrs['row_class']
@@ -134,11 +168,15 @@ class Cache:
 
        ## allocate transcoders and xreftables once, for each item in foreign_xrefs
        # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
-       accessories = dict(
-           [ (xref_classname,
-              {'transcoder':Cache.Transcoder (self.api,xref_classname,alien_xref_objs_dict[xref_classname]),
-               'xref_table':Cache.XrefTable (self.api,xref_spec['table'],classname,xref_classname)})
-             for xref_classname,xref_spec in foreign_xrefs.iteritems()])
+        xref_accessories = dict(
+            [ (xref['field'],
+               {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
+                'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
+              for xref in foreign_xrefs ])
+
+        # the fields that are direct references, like e.g. site_id in Node
+        # determined lazily, we need an alien_object to do that, and we may have none here
+        direct_ref_fields = None
 
         ### get current local table
         # get ALL local objects so as to cope with
@@ -146,12 +184,10 @@ class Cache:
        # (*) or naming conflicts
         local_objects = table_class (self.api)
         ### index upon class_key for future searches
-       #verbose ('local objects:',local_objects)
-       verbose ('class_key',class_key)
         local_objects_index = local_objects.dict(class_key)
-       verbose ('update_table',classname,local_objects_index.keys())
 
        ### mark entries for this peer outofdate
+        new_count=0
         old_count=0;
        for local_object in local_objects:
            if local_object['peer_id'] == peer_id:
@@ -160,29 +196,31 @@ class Cache:
            else:
                local_object.uptodate=True
 
-        new_count=0
+        for alien_object in alien_object_list:
+            verbose ('+++ Got alien object',alien_object)
+
         # scan the peer's local objects
         for alien_object in alien_object_list:
 
             object_name = alien_object[class_key]
 
-            ### ignore, e.g. system-wide slices
-            if lambda_ignore(alien_object):
-               verbose('Ignoring',object_name)
-                continue
+           verbose ('----- update_table (%s) - considering'%classname,object_name)
+
+            # optimizing : avoid doing useless syncs
+            needs_sync = False
 
-           verbose ('update_table - Considering',object_name)
-                
             # create or update
             try:
                 ### We know about this object already
                 local_object = local_objects_index[object_name]
                if local_object ['peer_id'] is None:
-                   ### xxx send e-mail
-                   print 'We are in trouble here'
-                   print 'The %s object named %s is natively defined twice'%(classname,object_name)
-                   print 'Once on this PLC and once on peer %d'%peer_id
-                   print 'We dont raise an exception so that the remaining updates can still take place'
+                    if report_name_conflicts:
+                       ### xxx send e-mail
+                        print '!!!!!!!!!! We are in trouble here'
+                        print 'The %s object named %s is natively defined twice, '%(classname,object_name),
+                        print 'once on this PLC and once on peer %d'%peer_id
+                        print 'We dont raise an exception so that the remaining updates can still take place'
+                        print '!!!!!!!!!!'
                    continue
                 if local_object['peer_id'] != peer_id:
                     ### the object has changed its plc, 
@@ -190,6 +228,12 @@ class Cache:
                    ### we can assume the object just moved
                    ### needs to update peer_id though
                     local_object['peer_id'] = peer_id
+                    needs_sync = True
+                # update all fields as per foreign_fields
+                for field in foreign_fields:
+                    if (local_object[field] != alien_object [field]):
+                        local_object[field]=alien_object[field]
+                        needs_sync = True
                verbose ('update_table FOUND',object_name)
            except:
                 ### create a new entry
@@ -198,24 +242,41 @@ class Cache:
                 # insert in index
                 local_objects_index[class_key]=local_object
                verbose ('update_table CREATED',object_name)
-
-            # go on with update
-            for field in foreign_fields:
-                local_object[field]=alien_object[field]
+                # update all fields as per foreign_fields
+                for field in foreign_fields:
+                    local_object[field]=alien_object[field]
+                # this is tricky; at this point we may have primary_key unspecified,
+                # but we need it for handling xrefs below, so we'd like to sync to get one
+                # on the other hand some required fields may be still missing so
+                #  the DB would refuse to sync in this case (e.g. site_id in Node)
+                # so let's fill them with 1 so we can sync, this will be overridden below
+                # lazily determine this set of fields now
+                if direct_ref_fields is None:
+                    direct_ref_fields=[]
+                    for xref in foreign_xrefs:
+                        field=xref['field']
+                        #verbose('checking field %s for direct_ref'%field)
+                        if isinstance(alien_object[field],int):
+                            direct_ref_fields.append(field)
+                    verbose("FOUND DIRECT REFS",direct_ref_fields)
+                for field in direct_ref_fields:
+                    local_object[field]=1
+                verbose('Early sync on',local_object)
+                local_object.sync()
+                needs_sync = False
 
             # this row is now valid
             local_object.uptodate=True
             new_count += 1
-            local_object.sync()
 
            # manage cross-refs
-           for xref_classname,xref_spec in foreign_xrefs.iteritems():
-               field=xref_spec['field']
-               alien_xref_obj_list = alien_xref_objs_dict[xref_classname]
+           for xref in foreign_xrefs:
+               field=xref['field']
+               alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
                alien_value = alien_object[field]
+               transcoder = xref_accessories[xref['field']]['transcoder']
                if isinstance (alien_value,list):
-                   verbose ('update_table list-transcoding ',xref_classname,' aliens=',alien_value,)
-                   transcoder = accessories[xref_classname]['transcoder']
+                   #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
                    local_values=[]
                    for a in alien_value:
                        try:
@@ -223,21 +284,28 @@ class Cache:
                        except:
                            # could not transcode - might be from another peer that we dont know about..
                            pass
-                   verbose (" transcoded as ",local_values)
-                   xref_table = accessories[xref_classname]['xref_table']
-                   # newly created objects dont have xrefs yet
+                   #verbose (" transcoded as ",local_values)
+                   xref_table = xref_accessories[xref['field']]['xref_table']
+                   # newly created objects dont have xref fields set yet
                    try:
-                       former_xrefs=local_object[xref_spec['field']]
+                       former_xrefs=local_object[xref['field']]
                    except:
                        former_xrefs=[]
                    xref_table.update_item (local_object[primary_key],
                                            former_xrefs,
                                            local_values)
                elif isinstance (alien_value,int):
+                   #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
                    new_value = transcoder.transcode(alien_value)
-                   local_object[field] = new_value
-                   local_object.sync()
-
+                    if local_object[field] != new_value:
+                        local_object[field] = new_value
+                        needs_sync = True
+
+            ### this object is completely updated, let's save it
+            if needs_sync:
+                verbose('FINAL sync on %s:'%object_name,local_object)
+                local_object.sync(False)
+                    
        ### delete entries that are not uptodate
         for local_object in local_objects:
             if not local_object.uptodate:
@@ -247,65 +315,222 @@ class Cache:
 
         ### return delta in number of objects 
         return new_count-old_count
-                
-    def refresh_nodes (self, peer_get_nodes):
-        """
-        refreshes the foreign_nodes and peer_node tables
-        expected input is the current list of local nodes
-        as returned from the peer by GetNodes {'peer_id':None}
 
-        returns the number of new nodes (can be negative)
-        """
+    # slice attributes exhibit a special behaviour
+    # because there is no name we can use to retrieve/check for equality
+    # this object is like a 3-part xref, linking slice_attribute_type, slice,
+    #    and potentially node, together with a value that can change over time.
+    # extending the generic model to support a lambda rather than class_key
+    #    would clearly become overkill
+    def update_slice_attributes (self,
+                                 alien_slice_attributes,
+                                 alien_nodes,
+                                 alien_slices):
 
-        return self.update_table ('Node', peer_get_nodes)
-        
-    def refresh_slices (self, peer_get_slices, peer_foreign_nodes):
-        """
-        refreshes the foreign_slices and peer_slice tables
-        expected input is the current list of slices as returned by GetSlices
-
-        returns the number of new slices on this peer (can be negative)
-        """
-
-       # xxx use 'system' flag for finding system slices
-        return self.update_table ('Slice', peer_get_slices,
-                                 {'Node':peer_foreign_nodes},
-                                 lambda x: x['creator_person_id']==1)
+        from PLC.SliceAttributeTypes import SliceAttributeTypes
+        from PLC.SliceAttributes import SliceAttribute, SliceAttributes
+
+        verbose ("============================== entering update_slice_attributes")
+
+        # init
+        peer_id = self.peer_id
         
+        # create transcoders
+        node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
+        slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
+        # no need to transcode SliceAttributeTypes, we have a name in the result
+        local_sat_dict = SliceAttributeTypes(self.api).dict('name')
+               
+        # load local objects
+        local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
+
+       ### mark entries for this peer outofdate
+        new_count = 0
+        old_count=len(local_objects)
+       for local_object in local_objects:
+            local_object.uptodate=False
+
+        for alien_object in alien_slice_attributes:
+
+            verbose('----- update_slice_attributes: considering ...')
+            verbose('   ',alien_object)
+
+            # locate local slice
+            try:
+                slice_id = slice_xcoder.transcode(alien_object['slice_id'])
+            except:
+                verbose('update_slice_attributes: unable to locate slice',
+                        alien_object['slice_id'])
+                continue
+            # locate slice_attribute_type
+            try:
+                sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
+            except:
+                verbose('update_slice_attributes: unable to locate slice attribute type',
+                        alien_object['name'])
+                continue
+            # locate local node if specified
+            try:
+                alien_node_id = alien_object['node_id']
+                if alien_node_id is not None:
+                    node_id = node_xcoder.transcode(alien_node_id)
+                else:
+                    node_id=None
+            except:
+                verbose('update_slice_attributes: unable to locate node',
+                        alien_object['node_id'])
+                continue
+
+            # locate the local SliceAttribute if any
+            try:
+                verbose ('searching name=', alien_object['name'],
+                         'slice_id',slice_id, 'node_id',node_id)
+                local_object = SliceAttributes (self.api,
+                                                {'name':alien_object['name'],
+                                                 'slice_id':slice_id,
+                                                 'node_id':node_id})[0]
+                
+                if local_object['peer_id'] != peer_id:
+                    verbose ('FOUND local sa - skipped')
+                    continue
+                verbose('FOUND already cached sa - setting value')
+                local_object['value'] = alien_object['value']
+            # create it if missing
+            except:
+                local_object = SliceAttribute(self.api,
+                                              {'peer_id':peer_id,
+                                               'slice_id':slice_id,
+                                               'node_id':node_id,
+                                               'attribute_type_id':sat_id,
+                                               'value':alien_object['value']})
+                verbose('CREATED new sa')
+            local_object.uptodate=True
+            new_count += 1
+            local_object.sync(False)
+
+        for local_object in local_objects:
+            if not local_object.uptodate:
+                local_object.delete()
+
+        self.api.db.commit()
+        ### return delta in number of objects 
+        return new_count-old_count
+
     def refresh_peer (self):
        
-        peer_local_slices = self.peer_server.GetSlices(self.auth,{'peer_id':None})
+       # so as to minimize the numer of requests
+       # we get all objects in a single call and sort afterwards
+       # xxx ideally get objects either local or the ones attached here
+       # requires to know remote peer's peer_id for ourselves, mmhh..
+       # does not make any difference in a 2-peer deployment though
+
+        ### uses GetPeerData to gather all info in a single xmlrpc request
+
+        timers={}
+        t_start=time.time()
+        # xxx see also GetPeerData - peer_id arg unused yet
+        all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
+
+        verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
+        sks=all_data.keys()
+        sks.sort()
+        for k in sks:
+            f=all_data[k]
+            try:
+                verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
+            except:
+                pass
+
+        t_acquired = time.time()
+       # refresh sites
+       plocal_sites = all_data['Sites-local']
+        all_sites = plocal_sites + all_data['Sites-peer']
+       nb_new_sites = self.update_table('Site', plocal_sites)
+
+        t0 = time.time()
+        timers['process-sites']=t0-t_acquired
+        
 
        # refresh keys
-       peer_local_keys = self.peer_server.GetKeys(self.auth,{'peer_id':None})
-       nb_new_keys = self.update_table('Key', peer_local_keys)
+       plocal_keys = all_data['Keys-local']
+        all_keys = plocal_keys + all_data['Keys-peer']
+       nb_new_keys = self.update_table('Key', plocal_keys)
+
+        t=time.time()
+        timers['process-keys']=t-t0
+        t0=t
 
        # refresh nodes
-        peer_local_nodes = self.peer_server.GetNodes(self.auth,{'peer_id':None})
-        nb_new_nodes = self.update_table('Node', peer_local_nodes)
+       plocal_nodes = all_data['Nodes-local']
+        all_nodes = plocal_nodes + all_data['Nodes-peer']
+        nb_new_nodes = self.update_table('Node', plocal_nodes,
+                                        { 'Site' : all_sites } )
+
+        t=time.time()
+        timers['process-nodes']=t-t0
+        t0=t
 
        # refresh persons
-       peer_local_persons = self.peer_server.GetPersons(self.auth,{'peer_id':None})
-       # xxx ideally get our own persons only
-       # requires to know remote peer's peer_id for ourselves, mmhh
-       peer_all_keys = peer_local_keys + self.peer_server.GetKeys(self.auth,{'~peer_id':None})
-       nb_new_persons = self.update_table ('Person', peer_local_persons,
-                                           { 'Key': peer_all_keys} )
+       plocal_persons = all_data['Persons-local']
+        all_persons = plocal_persons + all_data['Persons-peer']
+       nb_new_persons = self.update_table ('Person', plocal_persons,
+                                           { 'Key': all_keys, 'Site' : all_sites } )
 
-       # refresh slices
-       def is_system_slice (slice):
-           return slice['creator_person_id'] == 1
-       # xxx would ideally get our own nodes only, 
-        peer_all_nodes = peer_local_nodes+self.peer_server.GetNodes(self.auth,{'~peer_id':None})
+        t=time.time()
+        timers['process-persons']=t-t0
+        t0=t
+
+        # refresh slice attribute types
+        plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
+        nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
+                                                          plocal_slice_attribute_types,
+                                                          report_name_conflicts = False)
 
-        nb_new_slices = self.update_table ('Slice', peer_local_slices,
-                                          {'Node':peer_all_nodes},
-                                          is_system_slice)
+        t=time.time()
+        timers['process-sat']=t-t0
+        t0=t
 
+       # refresh slices
+        plocal_slices = all_data['Slices-local']
+        all_slices = plocal_slices + all_data['Slices-peer']
+
+        # forget about ignoring remote system slices
+        # just update them too, we'll be able to filter them out later in GetSlivers
+
+        nb_new_slices = self.update_table ('Slice', plocal_slices,
+                                           {'Node': all_nodes,
+                                            'Person': all_persons,
+                                            'Site': all_sites})
+
+        t=time.time()
+        timers['process-slices']=t-t0
+        t0=t
+
+        # refresh slice attributes
+        plocal_slice_attributes = all_data ['SliceAttributes-local']
+        nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
+                                                                all_nodes,
+                                                                all_slices)
+        t=time.time()
+        timers['process-sa']=t-t0
+        t0=t
+        
+        t_end=time.time()
 
-        return {'plcname':self.api.config.PLC_NAME,
+        timers['time_gather']   = all_data['ellapsed']
+        timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
+        timers['time_process']  = t_end-t_acquired
+        timers['time_all']      = t_end-t_start
+        
+        ### returned as-is by RefreshPeer
+        return {
+               'new_sites':nb_new_sites,
                'new_keys':nb_new_keys,
                 'new_nodes':nb_new_nodes,
                'new_persons':nb_new_persons,
-                'new_slices':nb_new_slices}
+                'new_slice_attribute_types':nb_new_slice_attribute_types,
+                'new_slices':nb_new_slices,
+                'new_slice_attributes':nb_new_slice_attributes,
+                'timers':timers,
+                }