+import time
+
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Filter import Filter
from PLC.Table import Row, Table
verbose_flag=False;
-verbose_flag=True;
+#verbose_flag=True;
def verbose (*args):
if verbose_flag:
print (args)
class Cache:
- # an attempt to provide genericity in the caching algorithm
+ """
+ This class is the core of the RefreshPeer method's implementation,
+ that basically calls Cache.refresh_peer
+
+ It manages caching on a table-by-table basis, and performs required
+ transcoding from remote ids to local ids - that's the job of the
+ Transcoder class
+
+ For the tables (classes) that it handles, it uses the following
+ attributes
+ (*) primary_key is the name of the field where db indexes are stored
+ (*) class_key is used to implement equality on objects
+ (*) foreign_fields is the list of fields that are copied verbatim from
+ foreign objects
+ (*) foreign_xrefs is the list of columns that are subject to transcoding
+ (.) when obj[field] is an int, the transcoded value is directly
+ inserted in the table
+ (.) if it's a list, it is assumed that the relationship is maintained
+ in an external 2-column table. That's where the XrefTable class comes in
+
+ The relationship between slices, slice attribute types and slice attributes being
+ more complex, it is handled in a specifically customized version of update_table
+
+ Of course the order in which the tables are managed is important, with different
+ orders several iterations might be needed for the update process to converge.
+
+ Of course the timers field was introduced for optimization and could safely be removed
+ """
- def __init__ (self, api, peer_id, peer_server, auth):
+ def __init__ (self, api, peer_id, peer_server):
self.api = api
self.peer_id = peer_id
self.peer_server = peer_server
- self.auth = auth
class Transcoder:
def transcode (self, alien_id):
""" transforms an alien id into a local one """
# locate alien obj from alien_id
- verbose ('.entering transcode with alien_id',alien_id,)
+ #verbose ('.entering transcode with alien_id',alien_id,)
alien_object=self.alien_objects_byid[alien_id]
- verbose ('..located alien_obj',)
+ #verbose ('..located alien_obj',)
name = alien_object [self.class_key]
- verbose ('...got name',name,)
+ #verbose ('...got name',name,)
local_object=self.local_objects_byname[name]
- verbose ('....found local obj')
+ #verbose ('....found local obj')
local_id=local_object[self.primary_key]
- verbose ('.....and local_id',local_id)
+ #verbose ('.....and local_id',local_id)
return local_id
+ # for handling simple n-to-n relation tables, like e.g. slice_node
class XrefTable:
def __init__ (self, api, tablename, class1, class2):
self.api.db.do (sql)
def insert_new_items (self, id1, id2_set):
- ### xxx needs to be optimized
- ### tried to figure a way to use a single sql statement
- ### like: insert into table (x,y) values (1,2),(3,4);
- ### but apparently this is not supported under postgresql
- for id2 in id2_set:
- sql = "INSERT INTO %s VALUES (%d,%d)"%(self.tablename,id1,id2)
+ ### xxx needs to be optimized
+ ### tried to figure a way to use a single sql statement
+ ### like: insert into table (x,y) values (1,2),(3,4);
+ ### but apparently this is not supported under postgresql
+ for id2 in id2_set:
+ sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
+ (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
+
+# below is Tony's code but it's badly broken. I'm not sure we care much in fact.
+# if id2_set:
+# sql = "INSERT INTO %s select %d, %d " % \
+# self.tablename, id1, id2[0]
+# for id2 in id2_set[1:]:
+# sql += " UNION ALL SELECT %d, %d " % \
+# (id1,id2)
+
self.api.db.do (sql)
def update_item (self, id1, old_id2s, new_id2s):
# classname: the type of objects we are talking about; e.g. 'Slice'
# peer_object_list list of objects at a given peer - e.g. peer.GetSlices()
# alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
- # his must match the keys in xref_specs
- # lambda_ignore : the alien objects are ignored if this returns true
+ # we need an entry for each class mentioned in the class's foreign_xrefs
def update_table (self,
classname,
alien_object_list,
alien_xref_objs_dict = {},
- lambda_ignore=lambda x:False,
report_name_conflicts = True):
+ verbose ("============================== entering update_table on",classname)
peer_id=self.peer_id
attrs = class_attributes (classname)
## allocate transcoders and xreftables once, for each item in foreign_xrefs
# create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
- accessories = dict(
- [ (xref_classname,
- {'transcoder':Cache.Transcoder (self.api,xref_classname,alien_xref_objs_dict[xref_classname]),
- 'xref_table':Cache.XrefTable (self.api,xref_spec['table'],classname,xref_classname)})
- for xref_classname,xref_spec in foreign_xrefs.iteritems()])
+ xref_accessories = dict(
+ [ (xref['field'],
+ {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
+ 'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
+ for xref in foreign_xrefs ])
+
+ # the fields that are direct references, like e.g. site_id in Node
+ # determined lazily, we need an alien_object to do that, and we may have none here
+ direct_ref_fields = None
### get current local table
# get ALL local objects so as to cope with
### index upon class_key for future searches
local_objects_index = local_objects.dict(class_key)
- verbose ('update_table',classname,local_objects_index.keys())
-
### mark entries for this peer outofdate
new_count=0
old_count=0;
else:
local_object.uptodate=True
+ for alien_object in alien_object_list:
+ verbose ('+++ Got alien object',alien_object)
+
# scan the peer's local objects
for alien_object in alien_object_list:
object_name = alien_object[class_key]
- ### ignore, e.g. system-wide slices
- if lambda_ignore(alien_object):
- verbose('Ignoring',object_name)
- continue
+ verbose ('----- update_table (%s) - considering'%classname,object_name)
+
+ # optimizing : avoid doing useless syncs
+ needs_sync = False
- verbose ('update_table (%s) - Considering'%classname,object_name)
-
# create or update
try:
### We know about this object already
if local_object ['peer_id'] is None:
if report_name_conflicts:
### xxx send e-mail
- print '==================== We are in trouble here'
- print 'The %s object named %s is natively defined twice'%(classname,object_name)
- print 'Once on this PLC and once on peer %d'%peer_id
+ print '!!!!!!!!!! We are in trouble here'
+ print 'The %s object named %s is natively defined twice, '%(classname,object_name),
+ print 'once on this PLC and once on peer %d'%peer_id
print 'We dont raise an exception so that the remaining updates can still take place'
+ print '!!!!!!!!!!'
continue
if local_object['peer_id'] != peer_id:
### the object has changed its plc,
### we can assume the object just moved
### needs to update peer_id though
local_object['peer_id'] = peer_id
+ needs_sync = True
+ # update all fields as per foreign_fields
+ for field in foreign_fields:
+ if (local_object[field] != alien_object [field]):
+ local_object[field]=alien_object[field]
+ needs_sync = True
verbose ('update_table FOUND',object_name)
except:
### create a new entry
# insert in index
local_objects_index[class_key]=local_object
verbose ('update_table CREATED',object_name)
-
- # go on with update
- for field in foreign_fields:
- local_object[field]=alien_object[field]
+ # update all fields as per foreign_fields
+ for field in foreign_fields:
+ local_object[field]=alien_object[field]
+ # this is tricky; at this point we may have primary_key unspecified,
+ # but we need it for handling xrefs below, so we'd like to sync to get one
+ # on the other hand some required fields may be still missing so
+ # the DB would refuse to sync in this case (e.g. site_id in Node)
+ # so let's fill them with 1 so we can sync, this will be overridden below
+ # lazily determine this set of fields now
+ if direct_ref_fields is None:
+ direct_ref_fields=[]
+ for xref in foreign_xrefs:
+ field=xref['field']
+ #verbose('checking field %s for direct_ref'%field)
+ if isinstance(alien_object[field],int):
+ direct_ref_fields.append(field)
+ verbose("FOUND DIRECT REFS",direct_ref_fields)
+ for field in direct_ref_fields:
+ local_object[field]=1
+ verbose('Early sync on',local_object)
+ local_object.sync()
+ needs_sync = False
# this row is now valid
local_object.uptodate=True
new_count += 1
- local_object.sync()
# manage cross-refs
- for xref_classname,xref_spec in foreign_xrefs.iteritems():
- field=xref_spec['field']
- alien_xref_obj_list = alien_xref_objs_dict[xref_classname]
+ for xref in foreign_xrefs:
+ field=xref['field']
+ alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
alien_value = alien_object[field]
- transcoder = accessories[xref_classname]['transcoder']
+ transcoder = xref_accessories[xref['field']]['transcoder']
if isinstance (alien_value,list):
- verbose ('update_table list-transcoding ',xref_classname,' aliens=',alien_value,)
+ #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
local_values=[]
for a in alien_value:
try:
except:
# could not transcode - might be from another peer that we dont know about..
pass
- verbose (" transcoded as ",local_values)
- xref_table = accessories[xref_classname]['xref_table']
- # newly created objects dont have xrefs yet
+ #verbose (" transcoded as ",local_values)
+ xref_table = xref_accessories[xref['field']]['xref_table']
+ # newly created objects don't have xref fields set yet
try:
- former_xrefs=local_object[xref_spec['field']]
+ former_xrefs=local_object[xref['field']]
except:
former_xrefs=[]
xref_table.update_item (local_object[primary_key],
former_xrefs,
local_values)
elif isinstance (alien_value,int):
- verbose ('update_table atom-transcoding ',xref_classname,' aliens=',alien_value,)
+ #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
new_value = transcoder.transcode(alien_value)
- local_object[field] = new_value
- local_object.sync()
-
+ if local_object[field] != new_value:
+ local_object[field] = new_value
+ needs_sync = True
+
+ ### this object is completely updated, let's save it
+ if needs_sync:
+ verbose('FINAL sync on %s:'%object_name,local_object)
+ local_object.sync(False)
+
### delete entries that are not uptodate
for local_object in local_objects:
if not local_object.uptodate:
from PLC.SliceAttributeTypes import SliceAttributeTypes
from PLC.SliceAttributes import SliceAttribute, SliceAttributes
+ verbose ("============================== entering update_slice_attributes")
+
# init
peer_id = self.peer_id
if local_object['peer_id'] != peer_id:
verbose ('FOUND local sa - skipped')
continue
- verbose('FOUND already cached sa')
+ verbose('FOUND already cached sa - setting value')
local_object['value'] = alien_object['value']
# create it if missing
except:
verbose('CREATED new sa')
local_object.uptodate=True
new_count += 1
- local_object.sync()
+ local_object.sync(False)
for local_object in local_objects:
if not local_object.uptodate:
### return delta in number of objects
return new_count-old_count
- def get_locals (self, list):
- return [x for x in list if x['peer_id'] is None]
-
def refresh_peer (self):
# so as to minimize the number of requests
### uses GetPeerData to gather all info in a single xmlrpc request
+ timers={}
+ t_start=time.time()
# xxx see also GetPeerData - peer_id arg unused yet
- all_data = self.peer_server.GetPeerData (self.auth,0)
+ all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
+ verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
+ sks=all_data.keys()
+ sks.sort()
+ for k in sks:
+ f=all_data[k]
+ try:
+ verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
+ except:
+ pass
+
+ t_acquired = time.time()
# refresh sites
- all_sites = all_data['Sites']
- plocal_sites = self.get_locals (all_sites)
+ plocal_sites = all_data['Sites-local']
+ all_sites = plocal_sites + all_data['Sites-peer']
nb_new_sites = self.update_table('Site', plocal_sites)
+ t0 = time.time()
+ timers['process-sites']=t0-t_acquired
+
+
# refresh keys
- all_keys = all_data['Keys']
- plocal_keys = self.get_locals (all_keys)
+ plocal_keys = all_data['Keys-local']
+ all_keys = plocal_keys + all_data['Keys-peer']
nb_new_keys = self.update_table('Key', plocal_keys)
+ t=time.time()
+ timers['process-keys']=t-t0
+ t0=t
+
# refresh nodes
- all_nodes = all_data['Nodes']
- plocal_nodes = self.get_locals(all_nodes)
+ plocal_nodes = all_data['Nodes-local']
+ all_nodes = plocal_nodes + all_data['Nodes-peer']
nb_new_nodes = self.update_table('Node', plocal_nodes,
{ 'Site' : all_sites } )
+ t=time.time()
+ timers['process-nodes']=t-t0
+ t0=t
+
# refresh persons
- all_persons = all_data['Persons']
- plocal_persons = self.get_locals(all_persons)
+ plocal_persons = all_data['Persons-local']
+ all_persons = plocal_persons + all_data['Persons-peer']
nb_new_persons = self.update_table ('Person', plocal_persons,
{ 'Key': all_keys, 'Site' : all_sites } )
+ t=time.time()
+ timers['process-persons']=t-t0
+ t0=t
+
# refresh slice attribute types
- all_slice_attribute_types = all_data ['SliceAttibuteTypes']
- plocal_slice_attribute_types = self.get_locals(all_slice_attribute_types)
+ plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
plocal_slice_attribute_types,
report_name_conflicts = False)
+ t=time.time()
+ timers['process-sat']=t-t0
+ t0=t
+
# refresh slices
- all_slices = all_data['Slices']
- plocal_slices = self.get_locals(all_slices)
+ plocal_slices = all_data['Slices-local']
+ all_slices = plocal_slices + all_data['Slices-peer']
- def is_system_slice (slice):
- return slice['creator_person_id'] == 1
+ # forget about ignoring remote system slices
+ # just update them too, we'll be able to filter them out later in GetSlivers
nb_new_slices = self.update_table ('Slice', plocal_slices,
- {'Node': all_nodes, 'Person': all_persons},
- is_system_slice)
+ {'Node': all_nodes,
+ 'Person': all_persons,
+ 'Site': all_sites})
+
+ t=time.time()
+ timers['process-slices']=t-t0
+ t0=t
# refresh slice attributes
- all_slice_attributes = all_data ['SliceAttributes']
- plocal_slice_attributes = self.get_locals(all_slice_attributes)
+ plocal_slice_attributes = all_data ['SliceAttributes-local']
nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
all_nodes,
all_slices)
+ t=time.time()
+ timers['process-sa']=t-t0
+ t0=t
+
+ t_end=time.time()
+ timers['time_gather'] = all_data['ellapsed']
+ timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
+ timers['time_process'] = t_end-t_acquired
+ timers['time_all'] = t_end-t_start
+
### returned as-is by RefreshPeer
- return {'plcname':self.api.config.PLC_NAME,
+ return {
'new_sites':nb_new_sites,
'new_keys':nb_new_keys,
'new_nodes':nb_new_nodes,
'new_slice_attribute_types':nb_new_slice_attribute_types,
'new_slices':nb_new_slices,
'new_slice_attributes':nb_new_slice_attributes,
+ 'timers':timers,
}