X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=PLC%2FCache.py;h=21b8950c0ff7cc68d901ebfcd3b90385e4d31960;hb=0395bfd9e88b0f5fbfcbfd91c95fa5baaa9e7fe0;hp=ff9f1807b8063b02dcb8adf2853ba8860f01219b;hpb=79b4fa2971f7f6fca8dba0d0875a94556b26764f;p=plcapi.git

diff --git a/PLC/Cache.py b/PLC/Cache.py
index ff9f180..21b8950 100644
--- a/PLC/Cache.py
+++ b/PLC/Cache.py
@@ -1,3 +1,5 @@
+import time
+
 from PLC.Faults import *
 from PLC.Parameter import Parameter
 from PLC.Filter import Filter
@@ -28,18 +30,40 @@ def class_attributes (classname):
 
 class Cache:
 
-    # an attempt to provide genericity in the caching algorithm
+    """
+    This class is the core of the RefreshPeer method's implementation,
+    which basically calls Cache.refresh_peer
+
+    It manages caching on a table-by-table basis, and performs the required
+    transcoding from remote ids to local ids - that is the job of the
+    Transcoder class
+
+    For each table (class) that it handles, it uses the following attributes:
+    (*) primary_key is the name of the field where db indexes are stored
+    (*) class_key is used to implement equality on objects
+    (*) foreign_fields is the list of fields that are copied verbatim from
+        foreign objects
+    (*) foreign_xrefs is the list of columns that are subject to transcoding
+        (.) when obj[field] is an int, the transcoded value is directly
+            inserted in the table
+        (.) if it is a list, the relationship is assumed to be maintained
+            in an external 2-column table; that is where the XrefTable class comes in
+
+    The relationship between slices, slice attribute types and slice attributes
+    is more complex; it is handled in a specifically customized version of update_table
+
+    The order in which the tables are managed is important: with a different
+    order, several iterations might be needed for the update process to converge.
+
+    The timers field was introduced for optimization only and could safely be removed
+    """
 
-    # the Peer object we are syncing with
-    def __init__ (self, api, peer, peer_server, auth):
-
-        import PLC.Peers
+    def __init__ (self, api, peer_id, peer_server):
 
         self.api = api
-        assert isinstance(peer,PLC.Peers.Peer)
-        self.peer = peer
+        self.peer_id = peer_id
         self.peer_server = peer_server
-        self.auth = auth
 
     class Transcoder:
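To make the docstring concrete: update_table (further down) consumes these per-class declarations through class_attributes(). The sketch below shows their expected shape for 'Slice'; the exact field lists are illustrative assumptions, not the actual PLCAPI declarations.

# hypothetical shape of class_attributes('Slice') - field lists are made up
from PLC.Slices import Slice, Slices

slice_attributes = {
    'row_class'      : Slice,       # the class for one row
    'table_class'    : Slices,      # the class fetching whole tables
    'primary_key'    : 'slice_id',  # where db indexes are stored
    'class_key'      : 'name',      # implements equality on objects
    'foreign_fields' : ['instantiation', 'url', 'description'],
    'foreign_xrefs'  : [
        # list-valued in a Slice object: kept in the 2-column slice_node table
        {'field': 'node_ids', 'class': 'Node', 'table': 'slice_node'},
    ],
}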
@@ -63,18 +87,19 @@ class Cache:
         def transcode (self, alien_id):
             """ transforms an alien id into a local one """
             # locate alien obj from alien_id
-            verbose ('entering transcode with alien_id',alien_id,)
+            #verbose ('.entering transcode with alien_id',alien_id,)
             alien_object=self.alien_objects_byid[alien_id]
-            verbose ('located alien_obj',)
+            #verbose ('..located alien_obj',)
             name = alien_object [self.class_key]
-            verbose ('got name',name,)
+            #verbose ('...got name',name,)
             local_object=self.local_objects_byname[name]
-            verbose ('found local obj')
+            #verbose ('....found local obj')
             local_id=local_object[self.primary_key]
-            verbose ('and local_id',local_id)
+            #verbose ('.....and local_id',local_id)
             return local_id
 
+    # for handling simple n-to-n relation tables, like e.g. slice_node
     class XrefTable:
 
         def __init__ (self, api, tablename, class1, class2):
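A Transcoder is built once per foreign class and then applied id by id, exactly as update_slice_attributes does further down. A minimal usage sketch (the helper name is ours; KeyError is what the dict lookups above raise on a miss):

from PLC.Cache import Cache

def transcode_node_id (api, alien_nodes, alien_node_id):
    # alien_nodes: the node dicts fetched from the peer; indexed once, queried many times
    node_xcoder = Cache.Transcoder (api, 'Node', alien_nodes)
    try:
        return node_xcoder.transcode (alien_node_id)   # peer's node_id -> local node_id
    except KeyError:
        # e.g. the node belongs to a third peer we know nothing about
        return None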
@@ -93,12 +118,22 @@ class Cache:
             self.api.db.do (sql)
 
         def insert_new_items (self, id1, id2_set):
-            ### xxx needs to be optimized
-            ### tried to figure a way to use a single sql statement
-            ### like: insert into table (x,y) values (1,2),(3,4);
-            ### but apparently this is not supported under postgresql
-            for id2 in id2_set:
-                sql = "INSERT INTO %s VALUES (%d,%d)"%(self.tablename,id1,id2)
+            ### xxx needs to be optimized
+            ### tried to figure a way to use a single sql statement
+            ### like: insert into table (x,y) values (1,2),(3,4);
+            ### but apparently this is not supported under postgresql
+            for id2 in id2_set:
+                sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
+                      (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
+
+# below is Tony's code but it's badly broken. I'm not sure we care much in fact.
+#            if id2_set:
+#                sql = "INSERT INTO %s select %d, %d " % \
+#                      self.tablename, id1, id2[0]
+#                for id2 in id2_set[1:]:
+#                    sql += " UNION ALL SELECT %d, %d " % \
+#                           (id1,id2)
+
             self.api.db.do (sql)
 
         def update_item (self, id1, old_id2s, new_id2s):
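A side note on the ### comments kept above: the multi-row VALUES syntax they mention did eventually land in PostgreSQL 8.2, so on a recent enough backend insert_new_items could issue a single statement, along these lines (a sketch, not part of the patch):

def insert_new_items (self, id1, id2_set):
    # single-statement variant, assuming PostgreSQL >= 8.2
    if not id2_set:
        return
    values = ",".join(["(%d,%d)" % (id1, id2) for id2 in id2_set])
    self.api.db.do("INSERT INTO %s (%s_id,%s_id) VALUES %s" %
                   (self.tablename, self.lowerclass1, self.lowerclass2, values))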
@@ -113,16 +148,15 @@
 
     # classname: the type of objects we are talking about;       e.g. 'Slice'
     # peer_object_list list of objects at a given peer -         e.g. peer.GetSlices()
     # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
-    # his must match the keys in xref_specs
-    # lambda_ignore : the alien objects are ignored if this returns true
+    #    we need an entry for each class mentioned in the class's foreign_xrefs
     def update_table (self,
                       classname,
                       alien_object_list,
                       alien_xref_objs_dict = {},
-                      lambda_ignore=lambda x:False):
+                      report_name_conflicts = True):
 
-        peer = self.peer
-        peer_id = peer['peer_id']
+        verbose ("============================== entering update_table on",classname)
+        peer_id=self.peer_id
 
         attrs = class_attributes (classname)
         row_class = attrs['row_class']
@@ -134,11 +168,15 @@
 
         ## allocate transcoders and xreftables once, for each item in foreign_xrefs
         # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
-        accessories = dict(
-            [ (xref_classname,
-               {'transcoder':Cache.Transcoder (self.api,xref_classname,alien_xref_objs_dict[xref_classname]),
-                'xref_table':Cache.XrefTable (self.api,xref_spec['table'],classname,xref_classname)})
-              for xref_classname,xref_spec in foreign_xrefs.iteritems()])
+        xref_accessories = dict(
+            [ (xref['field'],
+               {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
+                'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
+              for xref in foreign_xrefs ])
+
+        # the fields that are direct references, like e.g. site_id in Node
+        # determined lazily, we need an alien_object to do that, and we may have none here
+        direct_ref_fields = None
 
         ### get current local table
         # get ALL local objects so as to cope with
@@ -146,12 +184,10 @@
         # (*) or naming conflicts
         local_objects = table_class (self.api)
         ### index upon class_key for future searches
-        #verbose ('local objects:',local_objects)
-        verbose ('class_key',class_key)
         local_objects_index = local_objects.dict(class_key)
-        verbose ('update_table',classname,local_objects_index.keys())
 
         ### mark entries for this peer outofdate
+        new_count=0
         old_count=0;
         for local_object in local_objects:
             if local_object['peer_id'] == peer_id:
@@ -160,29 +196,31 @@
             else:
                 local_object.uptodate=True
 
-        new_count=0
+        for alien_object in alien_object_list:
+            verbose ('+++ Got alien object',alien_object)
+
         # scan the peer's local objects
         for alien_object in alien_object_list:
 
             object_name = alien_object[class_key]
 
-            ### ignore, e.g. system-wide slices
-            if lambda_ignore(alien_object):
-                verbose('Ignoring',object_name)
-                continue
+            verbose ('----- update_table (%s) - considering'%classname,object_name)
+
+            # optimizing : avoid doing useless syncs
+            needs_sync = False
 
-            verbose ('update_table - Considering',object_name)
-
             # create or update
             try:
                 ### We know about this object already
                 local_object = local_objects_index[object_name]
                 if local_object ['peer_id'] is None:
-                    ### xxx send e-mail
-                    print 'We are in trouble here'
-                    print 'The %s object named %s is natively defined twice'%(classname,object_name)
-                    print 'Once on this PLC and once on peer %d'%peer_id
-                    print 'We dont raise an exception so that the remaining updates can still take place'
+                    if report_name_conflicts:
+                        ### xxx send e-mail
+                        print '!!!!!!!!!! We are in trouble here'
+                        print 'The %s object named %s is natively defined twice, '%(classname,object_name),
+                        print 'once on this PLC and once on peer %d'%peer_id
+                        print 'We dont raise an exception so that the remaining updates can still take place'
+                        print '!!!!!!!!!!'
                     continue
                 if local_object['peer_id'] != peer_id:
                     ### the object has changed its plc,
@@ -190,6 +228,12 @@
                     ### we can assume the object just moved
                     ### needs to update peer_id though
                     local_object['peer_id'] = peer_id
+                    needs_sync = True
+                # update all fields as per foreign_fields
+                for field in foreign_fields:
+                    if (local_object[field] != alien_object [field]):
+                        local_object[field]=alien_object[field]
+                        needs_sync = True
                 verbose ('update_table FOUND',object_name)
             except:
                 ### create a new entry
@@ -198,24 +242,41 @@
                 # insert in index
                 local_objects_index[class_key]=local_object
                 verbose ('update_table CREATED',object_name)
-
-            # go on with update
-            for field in foreign_fields:
-                local_object[field]=alien_object[field]
+                # update all fields as per foreign_fields
+                for field in foreign_fields:
+                    local_object[field]=alien_object[field]
+                # this is tricky; at this point we may have primary_key unspecified,
+                # but we need it for handling xrefs below, so we'd like to sync to get one
+                # on the other hand some required fields may be still missing so
+                # the DB would refuse to sync in this case (e.g. site_id in Node)
+                # so let's fill them with 1 so we can sync, this will be overridden below
+                # lazily determine this set of fields now
+                if direct_ref_fields is None:
+                    direct_ref_fields=[]
+                    for xref in foreign_xrefs:
+                        field=xref['field']
+                        #verbose('checking field %s for direct_ref'%field)
+                        if isinstance(alien_object[field],int):
+                            direct_ref_fields.append(field)
+                    verbose("FOUND DIRECT REFS",direct_ref_fields)
+                for field in direct_ref_fields:
+                    local_object[field]=1
+                verbose('Early sync on',local_object)
+                local_object.sync()
+                needs_sync = False
 
             # this row is now valid
             local_object.uptodate=True
             new_count += 1
-            local_object.sync()
 
             # manage cross-refs
-            for xref_classname,xref_spec in foreign_xrefs.iteritems():
-                field=xref_spec['field']
-                alien_xref_obj_list = alien_xref_objs_dict[xref_classname]
+            for xref in foreign_xrefs:
+                field=xref['field']
+                alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
                 alien_value = alien_object[field]
+                transcoder = xref_accessories[xref['field']]['transcoder']
                 if isinstance (alien_value,list):
-                    verbose ('update_table list-transcoding ',xref_classname,' aliens=',alien_value,)
-                    transcoder = accessories[xref_classname]['transcoder']
+                    #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
                     local_values=[]
                     for a in alien_value:
                         try:
@@ -223,21 +284,28 @@
                         except:
                             # could not transcode - might be from another peer that we dont know about..
                             pass
-                    verbose (" transcoded as ",local_values)
-                    xref_table = accessories[xref_classname]['xref_table']
-                    # newly created objects dont have xrefs yet
+                    #verbose (" transcoded as ",local_values)
+                    xref_table = xref_accessories[xref['field']]['xref_table']
+                    # newly created objects don't have xref fields set yet
                    try:
-                        former_xrefs=local_object[xref_spec['field']]
+                        former_xrefs=local_object[xref['field']]
                     except:
                         former_xrefs=[]
                     xref_table.update_item (local_object[primary_key],
                                             former_xrefs,
                                             local_values)
                 elif isinstance (alien_value,int):
+                    #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
                     new_value = transcoder.transcode(alien_value)
-                    local_object[field] = new_value
-                    local_object.sync()
-
+                    if local_object[field] != new_value:
+                        local_object[field] = new_value
+                        needs_sync = True
+
+            ### this object is completely updated, let's save it
+            if needs_sync:
+                verbose('FINAL sync on %s:'%object_name,local_object)
+                local_object.sync(False)
+
         ### delete entries that are not uptodate
         for local_object in local_objects:
             if not local_object.uptodate:
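In short, a brand new object now goes through two syncs: an early one to obtain its primary key, with int-valued direct references stubbed to 1, and a final one once every xref has been transcoded. A condensed sketch for a Node whose only direct reference is site_id (names like alien_node and site_xcoder are assumptions for the illustration):

from PLC.Nodes import Node

local_node = Node(api, {'hostname': alien_node['hostname']})  # class_key seeds the row
local_node['site_id'] = 1            # dummy direct ref so the early sync is accepted
local_node.sync()                    # early sync: allocates the local node_id
local_node['site_id'] = site_xcoder.transcode(alien_node['site_id'])
local_node.sync(False)               # FINAL sync: writes the real, transcoded value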
@@ -247,65 +315,222 @@
 
         ### return delta in number of objects
         return new_count-old_count
 
-    def refresh_nodes (self, peer_get_nodes):
-        """
-        refreshes the foreign_nodes and peer_node tables
-        expected input is the current list of local nodes
-        as returned from the peer by GetNodes {'peer_id':None}
-        returns the number of new nodes (can be negative)
-        """
+    # slice attributes exhibit a special behaviour
+    # because there is no name we can use to retrieve/check for equality
+    # this object is like a 3-part xref, linking slice_attribute_type, slice,
+    # and potentially node, together with a value that can change over time.
+    # extending the generic model to support a lambda rather than class_key
+    # would clearly become overkill
+    def update_slice_attributes (self,
+                                 alien_slice_attributes,
+                                 alien_nodes,
+                                 alien_slices):
 
-        return self.update_table ('Node', peer_get_nodes)
-
-    def refresh_slices (self, peer_get_slices, peer_foreign_nodes):
-        """
-        refreshes the foreign_slices and peer_slice tables
-        expected input is the current list of slices as returned by GetSlices
-
-        returns the number of new slices on this peer (can be negative)
-        """
-
-        # xxx use 'system' flag for finding system slices
-        return self.update_table ('Slice', peer_get_slices,
-                                  {'Node':peer_foreign_nodes},
-                                  lambda x: x['creator_person_id']==1)
+        from PLC.SliceAttributeTypes import SliceAttributeTypes
+        from PLC.SliceAttributes import SliceAttribute, SliceAttributes
+
+        verbose ("============================== entering update_slice_attributes")
+
+        # init
+        peer_id = self.peer_id
+
+        # create transcoders
+        node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
+        slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
+        # no need to transcode SliceAttributeTypes, we have a name in the result
+        local_sat_dict = SliceAttributeTypes(self.api).dict('name')
+
+        # load local objects
+        local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
+
+        ### mark entries for this peer outofdate
+        new_count = 0
+        old_count=len(local_objects)
+        for local_object in local_objects:
+            local_object.uptodate=False
+
+        for alien_object in alien_slice_attributes:
+
+            verbose('----- update_slice_attributes: considering ...')
+            verbose('   ',alien_object)
+
+            # locate local slice
+            try:
+                slice_id = slice_xcoder.transcode(alien_object['slice_id'])
+            except:
+                verbose('update_slice_attributes: unable to locate slice',
+                        alien_object['slice_id'])
+                continue
+            # locate slice_attribute_type
+            try:
+                sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
+            except:
+                verbose('update_slice_attributes: unable to locate slice attribute type',
+                        alien_object['name'])
+                continue
+            # locate local node if specified
+            try:
+                alien_node_id = alien_object['node_id']
+                if alien_node_id is not None:
+                    node_id = node_xcoder.transcode(alien_node_id)
+                else:
+                    node_id=None
+            except:
+                verbose('update_slice_attributes: unable to locate node',
+                        alien_object['node_id'])
+                continue
+
+            # locate the local SliceAttribute if any
+            try:
+                verbose ('searching name=', alien_object['name'],
+                         'slice_id',slice_id, 'node_id',node_id)
+                local_object = SliceAttributes (self.api,
+                                                {'name':alien_object['name'],
+                                                 'slice_id':slice_id,
+                                                 'node_id':node_id})[0]
+
+                if local_object['peer_id'] != peer_id:
+                    verbose ('FOUND local sa - skipped')
+                    continue
+                verbose('FOUND already cached sa - setting value')
+                local_object['value'] = alien_object['value']
+            # create it if missing
+            except:
+                local_object = SliceAttribute(self.api,
+                                              {'peer_id':peer_id,
+                                               'slice_id':slice_id,
+                                               'node_id':node_id,
+                                               'attribute_type_id':sat_id,
+                                               'value':alien_object['value']})
+                verbose('CREATED new sa')
+            local_object.uptodate=True
+            new_count += 1
+            local_object.sync(False)
+
+        for local_object in local_objects:
+            if not local_object.uptodate:
+                local_object.delete()
+
+        self.api.db.commit()
+        ### return delta in number of objects
+        return new_count-old_count
 
     def refresh_peer (self):
-        peer_local_slices = self.peer_server.GetSlices(self.auth,{'peer_id':None})
+
+        # so as to minimize the number of requests
+        # we get all objects in a single call and sort afterwards
+        # xxx ideally get objects either local or the ones attached here
+        # requires to know remote peer's peer_id for ourselves, mmhh..
+        # does not make any difference in a 2-peer deployment though
+
+        ### uses GetPeerData to gather all info in a single xmlrpc request
+
+        timers={}
+        t_start=time.time()
+        # xxx see also GetPeerData - peer_id arg unused yet
+        all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
+
+        verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
+        sks=all_data.keys()
+        sks.sort()
+        for k in sks:
+            f=all_data[k]
+            try:
+                verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
+            except:
+                pass
+
+        t_acquired = time.time()
+        # refresh sites
+        plocal_sites = all_data['Sites-local']
+        all_sites = plocal_sites + all_data['Sites-peer']
+        nb_new_sites = self.update_table('Site', plocal_sites)
+
+        t0 = time.time()
+        timers['process-sites']=t0-t_acquired
 
         # refresh keys
-        peer_local_keys = self.peer_server.GetKeys(self.auth,{'peer_id':None})
-        nb_new_keys = self.update_table('Key', peer_local_keys)
+        plocal_keys = all_data['Keys-local']
+        all_keys = plocal_keys + all_data['Keys-peer']
+        nb_new_keys = self.update_table('Key', plocal_keys)
+
+        t=time.time()
+        timers['process-keys']=t-t0
+        t0=t
 
         # refresh nodes
-        peer_local_nodes = self.peer_server.GetNodes(self.auth,{'peer_id':None})
-        nb_new_nodes = self.update_table('Node', peer_local_nodes)
+        plocal_nodes = all_data['Nodes-local']
+        all_nodes = plocal_nodes + all_data['Nodes-peer']
+        nb_new_nodes = self.update_table('Node', plocal_nodes,
+                                         { 'Site' : all_sites } )
+
+        t=time.time()
+        timers['process-nodes']=t-t0
+        t0=t
 
         # refresh persons
-        peer_local_persons = self.peer_server.GetPersons(self.auth,{'peer_id':None})
-        # xxx ideally get our own persons only
-        # requires to know remote peer's peer_id for ourselves, mmhh
-        peer_all_keys = peer_local_keys + self.peer_server.GetKeys(self.auth,{'~peer_id':None})
-        nb_new_persons = self.update_table ('Person', peer_local_persons,
-                                            { 'Key': peer_all_keys} )
+        plocal_persons = all_data['Persons-local']
+        all_persons = plocal_persons + all_data['Persons-peer']
+        nb_new_persons = self.update_table ('Person', plocal_persons,
+                                            { 'Key': all_keys, 'Site' : all_sites } )
 
-        # refresh slices
-        def is_system_slice (slice):
-            return slice['creator_person_id'] == 1
-        # xxx would ideally get our own nodes only,
-        peer_all_nodes = peer_local_nodes+self.peer_server.GetNodes(self.auth,{'~peer_id':None})
+        t=time.time()
+        timers['process-persons']=t-t0
+        t0=t
+
+        # refresh slice attribute types
+        plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
+        nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
+                                                          plocal_slice_attribute_types,
+                                                          report_name_conflicts = False)
 
-        nb_new_slices = self.update_table ('Slice', peer_local_slices,
-                                           {'Node':peer_all_nodes},
-                                           is_system_slice)
+        t=time.time()
+        timers['process-sat']=t-t0
+        t0=t
 
+        # refresh slices
+        plocal_slices = all_data['Slices-local']
+        all_slices = plocal_slices + all_data['Slices-peer']
+
+        # forget about ignoring remote system slices
+        # just update them too, we'll be able to filter them out later in GetSlivers
+
+        nb_new_slices = self.update_table ('Slice', plocal_slices,
+                                           {'Node': all_nodes,
+                                            'Person': all_persons,
+                                            'Site': all_sites})
+
+        t=time.time()
+        timers['process-slices']=t-t0
+        t0=t
+
+        # refresh slice attributes
+        plocal_slice_attributes = all_data ['SliceAttributes-local']
+        nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
+                                                                all_nodes,
+                                                                all_slices)
+        t=time.time()
+        timers['process-sa']=t-t0
+        t0=t
+
+        t_end=time.time()
 
-        return {'plcname':self.api.config.PLC_NAME,
+        timers['time_gather'] = all_data['ellapsed']
+        timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
+        timers['time_process'] = t_end-t_acquired
+        timers['time_all'] = t_end-t_start
+
+        ### returned as-is by RefreshPeer
+        return {
+                'new_sites':nb_new_sites,
                 'new_keys':nb_new_keys,
                 'new_nodes':nb_new_nodes,
                 'new_persons':nb_new_persons,
-                'new_slices':nb_new_slices}
+                'new_slice_attribute_types':nb_new_slice_attribute_types,
+                'new_slices':nb_new_slices,
+                'new_slice_attributes':nb_new_slice_attributes,
+                'timers':timers,
+                }
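For reference, the shapes exchanged above, as inferred from the code (an illustration, not a wire-format specification). Note that the two misspelled keys, 'SliceAttibuteTypes-local' and 'ellapsed', are spelled that way in the code on both sides:

# what GetPeerData hands back: per-table lists split by ownership, plus a timer
all_data_example = {
    'Sites-local' : [],   # objects native to the peer: these get cached locally
    'Sites-peer'  : [],   # objects the peer itself caches: used for transcoding only
    # ... the same '-local'/'-peer' pairs for Keys, Nodes, Persons and Slices,
    # plus 'SliceAttibuteTypes-local' and 'SliceAttributes-local'
    'ellapsed'    : 1.2,  # seconds the peer spent gathering the data
}
# what refresh_peer returns - surfaced as-is by RefreshPeer
result_example = {
    'new_sites': 0, 'new_keys': 0, 'new_nodes': 0, 'new_persons': 0,
    'new_slice_attribute_types': 0, 'new_slices': 0, 'new_slice_attributes': 0,
    'timers': {},  # the per-step 'process-*' entries plus the 'time_*' totals
}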