+import time
+
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Filter import Filter
class Cache:
- # an attempt to provide genericity in the caching algorithm
+ """
+ This class is the core of the RefreshPeer method's implementation,
+ that basically calls Cache:refresh_peer
+
+ It manages caching on a table-by-table basis, and performs required
+ transcoding from remote ids to local ids - that's the job of the
+ Transcoder class
+
+ For the tables (classes) that it handles, it uses the following
+ attributes
+ (*) primary_key is the name of the field where db indexes are stored
+ (*) class_key is used to implement equality on objects
+ (*) foreign_fields is the list of fields that are copied verbatim from
+ foreign objects
+ (*) foreign_xrefs is the list of columns that are subject to transcoding
+ (.) when obj[field] is an int, the transcoded value is directly
+ inserted in the table
+ (.) if it's a list, it is assumed that the relationship is maintained
+ in an external 2-column table. That's where the XrefTable class comes in
+
+ The relationship between slices, slice attribute types and slice attributes being
+ more complex, it is handled in a specifically customized version of update_table
+
+ Of course the order in which the tables are managed is important, with different
+ orders several iterations might be needed for the update process to converge.
+
+ Of course the timers field was introduced for optimization and could safely be removed
+ """
- def __init__ (self, api, peer_id, peer_server, auth):
+ def __init__ (self, api, peer_id, peer_server):
self.api = api
self.peer_id = peer_id
self.peer_server = peer_server
- self.auth = auth
class Transcoder:
self.api.db.do (sql)
def insert_new_items (self, id1, id2_set):
- ### xxx needs to be optimized
- ### tried to figure a way to use a single sql statement
- ### like: insert into table (x,y) values (1,2),(3,4);
- ### but apparently this is not supported under postgresql
- for id2 in id2_set:
- sql = "INSERT INTO %s VALUES (%d,%d)"%(self.tablename,id1,id2)
+ ### xxx needs to be optimized
+ ### tried to figure out a way to use a single sql statement
+ ### like: insert into table (x,y) values (1,2),(3,4);
+ ### but apparently this is not supported under postgresql
+ for id2 in id2_set:
+ sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
+ (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
+
+# below is Tony's idea for a single-statement variant
+# (INSERT ... SELECT ... UNION ALL); left disabled, not sure we care much in fact
+# if id2_set:
+#     ids = list(id2_set)
+#     sql = "INSERT INTO %s (%s_id,%s_id) SELECT %d, %d " % \
+#           (self.tablename, self.lowerclass1, self.lowerclass2, id1, ids[0])
+#     for id2 in ids[1:]:
+#         sql += " UNION ALL SELECT %d, %d " % (id1, id2)
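+# for the record: postgresql 8.2 and later does accept the multi-row
+# VALUES form sketched above, so once that can be assumed this whole
+# loop could collapse into a single statement such as
+#   INSERT INTO slice_node (slice_id,node_id) VALUES (1,2),(1,3);
+# (table and column names here are just an example)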
+
self.api.db.do (sql)
def update_item (self, id1, old_id2s, new_id2s):
# peer_object_list list of objects at a given peer - e.g. peer.GetSlices()
# alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
# we need an entry for each class mentioned in the class's foreign_xrefs
- # lambda_ignore : the alien objects are ignored if this returns true
def update_table (self,
classname,
alien_object_list,
alien_xref_objs_dict = {},
- lambda_ignore=lambda x:False,
report_name_conflicts = True):
verbose ("============================== entering update_table on",classname)
### index upon class_key for future searches
local_objects_index = local_objects.dict(class_key)
- #verbose ('update_table',classname,local_objects_index.keys())
-
### mark entries for this peer outofdate
new_count=0
old_count=0
else:
local_object.uptodate=True
+ for alien_object in alien_object_list:
+ verbose ('+++ Got alien object',alien_object)
+
# scan the peer's local objects
for alien_object in alien_object_list:
object_name = alien_object[class_key]
- ### ignore, e.g. system-wide slices
- if lambda_ignore(alien_object):
- verbose('Ignoring',object_name)
- continue
+ verbose ('----- update_table (%s) - considering'%classname,object_name)
+
+ # optimizing : avoid doing useless syncs
+ needs_sync = False
- verbose ('update_table (%s) - Considering'%classname,object_name)
-
# create or update
try:
### We know about this object already
### we can assume the object just moved
### needs to update peer_id though
local_object['peer_id'] = peer_id
+ needs_sync = True
# update all fields as per foreign_fields
for field in foreign_fields:
- local_object[field]=alien_object[field]
+ if local_object[field] != alien_object[field]:
+ local_object[field]=alien_object[field]
+ needs_sync = True
verbose ('update_table FOUND',object_name)
except:
### create a new entry
direct_ref_fields=[]
for xref in foreign_xrefs:
field=xref['field']
- verbose('checking field %s for direct_ref'%field)
+ #verbose('checking field %s for direct_ref'%field)
if isinstance(alien_object[field],int):
direct_ref_fields.append(field)
verbose("FOUND DIRECT REFS",direct_ref_fields)
local_object[field]=1
verbose('Early sync on',local_object)
local_object.sync()
+ needs_sync = False
# this row is now valid
local_object.uptodate=True
elif isinstance (alien_value,int):
#verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
new_value = transcoder.transcode(alien_value)
- local_object[field] = new_value
+ if local_object[field] != new_value:
+ local_object[field] = new_value
+ needs_sync = True
### this object is completely updated, let's save it
- verbose('FINAL sync on %s:'%object_name,local_object)
- local_object.sync()
+ if needs_sync:
+ verbose('FINAL sync on %s:'%object_name,local_object)
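+ # sync(False): assuming the usual Row.sync(commit=True) signature,
+ # this presumably skips the per-object commit, batching it for later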
+ local_object.sync(False)
-
### delete entries that are not uptodate
for local_object in local_objects:
if not local_object.uptodate:
from PLC.SliceAttributeTypes import SliceAttributeTypes
from PLC.SliceAttributes import SliceAttribute, SliceAttributes
+ verbose ("============================== entering update_slice_attributes")
+
# init
peer_id = self.peer_id
verbose('CREATED new sa')
local_object.uptodate=True
new_count += 1
- local_object.sync()
+ local_object.sync(False)
for local_object in local_objects:
if not local_object.uptodate:
### return delta in number of objects
return new_count-old_count
- def get_locals (self, list):
- return [x for x in list if x['peer_id'] is None]
-
def refresh_peer (self):
# so as to minimize the number of requests
### uses GetPeerData to gather all info in a single xmlrpc request
+ timers={}
+ t_start=time.time()
# xxx see also GetPeerData - peer_id arg unused yet
- all_data = self.peer_server.GetPeerData (self.auth,0)
+ all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
+ verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
+ for k in sorted(all_data.keys()):
+ f=all_data[k]
+ try:
+ verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
+ except TypeError:
+ # 'ellapsed' and other scalar entries have no len()
+ pass
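+ # for reference - the all_data keys used below are
+ # 'Sites-local'/'Sites-peer', 'Keys-local'/'Keys-peer',
+ # 'Nodes-local'/'Nodes-peer', 'Persons-local'/'Persons-peer',
+ # 'SliceAttibuteTypes-local' (sic), 'Slices-local'/'Slices-peer',
+ # 'SliceAttributes-local', plus 'ellapsed' (sic) - the time the peer
+ # spent gathering its data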
+
+ t_acquired = time.time()
# refresh sites
- all_sites = all_data['Sites']
- plocal_sites = self.get_locals (all_sites)
+ plocal_sites = all_data['Sites-local']
+ all_sites = plocal_sites + all_data['Sites-peer']
nb_new_sites = self.update_table('Site', plocal_sites)
+ t0 = time.time()
+ timers['process-sites']=t0-t_acquired
+
+
# refresh keys
- all_keys = all_data['Keys']
- plocal_keys = self.get_locals (all_keys)
+ plocal_keys = all_data['Keys-local']
+ all_keys = plocal_keys + all_data['Keys-peer']
nb_new_keys = self.update_table('Key', plocal_keys)
+ t=time.time()
+ timers['process-keys']=t-t0
+ t0=t
+
# refresh nodes
- all_nodes = all_data['Nodes']
- plocal_nodes = self.get_locals(all_nodes)
+ plocal_nodes = all_data['Nodes-local']
+ all_nodes = plocal_nodes + all_data['Nodes-peer']
nb_new_nodes = self.update_table('Node', plocal_nodes,
{ 'Site' : all_sites } )
+ t=time.time()
+ timers['process-nodes']=t-t0
+ t0=t
+
# refresh persons
- all_persons = all_data['Persons']
- plocal_persons = self.get_locals(all_persons)
+ plocal_persons = all_data['Persons-local']
+ all_persons = plocal_persons + all_data['Persons-peer']
nb_new_persons = self.update_table ('Person', plocal_persons,
{ 'Key': all_keys, 'Site' : all_sites } )
+ t=time.time()
+ timers['process-persons']=t-t0
+ t0=t
+
# refresh slice attribute types
- all_slice_attribute_types = all_data ['SliceAttibuteTypes']
- plocal_slice_attribute_types = self.get_locals(all_slice_attribute_types)
+ plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
plocal_slice_attribute_types,
report_name_conflicts = False)
+ t=time.time()
+ timers['process-sat']=t-t0
+ t0=t
+
# refresh slices
- all_slices = all_data['Slices']
- plocal_slices = self.get_locals(all_slices)
+ plocal_slices = all_data['Slices-local']
+ all_slices = plocal_slices + all_data['Slices-peer']
- def is_system_slice (slice):
- return slice['creator_person_id'] == 1
+ # forget about ignoring remote system slices
+ # just update them too, we'll be able to filter them out later in GetSlivers
nb_new_slices = self.update_table ('Slice', plocal_slices,
- {'Node': all_nodes, 'Person': all_persons, 'Site': all_sites},
- is_system_slice)
+ {'Node': all_nodes,
+ 'Person': all_persons,
+ 'Site': all_sites})
+
+ t=time.time()
+ timers['process-slices']=t-t0
+ t0=t
# refresh slice attributes
- all_slice_attributes = all_data ['SliceAttributes']
- plocal_slice_attributes = self.get_locals(all_slice_attributes)
+ plocal_slice_attributes = all_data ['SliceAttributes-local']
nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
all_nodes,
all_slices)
+ t=time.time()
+ timers['process-sa']=t-t0
+ t0=t
+
+ t_end=time.time()
+ timers['time_gather'] = all_data['ellapsed']
+ timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
+ timers['time_process'] = t_end-t_acquired
+ timers['time_all'] = t_end-t_start
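+ # by construction: time_all == time_transmit + time_gather + time_process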
+
### returned as-is by RefreshPeer
- return {'plcname':self.api.config.PLC_NAME,
+ return {
'new_sites':nb_new_sites,
'new_keys':nb_new_keys,
'new_nodes':nb_new_nodes,
+ 'new_persons':nb_new_persons,
'new_slice_attribute_types':nb_new_slice_attribute_types,
'new_slices':nb_new_slices,
'new_slice_attributes':nb_new_slice_attributes,
+ 'timers':timers,
}
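+ # so a RefreshPeer call would typically return something like
+ # (illustrative values only):
+ #   {'new_sites': 2, 'new_keys': 10, 'new_nodes': 5, 'new_persons': 8,
+ #    'new_slice_attribute_types': 0, 'new_slices': 1,
+ #    'new_slice_attributes': 3,
+ #    'timers': {'time_gather': 4.2, 'time_transmit': 1.1,
+ #               'time_process': 6.0, 'time_all': 11.3, ...}}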