X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=PLC%2FCache.py;h=21b8950c0ff7cc68d901ebfcd3b90385e4d31960;hb=6c38f39a5bd62c83ed181f1fe1e47f67446e7dec;hp=e94959918065cd90c8f1c10ec6dab22490daa22e;hpb=82e65a34b0c16a2079c5d0e846665d5a35dca51c;p=plcapi.git

diff --git a/PLC/Cache.py b/PLC/Cache.py
index e949599..21b8950 100644
--- a/PLC/Cache.py
+++ b/PLC/Cache.py
@@ -30,14 +30,40 @@ def class_attributes (classname):
 
 class Cache:
 
-    # an attempt to provide genericity in the caching algorithm
+    """
+    This class is the core of the RefreshPeer method's implementation;
+    RefreshPeer basically calls Cache.refresh_peer
+
+    It manages caching on a table-by-table basis, and performs the
+    required transcoding from remote ids to local ids - that is the job
+    of the Transcoder class
+
+    For the tables (classes) that it handles, it uses the following
+    attributes:
+    (*) primary_key is the name of the field where db indexes are stored
+    (*) class_key is used to implement equality on objects
+    (*) foreign_fields is the list of fields that are copied verbatim
+        from foreign objects
+    (*) foreign_xrefs is the list of columns that are subject to transcoding
+        (.) when obj[field] is an int, the transcoded value is directly
+            inserted in the table
+        (.) when it is a list, the relationship is assumed to be maintained
+            in an external 2-column table; that is where the XrefTable
+            class comes in
+
+    Because the relationship between slices, slice attribute types and
+    slice attributes is more complex, it is handled in a specifically
+    customized version of update_table
+
+    Of course the order in which the tables are managed is important:
+    with a different order, several iterations might be needed for the
+    update process to converge
+
+    The timers field was introduced for optimization and could safely
+    be removed
+    """
 
-    def __init__ (self, api, peer_id, peer_server, auth):
+    def __init__ (self, api, peer_id, peer_server):
 
         self.api = api
         self.peer_id = peer_id
         self.peer_server = peer_server
-        self.auth = auth
 
     class Transcoder:
 
@@ -92,12 +118,22 @@ class Cache:
         self.api.db.do (sql)
 
     def insert_new_items (self, id1, id2_set):
-        if id2_set:
-            sql = "INSERT INTO %s select %d, %d " % \
-                self.tablename, id1, id2[0]
-            for id2 in id2_set[1:]:
-                sql += " UNION ALL SELECT %d, %d " % \
-                (id1,id2)
-        self.api.db.do (sql)
+        ### xxx needs to be optimized
+        ### tried to figure a way to use a single sql statement
+        ### like: insert into table (x,y) values (1,2),(3,4);
+        ### but apparently this is not supported under postgresql
+        ### (in fact, multi-row VALUES lists only appear in postgresql 8.2)
+        for id2 in id2_set:
+            sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
+                  (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
+            # run the INSERT inside the loop, so that every pair makes it in
+            self.api.db.do (sql)
+
+# below is Tony's code but it's badly broken. I'm not sure we care much in fact.
+#        if id2_set:
+#            sql = "INSERT INTO %s select %d, %d " % \
+#                self.tablename, id1, id2[0]
+#            for id2 in id2_set[1:]:
+#                sql += " UNION ALL SELECT %d, %d " % \
+#                (id1,id2)
 
     def update_item (self, id1, old_id2s, new_id2s):
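Incidentally, the single-statement form the comment above is looking for does exist: multi-row VALUES lists are supported from PostgreSQL 8.2 onwards. A minimal sketch, assuming the same XrefTable attributes (tablename, lowerclass1, lowerclass2) and the api.db.do call used above:

    def insert_new_items (self, id1, id2_set):
        # one INSERT with a multi-row VALUES list - requires postgresql >= 8.2
        if not id2_set:
            return
        values = ",".join(["(%d,%d)" % (id1, id2) for id2 in id2_set])
        sql = "INSERT INTO %s (%s_id,%s_id) VALUES %s" % \
              (self.tablename, self.lowerclass1, self.lowerclass2, values)
        self.api.db.do(sql)
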
@@ -113,12 +149,10 @@
 
     # peer_object_list : list of objects at a given peer - e.g. peer.GetSlices()
     # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
     # we need an entry for each class mentioned in the class's foreign_xrefs
-    # lambda_ignore : the alien objects are ignored if this returns true
     def update_table (self,
                       classname,
                       alien_object_list,
                       alien_xref_objs_dict = {},
-                      lambda_ignore=lambda x:False,
                       report_name_conflicts = True):
 
         verbose ("============================== entering update_table on",classname)
@@ -152,8 +186,6 @@
         ### index upon class_key for future searches
         local_objects_index = local_objects.dict(class_key)
 
-        #verbose ('update_table',classname,local_objects_index.keys())
-
         ### mark entries for this peer outofdate
         new_count=0
         old_count=0
@@ -164,18 +196,19 @@
             else:
                 local_object.uptodate=True
 
+        for alien_object in alien_object_list:
+            verbose ('+++ Got alien object',alien_object)
+
         # scan the peer's local objects
         for alien_object in alien_object_list:
             object_name = alien_object[class_key]
 
-            ### ignore, e.g. system-wide slices
-            if lambda_ignore(alien_object):
-                verbose('Ignoring',object_name)
-                continue
+            verbose ('----- update_table (%s) - considering'%classname,object_name)
+
+            # optimizing : avoid doing useless syncs
+            needs_sync = False
 
-            verbose ('update_table (%s) - Considering'%classname,object_name)
-
             # create or update
             try:
                 ### We know about this object already
@@ -195,9 +228,12 @@
                     ### we can assume the object just moved
                     ### needs to update peer_id though
                     local_object['peer_id'] = peer_id
+                    needs_sync = True
                 # update all fields as per foreign_fields
                 for field in foreign_fields:
-                    local_object[field]=alien_object[field]
+                    if local_object[field] != alien_object[field]:
+                        local_object[field]=alien_object[field]
+                        needs_sync = True
                 verbose ('update_table FOUND',object_name)
             except:
                 ### create a new entry
@@ -219,7 +255,7 @@
             direct_ref_fields=[]
             for xref in foreign_xrefs:
                 field=xref['field']
-                verbose('checking field %s for direct_ref'%field)
+                #verbose('checking field %s for direct_ref'%field)
                 if isinstance(alien_object[field],int):
                     direct_ref_fields.append(field)
             verbose("FOUND DIRECT REFS",direct_ref_fields)
@@ -227,6 +263,7 @@
                 local_object[field]=1
             verbose('Early sync on',local_object)
             local_object.sync()
+            needs_sync = False
 
             # this row is now valid
             local_object.uptodate=True
@@ -260,13 +297,15 @@
                 elif isinstance (alien_value,int):
                     #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
                     new_value = transcoder.transcode(alien_value)
-                    local_object[field] = new_value
+                    if local_object[field] != new_value:
+                        local_object[field] = new_value
+                        needs_sync = True
 
             ### this object is completely updated, let's save it
-            verbose('FINAL sync on %s:'%object_name,local_object)
-            local_object.sync()
+            if needs_sync:
+                verbose('FINAL sync on %s:'%object_name,local_object)
+                local_object.sync(False)
 
-
         ### delete entries that are not uptodate
         for local_object in local_objects:
             if not local_object.uptodate:
@@ -291,6 +330,8 @@
         from PLC.SliceAttributeTypes import SliceAttributeTypes
         from PLC.SliceAttributes import SliceAttribute, SliceAttributes
 
+        verbose ("============================== entering update_slice_attributes")
+
         # init
         peer_id = self.peer_id
 
@@ -365,7 +406,7 @@
                 verbose('CREATED new sa')
             local_object.uptodate=True
             new_count += 1
-            local_object.sync()
+            local_object.sync(False)
 
         for local_object in local_objects:
             if not local_object.uptodate:
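Both update_table and update_slice_attributes implement the same three-pass mark-and-sweep pattern. Reduced to a sketch, where locate_or_create and update_fields are hypothetical helpers standing in for the try/except logic above:

    # pass 1: rows cached from this peer are presumed stale
    for local_object in local_objects:
        local_object.uptodate = (local_object['peer_id'] != peer_id)
    # pass 2: refresh or create rows from the peer's data, re-marking them
    for alien_object in alien_object_list:
        local_object = locate_or_create (local_objects_index, alien_object)
        if update_fields (local_object, alien_object):  # True when something changed
            local_object.sync(False)
        local_object.uptodate = True
    # pass 3: whatever was not re-marked has disappeared at the peer
    for local_object in local_objects:
        if not local_object.uptodate:
            local_object.delete()
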
@@ -385,9 +426,20 @@
 
         ### uses GetPeerData to gather all info in a single xmlrpc request
 
+        timers={}
         t_start=time.time()
         # xxx see also GetPeerData - peer_id arg unused yet
-        all_data = self.peer_server.GetPeerData (self.auth,0)
+        all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
+
+        verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
+        sks=all_data.keys()
+        sks.sort()
+        for k in sks:
+            f=all_data[k]
+            # some entries are not lists - just skip those
+            try:
+                verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
+            except:
+                pass
         t_acquired = time.time()
 
         # refresh sites
@@ -395,51 +447,83 @@
         all_sites = plocal_sites + all_data['Sites-peer']
         nb_new_sites = self.update_table('Site', plocal_sites)
 
+        t0 = time.time()
+        timers['process-sites']=t0-t_acquired
+
         # refresh keys
         plocal_keys = all_data['Keys-local']
         all_keys = plocal_keys + all_data['Keys-peer']
         nb_new_keys = self.update_table('Key', plocal_keys)
 
+        t=time.time()
+        timers['process-keys']=t-t0
+        t0=t
+
         # refresh nodes
         plocal_nodes = all_data['Nodes-local']
         all_nodes = plocal_nodes + all_data['Nodes-peer']
         nb_new_nodes = self.update_table('Node', plocal_nodes,
                                          { 'Site' : all_sites } )
 
+        t=time.time()
+        timers['process-nodes']=t-t0
+        t0=t
+
         # refresh persons
         plocal_persons = all_data['Persons-local']
         all_persons = plocal_persons + all_data['Persons-peer']
         nb_new_persons = self.update_table ('Person', plocal_persons,
                                             { 'Key': all_keys, 'Site' : all_sites } )
 
+        t=time.time()
+        timers['process-persons']=t-t0
+        t0=t
+
         # refresh slice attribute types
         plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
         nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
                                                           plocal_slice_attribute_types,
                                                           report_name_conflicts = False)
 
+        t=time.time()
+        timers['process-sat']=t-t0
+        t0=t
+
         # refresh slices
         plocal_slices = all_data['Slices-local']
         all_slices = plocal_slices + all_data['Slices-peer']
 
-        def is_system_slice (slice):
-            return slice['creator_person_id'] == 1
+        # forget about ignoring remote system slices
+        # just update them too, we'll be able to filter them out later in GetSlivers
 
         nb_new_slices = self.update_table ('Slice', plocal_slices,
                                            {'Node': all_nodes,
                                             'Person': all_persons,
-                                            'Site': all_sites},
-                                           is_system_slice)
+                                            'Site': all_sites})
+
+        t=time.time()
+        timers['process-slices']=t-t0
+        t0=t
 
         # refresh slice attributes
         plocal_slice_attributes = all_data ['SliceAttributes-local']
         nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
                                                                 all_nodes,
                                                                 all_slices)
+        t=time.time()
+        timers['process-sa']=t-t0
+        t0=t
 
         t_end=time.time()
+
+        # 'ellapsed' (sic) is the key as spelled by GetPeerData
+        timers['time_gather'] = all_data['ellapsed']
+        timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
+        timers['time_process'] = t_end-t_acquired
+        timers['time_all'] = t_end-t_start
+
         ### returned as-is by RefreshPeer
-        return {'plcname':self.api.config.PLC_NAME,
+        return {
                 'new_sites':nb_new_sites,
                 'new_keys':nb_new_keys,
                 'new_nodes':nb_new_nodes,
@@ -447,9 +531,6 @@
                 'new_slice_attribute_types':nb_new_slice_attribute_types,
                 'new_slices':nb_new_slices,
                 'new_slice_attributes':nb_new_slice_attributes,
-                'time_gather': all_data['ellapsed'],
-                'time_transmit':t_acquired-t_start-all_data['ellapsed'],
-                'time_process':t_end-t_acquired,
-                'time_all':t_end-t_start,
+                'timers':timers,
             }
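The timers dict ends up verbatim in RefreshPeer's return value. A hypothetical client-side reading of it; plc stands for any xmlrpclib proxy to the API, the URL is a placeholder, and auth/peer_id (a valid auth struct and the peer's id) are assumed to be defined:

    import xmlrpclib
    # plc.example.org is a placeholder for the local myplc instance
    plc = xmlrpclib.ServerProxy('https://plc.example.org/PLCAPI/')
    result = plc.RefreshPeer(auth, peer_id)
    # overall phases, as computed at the end of refresh_peer
    for phase in ['time_gather', 'time_transmit', 'time_process', 'time_all']:
        print "%-15s %.2f s" % (phase, result['timers'][phase])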