X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=PLC%2FCache.py;h=21b8950c0ff7cc68d901ebfcd3b90385e4d31960;hb=49e875c7e14b95697e3d0c7502d8457d3ce6378c;hp=61ebcfccafdd66cedc8e9b76e7e1723c24601f99;hpb=865c7ad3dba34691309904e49cb5885e8d9cd5a1;p=plcapi.git diff --git a/PLC/Cache.py b/PLC/Cache.py index 61ebcfc..21b8950 100644 --- a/PLC/Cache.py +++ b/PLC/Cache.py @@ -1,10 +1,12 @@ +import time + from PLC.Faults import * from PLC.Parameter import Parameter from PLC.Filter import Filter from PLC.Table import Row, Table verbose_flag=False; -verbose_flag=True; +#verbose_flag=True; def verbose (*args): if verbose_flag: print (args) @@ -28,14 +30,40 @@ def class_attributes (classname): class Cache: - # an attempt to provide genericity in the caching algorithm + """ + This class is the core of the RefreshPeer method's implementation, + that basically calls Cache:refresh_peer + + It manages caching on a table-by-table basis, and performs required + transcoding from remote ids to local ids - that's the job of the + Transcoder class + + For the tables (classes) that it handles, it uses the following + attributes + (*) primary_key is the name of the field where db indexes are stored + (*) class_key is used to implement equality on objects + (*) foreign_fields is the list of fields that are copied verbatim from + foreign objects + (*) foreign_xrefs is the list of columns that are subject to transcoding + (.) when obj[field] is an int, the transcoded value is directly + inserted in the table + (.) if it's a list, it is assumed that the relationship is maintained + in an external 2-column table. That's where the XrefTable class comes in + + The relationship between slices, slice attribute types and slice attributes being + more complex, it is handled in a specifically customized version of update_table + + Of course the order in which the tables are managed is important, with different + orders several iterations might be needed for the update process to converge. + + Of course the timers field was introduced for optimization and could safely be removed + """ - def __init__ (self, api, peer_id, peer_server, auth): + def __init__ (self, api, peer_id, peer_server): self.api = api self.peer_id = peer_id self.peer_server = peer_server - self.auth = auth class Transcoder: @@ -59,15 +87,15 @@ class Cache: def transcode (self, alien_id): """ transforms an alien id into a local one """ # locate alien obj from alien_id - verbose ('.entering transcode with alien_id',alien_id,) + #verbose ('.entering transcode with alien_id',alien_id,) alien_object=self.alien_objects_byid[alien_id] - verbose ('..located alien_obj',) + #verbose ('..located alien_obj',) name = alien_object [self.class_key] - verbose ('...got name',name,) + #verbose ('...got name',name,) local_object=self.local_objects_byname[name] - verbose ('....found local obj') + #verbose ('....found local obj') local_id=local_object[self.primary_key] - verbose ('.....and local_id',local_id) + #verbose ('.....and local_id',local_id) return local_id @@ -90,12 +118,22 @@ class Cache: self.api.db.do (sql) def insert_new_items (self, id1, id2_set): - ### xxx needs to be optimized - ### tried to figure a way to use a single sql statement - ### like: insert into table (x,y) values (1,2),(3,4); - ### but apparently this is not supported under postgresql - for id2 in id2_set: - sql = "INSERT INTO %s VALUES (%d,%d)"%(self.tablename,id1,id2) + ### xxx needs to be optimized + ### tried to figure a way to use a single sql statement + ### like: insert into table (x,y) values (1,2),(3,4); + ### but apparently this is not supported under postgresql + for id2 in id2_set: + sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \ + (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2) + +# below is Tony's code but it's badly broken. I'm not sure we care much in fact. +# if id2_set: +# sql = "INSERT INTO %s select %d, %d " % \ +# self.tablename, id1, id2[0] +# for id2 in id2_set[1:]: +# sql += " UNION ALL SELECT %d, %d " % \ +# (id1,id2) + self.api.db.do (sql) def update_item (self, id1, old_id2s, new_id2s): @@ -111,12 +149,10 @@ class Cache: # peer_object_list list of objects at a given peer - e.g. peer.GetSlices() # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()} # we need an entry for each class mentioned in the class's foreign_xrefs - # lambda_ignore : the alien objects are ignored if this returns true def update_table (self, classname, alien_object_list, alien_xref_objs_dict = {}, - lambda_ignore=lambda x:False, report_name_conflicts = True): verbose ("============================== entering update_table on",classname) @@ -150,8 +186,6 @@ class Cache: ### index upon class_key for future searches local_objects_index = local_objects.dict(class_key) - verbose ('update_table',classname,local_objects_index.keys()) - ### mark entries for this peer outofdate new_count=0 old_count=0; @@ -162,18 +196,19 @@ class Cache: else: local_object.uptodate=True + for alien_object in alien_object_list: + verbose ('+++ Got alien object',alien_object) + # scan the peer's local objects for alien_object in alien_object_list: object_name = alien_object[class_key] - ### ignore, e.g. system-wide slices - if lambda_ignore(alien_object): - verbose('Ignoring',object_name) - continue + verbose ('----- update_table (%s) - considering'%classname,object_name) + + # optimizing : avoid doing useless syncs + needs_sync = False - verbose ('update_table (%s) - Considering'%classname,object_name) - # create or update try: ### We know about this object already @@ -193,9 +228,12 @@ class Cache: ### we can assume the object just moved ### needs to update peer_id though local_object['peer_id'] = peer_id + needs_sync = True # update all fields as per foreign_fields for field in foreign_fields: - local_object[field]=alien_object[field] + if (local_object[field] != alien_object [field]): + local_object[field]=alien_object[field] + needs_sync = True verbose ('update_table FOUND',object_name) except: ### create a new entry @@ -217,7 +255,7 @@ class Cache: direct_ref_fields=[] for xref in foreign_xrefs: field=xref['field'] - verbose('checking field %s for direct_ref'%field) + #verbose('checking field %s for direct_ref'%field) if isinstance(alien_object[field],int): direct_ref_fields.append(field) verbose("FOUND DIRECT REFS",direct_ref_fields) @@ -225,10 +263,7 @@ class Cache: local_object[field]=1 verbose('Early sync on',local_object) local_object.sync() - verbose('Early syncing of %s, reloading'%object_name) - # sigh: now we have to reload it because of side-effects, like e.g. on Slice.expires - local_object=table_class(self.api, {class_key:object_name})[0] - verbose('After reload',local_object) + needs_sync = False # this row is now valid local_object.uptodate=True @@ -241,7 +276,7 @@ class Cache: alien_value = alien_object[field] transcoder = xref_accessories[xref['field']]['transcoder'] if isinstance (alien_value,list): - verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,) + #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,) local_values=[] for a in alien_value: try: @@ -249,7 +284,7 @@ class Cache: except: # could not transcode - might be from another peer that we dont know about.. pass - verbose (" transcoded as ",local_values) + #verbose (" transcoded as ",local_values) xref_table = xref_accessories[xref['field']]['xref_table'] # newly created objects dont have xref fields set yet try: @@ -260,15 +295,17 @@ class Cache: former_xrefs, local_values) elif isinstance (alien_value,int): - verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,) + #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,) new_value = transcoder.transcode(alien_value) - local_object[field] = new_value + if local_object[field] != new_value: + local_object[field] = new_value + needs_sync = True ### this object is completely updated, let's save it - verbose('FINAL sync on %s:'%object_name,local_object) - local_object.sync() + if needs_sync: + verbose('FINAL sync on %s:'%object_name,local_object) + local_object.sync(False) - ### delete entries that are not uptodate for local_object in local_objects: if not local_object.uptodate: @@ -293,6 +330,8 @@ class Cache: from PLC.SliceAttributeTypes import SliceAttributeTypes from PLC.SliceAttributes import SliceAttribute, SliceAttributes + verbose ("============================== entering update_slice_attributes") + # init peer_id = self.peer_id @@ -354,7 +393,7 @@ class Cache: if local_object['peer_id'] != peer_id: verbose ('FOUND local sa - skipped') continue - verbose('FOUND already cached sa') + verbose('FOUND already cached sa - setting value') local_object['value'] = alien_object['value'] # create it if missing except: @@ -367,7 +406,7 @@ class Cache: verbose('CREATED new sa') local_object.uptodate=True new_count += 1 - local_object.sync() + local_object.sync(False) for local_object in local_objects: if not local_object.uptodate: @@ -377,9 +416,6 @@ class Cache: ### return delta in number of objects return new_count-old_count - def get_locals (self, list): - return [x for x in list if x['peer_id'] is None] - def refresh_peer (self): # so as to minimize the numer of requests @@ -390,58 +426,104 @@ class Cache: ### uses GetPeerData to gather all info in a single xmlrpc request + timers={} + t_start=time.time() # xxx see also GetPeerData - peer_id arg unused yet - all_data = self.peer_server.GetPeerData (self.auth,0) + all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME) + verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME) + sks=all_data.keys() + sks.sort() + for k in sks: + f=all_data[k] + try: + verbose ('GetPeerData[%s] -> %d'%(k,len(f))) + except: + pass + + t_acquired = time.time() # refresh sites - all_sites = all_data['Sites'] - plocal_sites = self.get_locals (all_sites) + plocal_sites = all_data['Sites-local'] + all_sites = plocal_sites + all_data['Sites-peer'] nb_new_sites = self.update_table('Site', plocal_sites) + t0 = time.time() + timers['process-sites']=t0-t_acquired + + # refresh keys - all_keys = all_data['Keys'] - plocal_keys = self.get_locals (all_keys) + plocal_keys = all_data['Keys-local'] + all_keys = plocal_keys + all_data['Keys-peer'] nb_new_keys = self.update_table('Key', plocal_keys) + t=time.time() + timers['process-keys']=t-t0 + t0=t + # refresh nodes - all_nodes = all_data['Nodes'] - plocal_nodes = self.get_locals(all_nodes) + plocal_nodes = all_data['Nodes-local'] + all_nodes = plocal_nodes + all_data['Nodes-peer'] nb_new_nodes = self.update_table('Node', plocal_nodes, { 'Site' : all_sites } ) + t=time.time() + timers['process-nodes']=t-t0 + t0=t + # refresh persons - all_persons = all_data['Persons'] - plocal_persons = self.get_locals(all_persons) + plocal_persons = all_data['Persons-local'] + all_persons = plocal_persons + all_data['Persons-peer'] nb_new_persons = self.update_table ('Person', plocal_persons, { 'Key': all_keys, 'Site' : all_sites } ) + t=time.time() + timers['process-persons']=t-t0 + t0=t + # refresh slice attribute types - all_slice_attribute_types = all_data ['SliceAttibuteTypes'] - plocal_slice_attribute_types = self.get_locals(all_slice_attribute_types) + plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local'] nb_new_slice_attribute_types = self.update_table ('SliceAttributeType', plocal_slice_attribute_types, report_name_conflicts = False) + t=time.time() + timers['process-sat']=t-t0 + t0=t + # refresh slices - all_slices = all_data['Slices'] - plocal_slices = self.get_locals(all_slices) + plocal_slices = all_data['Slices-local'] + all_slices = plocal_slices + all_data['Slices-peer'] - def is_system_slice (slice): - return slice['creator_person_id'] == 1 + # forget about ignoring remote system slices + # just update them too, we'll be able to filter them out later in GetSlivers nb_new_slices = self.update_table ('Slice', plocal_slices, - {'Node': all_nodes, 'Person': all_persons, 'Site': all_sites}, - is_system_slice) + {'Node': all_nodes, + 'Person': all_persons, + 'Site': all_sites}) + + t=time.time() + timers['process-slices']=t-t0 + t0=t # refresh slice attributes - all_slice_attributes = all_data ['SliceAttributes'] - plocal_slice_attributes = self.get_locals(all_slice_attributes) + plocal_slice_attributes = all_data ['SliceAttributes-local'] nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes, all_nodes, all_slices) + t=time.time() + timers['process-sa']=t-t0 + t0=t + + t_end=time.time() + timers['time_gather'] = all_data['ellapsed'] + timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed'] + timers['time_process'] = t_end-t_acquired + timers['time_all'] = t_end-t_start + ### returned as-is by RefreshPeer - return {'plcname':self.api.config.PLC_NAME, + return { 'new_sites':nb_new_sites, 'new_keys':nb_new_keys, 'new_nodes':nb_new_nodes, @@ -449,5 +531,6 @@ class Cache: 'new_slice_attribute_types':nb_new_slice_attribute_types, 'new_slices':nb_new_slices, 'new_slice_attributes':nb_new_slice_attributes, + 'timers':timers, }