-import time
-
-from PLC.Faults import *
-from PLC.Parameter import Parameter
-from PLC.Filter import Filter
-from PLC.Table import Row, Table
-
-verbose_flag=False;
-#verbose_flag=True;
-def verbose (*args):
- if verbose_flag:
- print (args)
-
-def class_attributes (classname):
- """ locates various attributes defined in the row class """
- topmodule = __import__ ('PLC.%ss'%classname)
- module = topmodule.__dict__['%ss'%classname]
- # local row-like class, e.g. Node
- row_class = module.__dict__['%s'%classname]
- # local tab-like class, e.g. Nodes
- table_class = module.__dict__['%ss'%classname]
-
- return {'row_class':row_class,
- 'table_class':table_class,
- 'primary_key': row_class.__dict__['primary_key'],
- 'class_key': row_class.__dict__['class_key'],
- 'foreign_fields': row_class.__dict__['foreign_fields'],
- 'foreign_xrefs': row_class.__dict__['foreign_xrefs'],
- }
-
-class Cache:
-
- """
- This class is the core of the RefreshPeer method's implementation,
- that basically calls Cache:refresh_peer
-
- It manages caching on a table-by-table basis, and performs required
- transcoding from remote ids to local ids - that's the job of the
- Transcoder class
-
- For the tables (classes) that it handles, it uses the following
- attributes
- (*) primary_key is the name of the field where db indexes are stored
- (*) class_key is used to implement equality on objects
- (*) foreign_fields is the list of fields that are copied verbatim from
- foreign objects
- (*) foreign_xrefs is the list of columns that are subject to transcoding
- (.) when obj[field] is an int, the transcoded value is directly
- inserted in the table
- (.) if it's a list, it is assumed that the relationship is maintained
- in an external 2-column table. That's where the XrefTable class comes in
-
- The relationship between slices, slice attribute types and slice attributes being
- more complex, it is handled in a specifically customized version of update_table
-
- Of course the order in which the tables are managed is important, with different
- orders several iterations might be needed for the update process to converge.
-
- Of course the timers field was introduced for optimization and could safely be removed
- """
-
- def __init__ (self, api, peer_id, peer_server):
-
- self.api = api
- self.peer_id = peer_id
- self.peer_server = peer_server
-
- class Transcoder:
-
- def __init__ (self, api, classname, alien_objects):
- self.api = api
- attrs = class_attributes (classname)
- self.primary_key = attrs['primary_key']
- self.class_key = attrs['class_key']
-
- # cannot use dict, it's acquired by xmlrpc and is untyped
- self.alien_objects_byid = dict( [ (x[self.primary_key],x) for x in alien_objects ] )
-
- # retrieve local objects
- local_objects = attrs['table_class'] (api)
- self.local_objects_byname = local_objects.dict(self.class_key)
-
- verbose ('Transcoder init :',classname,
- self.alien_objects_byid.keys(),
- self.local_objects_byname.keys())
-
- def transcode (self, alien_id):
- """ transforms an alien id into a local one """
- # locate alien obj from alien_id
- #verbose ('.entering transcode with alien_id',alien_id,)
- alien_object=self.alien_objects_byid[alien_id]
- #verbose ('..located alien_obj',)
- name = alien_object [self.class_key]
- #verbose ('...got name',name,)
- local_object=self.local_objects_byname[name]
- #verbose ('....found local obj')
- local_id=local_object[self.primary_key]
- #verbose ('.....and local_id',local_id)
- return local_id
-
-
- # for handling simple n-to-n relation tables, like e.g. slice_node
- class XrefTable:
-
- def __init__ (self, api, tablename, class1, class2):
- self.api = api
- self.tablename = tablename
- self.lowerclass1 = class1.lower()
- self.lowerclass2 = class2.lower()
-
- def delete_old_items (self, id1, id2_set):
- if id2_set:
- sql = ""
- sql += "DELETE FROM %s WHERE %s_id=%d"%(self.tablename,self.lowerclass1,id1)
- sql += " AND %s_id IN ("%self.lowerclass2
- sql += ",".join([str(i) for i in id2_set])
- sql += ")"
- self.api.db.do (sql)
-
- def insert_new_items (self, id1, id2_set):
- ### xxx needs to be optimized
- ### tried to figure a way to use a single sql statement
- ### like: insert into table (x,y) values (1,2),(3,4);
- ### but apparently this is not supported under postgresql
- for id2 in id2_set:
- sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
- (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
-
-# below is Tony's code but it's badly broken. I'm not sure we care much in fact.
-# if id2_set:
-# sql = "INSERT INTO %s select %d, %d " % \
-# self.tablename, id1, id2[0]
-# for id2 in id2_set[1:]:
-# sql += " UNION ALL SELECT %d, %d " % \
-# (id1,id2)
-
- self.api.db.do (sql)
-
- def update_item (self, id1, old_id2s, new_id2s):
- news = set (new_id2s)
- olds = set (old_id2s)
- to_delete = olds-news
- self.delete_old_items (id1, to_delete)
- to_create = news-olds
- self.insert_new_items (id1, to_create)
- self.api.db.commit()
-
- # classname: the type of objects we are talking about; e.g. 'Slice'
- # peer_object_list list of objects at a given peer - e.g. peer.GetSlices()
- # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
- # we need an entry for each class mentioned in the class's foreign_xrefs
- def update_table (self,
- classname,
- alien_object_list,
- alien_xref_objs_dict = {},
- report_name_conflicts = True):
-
- verbose ("============================== entering update_table on",classname)
- peer_id=self.peer_id
-
- attrs = class_attributes (classname)
- row_class = attrs['row_class']
- table_class = attrs['table_class']
- primary_key = attrs['primary_key']
- class_key = attrs['class_key']
- foreign_fields = attrs['foreign_fields']
- foreign_xrefs = attrs['foreign_xrefs']
-
- ## allocate transcoders and xreftables once, for each item in foreign_xrefs
- # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
- xref_accessories = dict(
- [ (xref['field'],
- {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
- 'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
- for xref in foreign_xrefs ])
-
- # the fields that are direct references, like e.g. site_id in Node
- # determined lazily, we need an alien_object to do that, and we may have none here
- direct_ref_fields = None
-
- ### get current local table
- # get ALL local objects so as to cope with
- # (*) potential moves between plcs
- # (*) or naming conflicts
- local_objects = table_class (self.api)
- ### index upon class_key for future searches
- local_objects_index = local_objects.dict(class_key)
-
- ### mark entries for this peer outofdate
- new_count=0
- old_count=0;
- for local_object in local_objects:
- if local_object['peer_id'] == peer_id:
- local_object.uptodate=False
- old_count += 1
- else:
- local_object.uptodate=True
-
- for alien_object in alien_object_list:
- verbose ('+++ Got alien object',alien_object)
-
- # scan the peer's local objects
- for alien_object in alien_object_list:
-
- object_name = alien_object[class_key]
-
- verbose ('----- update_table (%s) - considering'%classname,object_name)
-
- # optimizing : avoid doing useless syncs
- needs_sync = False
-
- # create or update
- try:
- ### We know about this object already
- local_object = local_objects_index[object_name]
- if local_object ['peer_id'] is None:
- if report_name_conflicts:
- ### xxx send e-mail
- print '!!!!!!!!!! We are in trouble here'
- print 'The %s object named %s is natively defined twice, '%(classname,object_name),
- print 'once on this PLC and once on peer %d'%peer_id
- print 'We dont raise an exception so that the remaining updates can still take place'
- print '!!!!!!!!!!'
- continue
- if local_object['peer_id'] != peer_id:
- ### the object has changed its plc,
- ### Note, this is not problematic here because both definitions are remote
- ### we can assume the object just moved
- ### needs to update peer_id though
- local_object['peer_id'] = peer_id
- needs_sync = True
- # update all fields as per foreign_fields
- for field in foreign_fields:
- if (local_object[field] != alien_object [field]):
- local_object[field]=alien_object[field]
- needs_sync = True
- verbose ('update_table FOUND',object_name)
- except:
- ### create a new entry
- local_object = row_class(self.api,
- {class_key :object_name,'peer_id':peer_id})
- # insert in index
- local_objects_index[class_key]=local_object
- verbose ('update_table CREATED',object_name)
- # update all fields as per foreign_fields
- for field in foreign_fields:
- local_object[field]=alien_object[field]
- # this is tricky; at this point we may have primary_key unspecified,
- # but we need it for handling xrefs below, so we'd like to sync to get one
- # on the other hand some required fields may be still missing so
- # the DB would refuse to sync in this case (e.g. site_id in Node)
- # so let's fill them with 1 so we can sync, this will be overridden below
- # lazily determine this set of fields now
- if direct_ref_fields is None:
- direct_ref_fields=[]
- for xref in foreign_xrefs:
- field=xref['field']
- #verbose('checking field %s for direct_ref'%field)
- if isinstance(alien_object[field],int):
- direct_ref_fields.append(field)
- verbose("FOUND DIRECT REFS",direct_ref_fields)
- for field in direct_ref_fields:
- local_object[field]=1
- verbose('Early sync on',local_object)
- local_object.sync()
- needs_sync = False
-
- # this row is now valid
- local_object.uptodate=True
- new_count += 1
-
- # manage cross-refs
- for xref in foreign_xrefs:
- field=xref['field']
- alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
- alien_value = alien_object[field]
- transcoder = xref_accessories[xref['field']]['transcoder']
- if isinstance (alien_value,list):
- #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
- local_values=[]
- for a in alien_value:
- try:
- local_values.append(transcoder.transcode(a))
- except:
- # could not transcode - might be from another peer that we dont know about..
- pass
- #verbose (" transcoded as ",local_values)
- xref_table = xref_accessories[xref['field']]['xref_table']
- # newly created objects dont have xref fields set yet
- try:
- former_xrefs=local_object[xref['field']]
- except:
- former_xrefs=[]
- xref_table.update_item (local_object[primary_key],
- former_xrefs,
- local_values)
- elif isinstance (alien_value,int):
- #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
- new_value = transcoder.transcode(alien_value)
- if local_object[field] != new_value:
- local_object[field] = new_value
- needs_sync = True
-
- ### this object is completely updated, let's save it
- if needs_sync:
- verbose('FINAL sync on %s:'%object_name,local_object)
- local_object.sync(False)
-
- ### delete entries that are not uptodate
- for local_object in local_objects:
- if not local_object.uptodate:
- local_object.delete()
-
- self.api.db.commit()
-
- ### return delta in number of objects
- return new_count-old_count
-
- # slice attributes exhibit a special behaviour
- # because there is no name we can use to retrieve/check for equality
- # this object is like a 3-part xref, linking slice_attribute_type, slice,
- # and potentially node, together with a value that can change over time.
- # extending the generic model to support a lambda rather than class_key
- # would clearly become overkill
- def update_slice_attributes (self,
- alien_slice_attributes,
- alien_nodes,
- alien_slices):
-
- from PLC.SliceAttributeTypes import SliceAttributeTypes
- from PLC.SliceAttributes import SliceAttribute, SliceAttributes
-
- verbose ("============================== entering update_slice_attributes")
-
- # init
- peer_id = self.peer_id
-
- # create transcoders
- node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
- slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
- # no need to transcode SliceAttributeTypes, we have a name in the result
- local_sat_dict = SliceAttributeTypes(self.api).dict('name')
-
- # load local objects
- local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
-
- ### mark entries for this peer outofdate
- new_count = 0
- old_count=len(local_objects)
- for local_object in local_objects:
- local_object.uptodate=False
-
- for alien_object in alien_slice_attributes:
-
- verbose('----- update_slice_attributes: considering ...')
- verbose(' ',alien_object)
-
- # locate local slice
- try:
- slice_id = slice_xcoder.transcode(alien_object['slice_id'])
- except:
- verbose('update_slice_attributes: unable to locate slice',
- alien_object['slice_id'])
- continue
- # locate slice_attribute_type
- try:
- sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
- except:
- verbose('update_slice_attributes: unable to locate slice attribute type',
- alien_object['name'])
- continue
- # locate local node if specified
- try:
- alien_node_id = alien_object['node_id']
- if alien_node_id is not None:
- node_id = node_xcoder.transcode(alien_node_id)
- else:
- node_id=None
- except:
- verbose('update_slice_attributes: unable to locate node',
- alien_object['node_id'])
- continue
-
- # locate the local SliceAttribute if any
- try:
- verbose ('searching name=', alien_object['name'],
- 'slice_id',slice_id, 'node_id',node_id)
- local_object = SliceAttributes (self.api,
- {'name':alien_object['name'],
- 'slice_id':slice_id,
- 'node_id':node_id})[0]
-
- if local_object['peer_id'] != peer_id:
- verbose ('FOUND local sa - skipped')
- continue
- verbose('FOUND already cached sa - setting value')
- local_object['value'] = alien_object['value']
- # create it if missing
- except:
- local_object = SliceAttribute(self.api,
- {'peer_id':peer_id,
- 'slice_id':slice_id,
- 'node_id':node_id,
- 'attribute_type_id':sat_id,
- 'value':alien_object['value']})
- verbose('CREATED new sa')
- local_object.uptodate=True
- new_count += 1
- local_object.sync(False)
-
- for local_object in local_objects:
- if not local_object.uptodate:
- local_object.delete()
-
- self.api.db.commit()
- ### return delta in number of objects
- return new_count-old_count
-
- def refresh_peer (self):
-
- # so as to minimize the numer of requests
- # we get all objects in a single call and sort afterwards
- # xxx ideally get objects either local or the ones attached here
- # requires to know remote peer's peer_id for ourselves, mmhh..
- # does not make any difference in a 2-peer deployment though
-
- ### uses GetPeerData to gather all info in a single xmlrpc request
-
- timers={}
- t_start=time.time()
- # xxx see also GetPeerData - peer_id arg unused yet
- all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
-
- verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
- sks=all_data.keys()
- sks.sort()
- for k in sks:
- f=all_data[k]
- try:
- verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
- except:
- pass
-
- t_acquired = time.time()
- # refresh sites
- plocal_sites = all_data['Sites-local']
- all_sites = plocal_sites + all_data['Sites-peer']
- nb_new_sites = self.update_table('Site', plocal_sites)
-
- t0 = time.time()
- timers['process-sites']=t0-t_acquired
-
-
- # refresh keys
- plocal_keys = all_data['Keys-local']
- all_keys = plocal_keys + all_data['Keys-peer']
- nb_new_keys = self.update_table('Key', plocal_keys)
-
- t=time.time()
- timers['process-keys']=t-t0
- t0=t
-
- # refresh nodes
- plocal_nodes = all_data['Nodes-local']
- all_nodes = plocal_nodes + all_data['Nodes-peer']
- nb_new_nodes = self.update_table('Node', plocal_nodes,
- { 'Site' : all_sites } )
-
- t=time.time()
- timers['process-nodes']=t-t0
- t0=t
-
- # refresh persons
- plocal_persons = all_data['Persons-local']
- all_persons = plocal_persons + all_data['Persons-peer']
- nb_new_persons = self.update_table ('Person', plocal_persons,
- { 'Key': all_keys, 'Site' : all_sites } )
-
- t=time.time()
- timers['process-persons']=t-t0
- t0=t
-
- # refresh slice attribute types
- plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
- nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
- plocal_slice_attribute_types,
- report_name_conflicts = False)
-
- t=time.time()
- timers['process-sat']=t-t0
- t0=t
-
- # refresh slices
- plocal_slices = all_data['Slices-local']
- all_slices = plocal_slices + all_data['Slices-peer']
-
- # forget about ignoring remote system slices
- # just update them too, we'll be able to filter them out later in GetSlivers
-
- nb_new_slices = self.update_table ('Slice', plocal_slices,
- {'Node': all_nodes,
- 'Person': all_persons,
- 'Site': all_sites})
-
- t=time.time()
- timers['process-slices']=t-t0
- t0=t
-
- # refresh slice attributes
- plocal_slice_attributes = all_data ['SliceAttributes-local']
- nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
- all_nodes,
- all_slices)
- t=time.time()
- timers['process-sa']=t-t0
- t0=t
-
- t_end=time.time()
-
- timers['time_gather'] = all_data['ellapsed']
- timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
- timers['time_process'] = t_end-t_acquired
- timers['time_all'] = t_end-t_start
-
- ### returned as-is by RefreshPeer
- return {
- 'new_sites':nb_new_sites,
- 'new_keys':nb_new_keys,
- 'new_nodes':nb_new_nodes,
- 'new_persons':nb_new_persons,
- 'new_slice_attribute_types':nb_new_slice_attribute_types,
- 'new_slices':nb_new_slices,
- 'new_slice_attributes':nb_new_slice_attributes,
- 'timers':timers,
- }
-