3 from PLC.Faults import *
4 from PLC.Parameter import Parameter
5 from PLC.Filter import Filter
6 from PLC.Table import Row, Table
14 def class_attributes (classname):
15     """ locates the row class, the table class, and the caching metadata (primary_key, class_key, foreign_fields, foreign_xrefs) declared on the row class """
# e.g. classname='Node' -> __import__ returns the top package (PLC); dig out PLC.Nodes
16 topmodule = __import__ ('PLC.%ss'%classname)
17 module = topmodule.__dict__['%ss'%classname]
18 # local row-like class, e.g. Node
19 row_class = module.__dict__['%s'%classname]
20 # local tab-like class, e.g. Nodes
21 table_class = module.__dict__['%ss'%classname]
# bundle everything the cache machinery (Transcoder, update_table) needs into one dict;
# the metadata entries are class attributes that each cachable row class is expected to define
23 return {'row_class':row_class,
24 'table_class':table_class,
25 'primary_key': row_class.__dict__['primary_key'],
26 'class_key': row_class.__dict__['class_key'],
27 'foreign_fields': row_class.__dict__['foreign_fields'],
28 'foreign_xrefs': row_class.__dict__['foreign_xrefs'],
34 This class is the core of the RefreshPeer method's implementation,
35 that basically calls Cache:refresh_peer
37 It manages caching on a table-by-table basis, and performs required
38 transcoding from remote ids to local ids - that's the job of the
41 For the tables (classes) that it handles, it uses the following
43 (*) primary_key is the name of the field where db indexes are stored
44 (*) class_key is used to implement equality on objects
45 (*) foreign_fields is the list of fields that are copied verbatim from
47 (*) foreign_xrefs is the list of columns that are subject to transcoding
48 (.) when obj[field] is an int, the transcoded value is directly
50 (.) if it's a list, it is assumed that the relationship is maintained
51 in an external 2-column table. That's where the XrefTable class comes in
53 The relationship between slices, slice attribute types and slice attributes being
54 more complex, it is handled in a specifically customized version of update_table
56 Of course the order in which the tables are managed is important: with different
57 orders, several iterations might be needed for the update process to converge.
59 Of course the timers field was introduced for optimization and could safely be removed
62 def __init__ (self, api, peer_id, peer_server, auth):
"""Bind this Cache to one peer: its local peer_id and the xmlrpc server proxy used to reach it."""
# keep the peer's id and server proxy for later GetPeerData calls (see refresh_peer)
65 self.peer_id = peer_id
66 self.peer_server = peer_server
71 def __init__ (self, api, classname, alien_objects):
"""Build the two indexes transcode() needs: the peer's (alien) objects keyed
by their remote primary key, and the local objects keyed by class_key (name)."""
73 attrs = class_attributes (classname)
74 self.primary_key = attrs['primary_key']
75 self.class_key = attrs['class_key']
77 # cannot use dict, it's acquired by xmlrpc and is untyped
78 self.alien_objects_byid = dict( [ (x[self.primary_key],x) for x in alien_objects ] )
80 # retrieve local objects
81 local_objects = attrs['table_class'] (api)
# Table.dict(key) indexes the freshly fetched local rows by the given field
82 self.local_objects_byname = local_objects.dict(self.class_key)
84 verbose ('Transcoder init :',classname,
85 self.alien_objects_byid.keys(),
86 self.local_objects_byname.keys())
88 def transcode (self, alien_id):
89 """ transforms an alien id into a local one """
# chain: alien primary key -> alien object -> shared name (class_key)
#        -> local object -> local primary key
# raises KeyError when any hop is unknown (e.g. object from a peer we don't know about)
90 # locate alien obj from alien_id
91 #verbose ('.entering transcode with alien_id',alien_id,)
92 alien_object=self.alien_objects_byid[alien_id]
93 #verbose ('..located alien_obj',)
94 name = alien_object [self.class_key]
95 #verbose ('...got name',name,)
96 local_object=self.local_objects_byname[name]
97 #verbose ('....found local obj')
98 local_id=local_object[self.primary_key]
99 #verbose ('.....and local_id',local_id)
103 # for handling simple n-to-n relation tables, like e.g. slice_node
106 def __init__ (self, api, tablename, class1, class2):
# e.g. tablename='slice_node', class1='Slice', class2='Node';
# the two column names are derived as '<lowerclass>_id' (slice_id, node_id)
108 self.tablename = tablename
109 self.lowerclass1 = class1.lower()
110 self.lowerclass2 = class2.lower()
112 def delete_old_items (self, id1, id2_set):
# delete the (id1,id2) pairs for every id2 in id2_set
# NOTE(review): ids are interpolated with %d, so callers must pass ints — confirm
115 sql += "DELETE FROM %s WHERE %s_id=%d"%(self.tablename,self.lowerclass1,id1)
116 sql += " AND %s_id IN ("%self.lowerclass2
117 sql += ",".join([str(i) for i in id2_set])
121 def insert_new_items (self, id1, id2_set):
# inserts one (id1,id2) row per id2 — one INSERT statement per pair
122 ### xxx needs to be optimized
123 ### tried to figure a way to use a single sql statement
124 ### like: insert into table (x,y) values (1,2),(3,4);
125 ### but apparently this is not supported under postgresql
127 sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
128 (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
130 # below is Tony's code but it's badly broken. I'm not sure we care much in fact.
# (kept for reference only; the INSERT ... SELECT ... UNION ALL form was never finished)
132 # sql = "INSERT INTO %s select %d, %d " % \
133 # self.tablename, id1, id2[0]
134 # for id2 in id2_set[1:]:
135 # sql += " UNION ALL SELECT %d, %d " % \
def update_item (self, id1, old_id2s, new_id2s):
    """Reconcile the xref rows attached to id1: drop the pairs that are no
    longer wanted, then insert the newly wanted ones.  Pairs present in both
    the old and new sets are left untouched."""
    current = set(old_id2s)
    wanted = set(new_id2s)
    # stale links go first, then the missing ones are created
    self.delete_old_items(id1, current - wanted)
    self.insert_new_items(id1, wanted - current)
149 # classname: the type of objects we are talking about; e.g. 'Slice'
150 # peer_object_list list of objects at a given peer - e.g. peer.GetSlices()
151 # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
152 # we need an entry for each class mentioned in the class's foreign_xrefs
153 def update_table (self,
156 alien_xref_objs_dict = {},
# NOTE(review): mutable default argument — benign only if this dict is never
# mutated inside the method; confirm and consider a None sentinel
157 report_name_conflicts = True):
"""Generic one-table refresh: upsert the peer's objects into the local table,
transcode xref fields, and delete local entries the peer no longer has.
Returns the delta in number of objects (created minus deleted)."""
159 verbose ("============================== entering update_table on",classname)
# pull the per-class caching metadata declared on the row class
162 attrs = class_attributes (classname)
163 row_class = attrs['row_class']
164 table_class = attrs['table_class']
165 primary_key = attrs['primary_key']
166 class_key = attrs['class_key']
167 foreign_fields = attrs['foreign_fields']
168 foreign_xrefs = attrs['foreign_xrefs']
170 ## allocate transcoders and xreftables once, for each item in foreign_xrefs
171 # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
172 xref_accessories = dict(
174 {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
175 'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
176 for xref in foreign_xrefs ])
178 # the fields that are direct references, like e.g. site_id in Node
179 # determined lazily, we need an alien_object to do that, and we may have none here
180 direct_ref_fields = None
182 ### get current local table
183 # get ALL local objects so as to cope with
184 # (*) potential moves between plcs
185 # (*) or naming conflicts
186 local_objects = table_class (self.api)
187 ### index upon class_key for future searches
188 local_objects_index = local_objects.dict(class_key)
190 ### mark entries for this peer outofdate
# entries still marked outofdate at the end of the scan get deleted below
193 for local_object in local_objects:
194 if local_object['peer_id'] == peer_id:
195 local_object.uptodate=False
198 local_object.uptodate=True
200 for alien_object in alien_object_list:
201 verbose ('+++ Got alien object',alien_object)
203 # scan the peer's local objects
204 for alien_object in alien_object_list:
206 object_name = alien_object[class_key]
208 verbose ('----- update_table (%s) - considering'%classname,object_name)
210 # optimizing : avoid doing useless syncs
215 ### We know about this object already
216 local_object = local_objects_index[object_name]
# peer_id None means the object is natively defined on THIS plc:
# the same name existing on the peer is a genuine conflict
217 if local_object ['peer_id'] is None:
218 if report_name_conflicts:
# NOTE(review): Python 2 print statements — this module predates Python 3
220 print '!!!!!!!!!! We are in trouble here'
221 print 'The %s object named %s is natively defined twice, '%(classname,object_name),
222 print 'once on this PLC and once on peer %d'%peer_id
223 print 'We dont raise an exception so that the remaining updates can still take place'
226 if local_object['peer_id'] != peer_id:
227 ### the object has changed its plc,
228 ### Note, this is not problematic here because both definitions are remote
229 ### we can assume the object just moved
230 ### needs to update peer_id though
231 local_object['peer_id'] = peer_id
233 # update all fields as per foreign_fields
234 for field in foreign_fields:
235 if (local_object[field] != alien_object [field]):
236 local_object[field]=alien_object[field]
238 verbose ('update_table FOUND',object_name)
240 ### create a new entry
241 local_object = row_class(self.api,
242 {class_key :object_name,'peer_id':peer_id})
# NOTE(review): this indexes under the literal class_key string (e.g. 'name'),
# not under object_name — looks like a bug; a second new object with another
# name would overwrite this index slot.  Confirm against upstream history.
244 local_objects_index[class_key]=local_object
245 verbose ('update_table CREATED',object_name)
246 # update all fields as per foreign_fields
247 for field in foreign_fields:
248 local_object[field]=alien_object[field]
249 # this is tricky; at this point we may have primary_key unspecified,
250 # but we need it for handling xrefs below, so we'd like to sync to get one
251 # on the other hand some required fields may be still missing so
252 # the DB would refuse to sync in this case (e.g. site_id in Node)
253 # so let's fill them with 1 so we can sync, this will be overridden below
254 # lazily determine this set of fields now
255 if direct_ref_fields is None:
257 for xref in foreign_xrefs:
259 #verbose('checking field %s for direct_ref'%field)
# an int-valued xref field is a direct (scalar) reference, e.g. site_id
260 if isinstance(alien_object[field],int):
261 direct_ref_fields.append(field)
262 verbose("FOUND DIRECT REFS",direct_ref_fields)
# placeholder value so the early sync passes NOT NULL constraints;
# overwritten with the transcoded value further down
263 for field in direct_ref_fields:
264 local_object[field]=1
265 verbose('Early sync on',local_object)
269 # this row is now valid
270 local_object.uptodate=True
# now handle the xref fields: lists go through an XrefTable, ints are transcoded in place
274 for xref in foreign_xrefs:
276 alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
277 alien_value = alien_object[field]
278 transcoder = xref_accessories[xref['field']]['transcoder']
279 if isinstance (alien_value,list):
280 #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
282 for a in alien_value:
284 local_values.append(transcoder.transcode(a))
286 # could not transcode - might be from another peer that we dont know about..
288 #verbose (" transcoded as ",local_values)
289 xref_table = xref_accessories[xref['field']]['xref_table']
290 # newly created objects dont have xref fields set yet
292 former_xrefs=local_object[xref['field']]
295 xref_table.update_item (local_object[primary_key],
298 elif isinstance (alien_value,int):
299 #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
300 new_value = transcoder.transcode(alien_value)
301 if local_object[field] != new_value:
302 local_object[field] = new_value
305 ### this object is completely updated, let's save it
307 verbose('FINAL sync on %s:'%object_name,local_object)
308 local_object.sync(False)
310 ### delete entries that are not uptodate
311 for local_object in local_objects:
312 if not local_object.uptodate:
313 local_object.delete()
317 ### return delta in number of objects
318 return new_count-old_count
320 # slice attributes exhibit a special behaviour
321 # because there is no name we can use to retrieve/check for equality
322 # this object is like a 3-part xref, linking slice_attribute_type, slice,
323 # and potentially node, together with a value that can change over time.
324 # extending the generic model to support a lambda rather than class_key
325 # would clearly become overkill
326 def update_slice_attributes (self,
327 alien_slice_attributes,
# local imports to avoid import cycles at module load time, presumably — confirm
331 from PLC.SliceAttributeTypes import SliceAttributeTypes
332 from PLC.SliceAttributes import SliceAttribute, SliceAttributes
334 verbose ("============================== entering update_slice_attributes")
337 peer_id = self.peer_id
# transcoders for the two id-valued parts of the 3-part xref
340 node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
341 slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
342 # no need to transcode SliceAttributeTypes, we have a name in the result
343 local_sat_dict = SliceAttributeTypes(self.api).dict('name')
# only this peer's cached attributes are candidates for update/deletion
346 local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
348 ### mark entries for this peer outofdate
350 old_count=len(local_objects)
351 for local_object in local_objects:
352 local_object.uptodate=False
354 for alien_object in alien_slice_attributes:
356 verbose('----- update_slice_attributes: considering ...')
357 verbose('    ',alien_object)
# map the peer's slice_id onto ours; skipped when the slice is unknown here
361 slice_id = slice_xcoder.transcode(alien_object['slice_id'])
363 verbose('update_slice_attributes: unable to locate slice',
364 alien_object['slice_id'])
366 # locate slice_attribute_type
# attribute types are matched by name, so no transcoding needed
368 sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
370 verbose('update_slice_attributes: unable to locate slice attribute type',
371 alien_object['name'])
373 # locate local node if specified
375 alien_node_id = alien_object['node_id']
376 if alien_node_id is not None:
377 node_id = node_xcoder.transcode(alien_node_id)
381 verbose('update_slice_attributes: unable to locate node',
382 alien_object['node_id'])
385 # locate the local SliceAttribute if any
387 verbose ('searching name=', alien_object['name'],
388 'slice_id',slice_id, 'node_id',node_id)
389 local_object = SliceAttributes (self.api,
390 {'name':alien_object['name'],
392 'node_id':node_id})[0]
# a matching attribute with a different peer_id is natively defined here (or
# belongs to another peer) — leave it alone
394 if local_object['peer_id'] != peer_id:
395 verbose ('FOUND local sa - skipped')
397 verbose('FOUND already cached sa - setting value')
398 local_object['value'] = alien_object['value']
399 # create it if missing
401 local_object = SliceAttribute(self.api,
405 'attribute_type_id':sat_id,
406 'value':alien_object['value']})
407 verbose('CREATED new sa')
408 local_object.uptodate=True
410 local_object.sync(False)
# sweep: whatever the peer no longer reports gets deleted
412 for local_object in local_objects:
413 if not local_object.uptodate:
414 local_object.delete()
417 ### return delta in number of objects
418 return new_count-old_count
420 def refresh_peer (self):
"""Fetch everything from the peer in one GetPeerData xmlrpc call, then update
each local table in dependency order (sites, keys, nodes, persons, slice
attribute types, slices, slice attributes).  Returns per-table new-object
counts and timing info, passed back as-is by RefreshPeer."""
422 # so as to minimize the number of requests
423 # we get all objects in a single call and sort afterwards
424 # xxx ideally get objects either local or the ones attached here
425 # requires to know remote peer's peer_id for ourselves, mmhh..
426 # does not make any difference in a 2-peer deployment though
428 ### uses GetPeerData to gather all info in a single xmlrpc request
432 # xxx see also GetPeerData - peer_id arg unused yet
433 all_data = self.peer_server.GetPeerData (self.auth,self.api.config.PLC_NAME)
435 verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
441 verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
445 t_acquired = time.time()
# tables are processed in dependency order: each '*-peer' list is concatenated
# so later tables can transcode references to objects from either side
447 plocal_sites = all_data['Sites-local']
448 all_sites = plocal_sites + all_data['Sites-peer']
449 nb_new_sites = self.update_table('Site', plocal_sites)
452 timers['process-sites']=t0-t_acquired
456 plocal_keys = all_data['Keys-local']
457 all_keys = plocal_keys + all_data['Keys-peer']
458 nb_new_keys = self.update_table('Key', plocal_keys)
461 timers['process-keys']=t-t0
465 plocal_nodes = all_data['Nodes-local']
466 all_nodes = plocal_nodes + all_data['Nodes-peer']
467 nb_new_nodes = self.update_table('Node', plocal_nodes,
468 { 'Site' : all_sites } )
471 timers['process-nodes']=t-t0
475 plocal_persons = all_data['Persons-local']
476 all_persons = plocal_persons + all_data['Persons-peer']
477 nb_new_persons = self.update_table ('Person', plocal_persons,
478 { 'Key': all_keys, 'Site' : all_sites } )
481 timers['process-persons']=t-t0
484 # refresh slice attribute types
# NOTE(review): 'SliceAttibuteTypes' is misspelled, but it presumably matches
# the (equally misspelled) key emitted by the peer's GetPeerData — do not "fix"
# one side without the other; confirm against the GetPeerData implementation
485 plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
# conflicts not reported here: attribute types commonly exist on both sides
486 nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
487 plocal_slice_attribute_types,
488 report_name_conflicts = False)
491 timers['process-sat']=t-t0
495 plocal_slices = all_data['Slices-local']
496 all_slices = plocal_slices + all_data['Slices-peer']
498 # forget about ignoring remote system slices
499 # just update them too, we'll be able to filter them out later in GetSlivers
501 nb_new_slices = self.update_table ('Slice', plocal_slices,
503 'Person': all_persons,
507 timers['process-slices']=t-t0
510 # refresh slice attributes
511 plocal_slice_attributes = all_data ['SliceAttributes-local']
512 nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
516 timers['process-sa']=t-t0
# NOTE(review): 'ellapsed' (sic) is the key provided by the peer's reply — keep as-is
521 timers['time_gather'] = all_data['ellapsed']
522 timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
523 timers['time_process'] = t_end-t_acquired
524 timers['time_all'] = t_end-t_start
526 ### returned as-is by RefreshPeer
528 'new_sites':nb_new_sites,
529 'new_keys':nb_new_keys,
530 'new_nodes':nb_new_nodes,
531 'new_persons':nb_new_persons,
532 'new_slice_attribute_types':nb_new_slice_attribute_types,
533 'new_slices':nb_new_slices,
534 'new_slice_attributes':nb_new_slice_attributes,