+import time
+
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Filter import Filter
class Cache:
- # an attempt to provide genericity in the caching algorithm
+ """
+ This class is the core of the RefreshPeer method's implementation,
+ that basically calls Cache:refresh_peer
+
+ It manages caching on a table-by-table basis, and performs required
+ transcoding from remote ids to local ids - that's the job of the
+ Transcoder class
+
+ For the tables (classes) that it handles, it uses the following
+ attributes
+ (*) primary_key is the name of the field where db indexes are stored
+ (*) class_key is used to implement equality on objects
+ (*) foreign_fields is the list of fields that are copied verbatim from
+ foreign objects
+ (*) foreign_xrefs is the list of columns that are subject to transcoding
+ (.) when obj[field] is an int, the transcoded value is directly
+ inserted in the table
+ (.) if it's a list, it is assumed that the relationship is maintained
+ in an external 2-column table. That's where the XrefTable class comes in
+
+ The relationship between slices, slice attribute types and slice attributes being
+ more complex, it is handled in a specifically customized version of update_table
+
+ Of course the order in which the tables are managed is important, with different
+ orders several iterations might be needed for the update process to converge.
+
+ Of course the timers field was introduced for optimization and could safely be removed
+ """
- # the Peer object we are syncing with
- def __init__ (self, api, peer, peer_server, auth):
-
- import PLC.Peers
+ def __init__ (self, api, peer_id, peer_server):
self.api = api
- assert isinstance(peer,PLC.Peers.Peer)
- self.peer = peer
+ self.peer_id = peer_id
self.peer_server = peer_server
- self.auth = auth
class Transcoder:
def transcode (self, alien_id):
""" transforms an alien id into a local one """
# locate alien obj from alien_id
- verbose ('entering transcode with alien_id',alien_id,)
+ #verbose ('.entering transcode with alien_id',alien_id,)
alien_object=self.alien_objects_byid[alien_id]
- verbose ('located alien_obj',)
+ #verbose ('..located alien_obj',)
name = alien_object [self.class_key]
- verbose ('got name',name,)
+ #verbose ('...got name',name,)
local_object=self.local_objects_byname[name]
- verbose ('found local obj')
+ #verbose ('....found local obj')
local_id=local_object[self.primary_key]
- verbose ('and local_id',local_id)
+ #verbose ('.....and local_id',local_id)
return local_id
+ # for handling simple n-to-n relation tables, like e.g. slice_node
class XrefTable:
def __init__ (self, api, tablename, class1, class2):
self.api.db.do (sql)
def insert_new_items (self, id1, id2_set):
- ### xxx needs to be optimized
- ### tried to figure a way to use a single sql statement
- ### like: insert into table (x,y) values (1,2),(3,4);
- ### but apparently this is not supported under postgresql
- for id2 in id2_set:
- sql = "INSERT INTO %s VALUES (%d,%d)"%(self.tablename,id1,id2)
+ ### xxx needs to be optimized
+ ### tried to figure a way to use a single sql statement
+ ### like: insert into table (x,y) values (1,2),(3,4);
+ ### but apparently this is not supported under postgresql
+ for id2 in id2_set:
+ sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
+ (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
+
+# below is Tony's code but it's badly broken. I'm not sure we care much in fact.
+# if id2_set:
+# sql = "INSERT INTO %s select %d, %d " % \
+# self.tablename, id1, id2[0]
+# for id2 in id2_set[1:]:
+# sql += " UNION ALL SELECT %d, %d " % \
+# (id1,id2)
+
self.api.db.do (sql)
def update_item (self, id1, old_id2s, new_id2s):
# classname: the type of objects we are talking about; e.g. 'Slice'
# peer_object_list list of objects at a given peer - e.g. peer.GetSlices()
# alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
- # his must match the keys in xref_specs
- # lambda_ignore : the alien objects are ignored if this returns true
+ # we need an entry for each class mentioned in the class's foreign_xrefs
def update_table (self,
classname,
alien_object_list,
alien_xref_objs_dict = {},
- lambda_ignore=lambda x:False):
+ report_name_conflicts = True):
- peer = self.peer
- peer_id = peer['peer_id']
+ verbose ("============================== entering update_table on",classname)
+ peer_id=self.peer_id
attrs = class_attributes (classname)
row_class = attrs['row_class']
## allocate transcoders and xreftables once, for each item in foreign_xrefs
# create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
- accessories = dict(
- [ (xref_classname,
- {'transcoder':Cache.Transcoder (self.api,xref_classname,alien_xref_objs_dict[xref_classname]),
- 'xref_table':Cache.XrefTable (self.api,xref_spec['table'],classname,xref_classname)})
- for xref_classname,xref_spec in foreign_xrefs.iteritems()])
+ xref_accessories = dict(
+ [ (xref['field'],
+ {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
+ 'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
+ for xref in foreign_xrefs ])
+
+ # the fields that are direct references, like e.g. site_id in Node
+ # determined lazily, we need an alien_object to do that, and we may have none here
+ direct_ref_fields = None
### get current local table
# get ALL local objects so as to cope with
# (*) or naming conflicts
local_objects = table_class (self.api)
### index upon class_key for future searches
- #verbose ('local objects:',local_objects)
- verbose ('class_key',class_key)
local_objects_index = local_objects.dict(class_key)
- verbose ('update_table',classname,local_objects_index.keys())
### mark entries for this peer outofdate
+ new_count=0
old_count=0;
for local_object in local_objects:
if local_object['peer_id'] == peer_id:
else:
local_object.uptodate=True
- new_count=0
+ for alien_object in alien_object_list:
+ verbose ('+++ Got alien object',alien_object)
+
# scan the peer's local objects
for alien_object in alien_object_list:
object_name = alien_object[class_key]
- ### ignore, e.g. system-wide slices
- if lambda_ignore(alien_object):
- verbose('Ignoring',object_name)
- continue
+ verbose ('----- update_table (%s) - considering'%classname,object_name)
+
+ # optimizing : avoid doing useless syncs
+ needs_sync = False
- verbose ('update_table - Considering',object_name)
-
# create or update
try:
### We know about this object already
local_object = local_objects_index[object_name]
if local_object ['peer_id'] is None:
- ### xxx send e-mail
- print 'We are in trouble here'
- print 'The %s object named %s is natively defined twice'%(classname,object_name)
- print 'Once on this PLC and once on peer %d'%peer_id
- print 'We dont raise an exception so that the remaining updates can still take place'
+ if report_name_conflicts:
+ ### xxx send e-mail
+ print '!!!!!!!!!! We are in trouble here'
+ print 'The %s object named %s is natively defined twice, '%(classname,object_name),
+ print 'once on this PLC and once on peer %d'%peer_id
+ print 'We dont raise an exception so that the remaining updates can still take place'
+ print '!!!!!!!!!!'
continue
if local_object['peer_id'] != peer_id:
### the object has changed its plc,
### we can assume the object just moved
### needs to update peer_id though
local_object['peer_id'] = peer_id
+ needs_sync = True
+ # update all fields as per foreign_fields
+ for field in foreign_fields:
+ if (local_object[field] != alien_object [field]):
+ local_object[field]=alien_object[field]
+ needs_sync = True
verbose ('update_table FOUND',object_name)
except:
### create a new entry
# insert in index
local_objects_index[class_key]=local_object
verbose ('update_table CREATED',object_name)
-
- # go on with update
- for field in foreign_fields:
- local_object[field]=alien_object[field]
+ # update all fields as per foreign_fields
+ for field in foreign_fields:
+ local_object[field]=alien_object[field]
+ # this is tricky; at this point we may have primary_key unspecified,
+ # but we need it for handling xrefs below, so we'd like to sync to get one
+ # on the other hand some required fields may be still missing so
+ # the DB would refuse to sync in this case (e.g. site_id in Node)
+ # so let's fill them with 1 so we can sync, this will be overridden below
+ # lazily determine this set of fields now
+ if direct_ref_fields is None:
+ direct_ref_fields=[]
+ for xref in foreign_xrefs:
+ field=xref['field']
+ #verbose('checking field %s for direct_ref'%field)
+ if isinstance(alien_object[field],int):
+ direct_ref_fields.append(field)
+ verbose("FOUND DIRECT REFS",direct_ref_fields)
+ for field in direct_ref_fields:
+ local_object[field]=1
+ verbose('Early sync on',local_object)
+ local_object.sync()
+ needs_sync = False
# this row is now valid
local_object.uptodate=True
new_count += 1
- local_object.sync()
# manage cross-refs
- for xref_classname,xref_spec in foreign_xrefs.iteritems():
- field=xref_spec['field']
- alien_xref_obj_list = alien_xref_objs_dict[xref_classname]
+ for xref in foreign_xrefs:
+ field=xref['field']
+ alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
alien_value = alien_object[field]
+ transcoder = xref_accessories[xref['field']]['transcoder']
if isinstance (alien_value,list):
- verbose ('update_table list-transcoding ',xref_classname,' aliens=',alien_value,)
- transcoder = accessories[xref_classname]['transcoder']
+ #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
local_values=[]
for a in alien_value:
try:
except:
# could not transcode - might be from another peer that we dont know about..
pass
- verbose (" transcoded as ",local_values)
- xref_table = accessories[xref_classname]['xref_table']
- # newly created objects dont have xrefs yet
+ #verbose (" transcoded as ",local_values)
+ xref_table = xref_accessories[xref['field']]['xref_table']
+ # newly created objects dont have xref fields set yet
try:
- former_xrefs=local_object[xref_spec['field']]
+ former_xrefs=local_object[xref['field']]
except:
former_xrefs=[]
xref_table.update_item (local_object[primary_key],
former_xrefs,
local_values)
elif isinstance (alien_value,int):
+ #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
new_value = transcoder.transcode(alien_value)
- local_object[field] = new_value
- local_object.sync()
-
+ if local_object[field] != new_value:
+ local_object[field] = new_value
+ needs_sync = True
+
+ ### this object is completely updated, let's save it
+ if needs_sync:
+ verbose('FINAL sync on %s:'%object_name,local_object)
+ local_object.sync(False)
+
### delete entries that are not uptodate
for local_object in local_objects:
if not local_object.uptodate:
### return delta in number of objects
return new_count-old_count
-
- def refresh_nodes (self, peer_get_nodes):
- """
- refreshes the foreign_nodes and peer_node tables
- expected input is the current list of local nodes
- as returned from the peer by GetNodes {'peer_id':None}
- returns the number of new nodes (can be negative)
- """
+ # slice attributes exhibit a special behaviour
+ # because there is no name we can use to retrieve/check for equality
+ # this object is like a 3-part xref, linking slice_attribute_type, slice,
+ # and potentially node, together with a value that can change over time.
+ # extending the generic model to support a lambda rather than class_key
+ # would clearly become overkill
+ def update_slice_attributes (self,
+ alien_slice_attributes,
+ alien_nodes,
+ alien_slices):
- return self.update_table ('Node', peer_get_nodes)
-
- def refresh_slices (self, peer_get_slices, peer_foreign_nodes):
- """
- refreshes the foreign_slices and peer_slice tables
- expected input is the current list of slices as returned by GetSlices
-
- returns the number of new slices on this peer (can be negative)
- """
-
- # xxx use 'system' flag for finding system slices
- return self.update_table ('Slice', peer_get_slices,
- {'Node':peer_foreign_nodes},
- lambda x: x['creator_person_id']==1)
+ from PLC.SliceAttributeTypes import SliceAttributeTypes
+ from PLC.SliceAttributes import SliceAttribute, SliceAttributes
+
+ verbose ("============================== entering update_slice_attributes")
+
+ # init
+ peer_id = self.peer_id
+ # create transcoders
+ node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
+ slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
+ # no need to transcode SliceAttributeTypes, we have a name in the result
+ local_sat_dict = SliceAttributeTypes(self.api).dict('name')
+
+ # load local objects
+ local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
+
+ ### mark entries for this peer outofdate
+ new_count = 0
+ old_count=len(local_objects)
+ for local_object in local_objects:
+ local_object.uptodate=False
+
+ for alien_object in alien_slice_attributes:
+
+ verbose('----- update_slice_attributes: considering ...')
+ verbose(' ',alien_object)
+
+ # locate local slice
+ try:
+ slice_id = slice_xcoder.transcode(alien_object['slice_id'])
+ except:
+ verbose('update_slice_attributes: unable to locate slice',
+ alien_object['slice_id'])
+ continue
+ # locate slice_attribute_type
+ try:
+ sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
+ except:
+ verbose('update_slice_attributes: unable to locate slice attribute type',
+ alien_object['name'])
+ continue
+ # locate local node if specified
+ try:
+ alien_node_id = alien_object['node_id']
+ if alien_node_id is not None:
+ node_id = node_xcoder.transcode(alien_node_id)
+ else:
+ node_id=None
+ except:
+ verbose('update_slice_attributes: unable to locate node',
+ alien_object['node_id'])
+ continue
+
+ # locate the local SliceAttribute if any
+ try:
+ verbose ('searching name=', alien_object['name'],
+ 'slice_id',slice_id, 'node_id',node_id)
+ local_object = SliceAttributes (self.api,
+ {'name':alien_object['name'],
+ 'slice_id':slice_id,
+ 'node_id':node_id})[0]
+
+ if local_object['peer_id'] != peer_id:
+ verbose ('FOUND local sa - skipped')
+ continue
+ verbose('FOUND already cached sa - setting value')
+ local_object['value'] = alien_object['value']
+ # create it if missing
+ except:
+ local_object = SliceAttribute(self.api,
+ {'peer_id':peer_id,
+ 'slice_id':slice_id,
+ 'node_id':node_id,
+ 'attribute_type_id':sat_id,
+ 'value':alien_object['value']})
+ verbose('CREATED new sa')
+ local_object.uptodate=True
+ new_count += 1
+ local_object.sync(False)
+
+ for local_object in local_objects:
+ if not local_object.uptodate:
+ local_object.delete()
+
+ self.api.db.commit()
+ ### return delta in number of objects
+ return new_count-old_count
+
def refresh_peer (self):
- peer_local_slices = self.peer_server.GetSlices(self.auth,{'peer_id':None})
+ # so as to minimize the numer of requests
+ # we get all objects in a single call and sort afterwards
+ # xxx ideally get objects either local or the ones attached here
+ # requires to know remote peer's peer_id for ourselves, mmhh..
+ # does not make any difference in a 2-peer deployment though
+
+ ### uses GetPeerData to gather all info in a single xmlrpc request
+
+ timers={}
+ t_start=time.time()
+ # xxx see also GetPeerData - peer_id arg unused yet
+ all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
+
+ verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
+ sks=all_data.keys()
+ sks.sort()
+ for k in sks:
+ f=all_data[k]
+ try:
+ verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
+ except:
+ pass
+
+ t_acquired = time.time()
+ # refresh sites
+ plocal_sites = all_data['Sites-local']
+ all_sites = plocal_sites + all_data['Sites-peer']
+ nb_new_sites = self.update_table('Site', plocal_sites)
+
+ t0 = time.time()
+ timers['process-sites']=t0-t_acquired
+
# refresh keys
- peer_local_keys = self.peer_server.GetKeys(self.auth,{'peer_id':None})
- nb_new_keys = self.update_table('Key', peer_local_keys)
+ plocal_keys = all_data['Keys-local']
+ all_keys = plocal_keys + all_data['Keys-peer']
+ nb_new_keys = self.update_table('Key', plocal_keys)
+
+ t=time.time()
+ timers['process-keys']=t-t0
+ t0=t
# refresh nodes
- peer_local_nodes = self.peer_server.GetNodes(self.auth,{'peer_id':None})
- nb_new_nodes = self.update_table('Node', peer_local_nodes)
+ plocal_nodes = all_data['Nodes-local']
+ all_nodes = plocal_nodes + all_data['Nodes-peer']
+ nb_new_nodes = self.update_table('Node', plocal_nodes,
+ { 'Site' : all_sites } )
+
+ t=time.time()
+ timers['process-nodes']=t-t0
+ t0=t
# refresh persons
- peer_local_persons = self.peer_server.GetPersons(self.auth,{'peer_id':None})
- # xxx ideally get our own persons only
- # requires to know remote peer's peer_id for ourselves, mmhh
- peer_all_keys = peer_local_keys + self.peer_server.GetKeys(self.auth,{'~peer_id':None})
- nb_new_persons = self.update_table ('Person', peer_local_persons,
- { 'Key': peer_all_keys} )
+ plocal_persons = all_data['Persons-local']
+ all_persons = plocal_persons + all_data['Persons-peer']
+ nb_new_persons = self.update_table ('Person', plocal_persons,
+ { 'Key': all_keys, 'Site' : all_sites } )
- # refresh slices
- def is_system_slice (slice):
- return slice['creator_person_id'] == 1
- # xxx would ideally get our own nodes only,
- peer_all_nodes = peer_local_nodes+self.peer_server.GetNodes(self.auth,{'~peer_id':None})
+ t=time.time()
+ timers['process-persons']=t-t0
+ t0=t
+
+ # refresh slice attribute types
+ plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
+ nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
+ plocal_slice_attribute_types,
+ report_name_conflicts = False)
- nb_new_slices = self.update_table ('Slice', peer_local_slices,
- {'Node':peer_all_nodes},
- is_system_slice)
+ t=time.time()
+ timers['process-sat']=t-t0
+ t0=t
+ # refresh slices
+ plocal_slices = all_data['Slices-local']
+ all_slices = plocal_slices + all_data['Slices-peer']
+
+ # forget about ignoring remote system slices
+ # just update them too, we'll be able to filter them out later in GetSlivers
+
+ nb_new_slices = self.update_table ('Slice', plocal_slices,
+ {'Node': all_nodes,
+ 'Person': all_persons,
+ 'Site': all_sites})
+
+ t=time.time()
+ timers['process-slices']=t-t0
+ t0=t
+
+ # refresh slice attributes
+ plocal_slice_attributes = all_data ['SliceAttributes-local']
+ nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
+ all_nodes,
+ all_slices)
+ t=time.time()
+ timers['process-sa']=t-t0
+ t0=t
+
+ t_end=time.time()
- return {'plcname':self.api.config.PLC_NAME,
+ timers['time_gather'] = all_data['ellapsed']
+ timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
+ timers['time_process'] = t_end-t_acquired
+ timers['time_all'] = t_end-t_start
+
+ ### returned as-is by RefreshPeer
+ return {
+ 'new_sites':nb_new_sites,
'new_keys':nb_new_keys,
'new_nodes':nb_new_nodes,
'new_persons':nb_new_persons,
- 'new_slices':nb_new_slices}
+ 'new_slice_attribute_types':nb_new_slice_attribute_types,
+ 'new_slices':nb_new_slices,
+ 'new_slice_attributes':nb_new_slice_attributes,
+ 'timers':timers,
+ }