3 from PLC.Faults import *
4 from PLC.Parameter import Parameter
5 from PLC.Filter import Filter
6 from PLC.Table import Row, Table
def class_attributes (classname):
    """Locate the caching metadata that a PLC Row subclass declares.

    Given e.g. classname='Node', imports PLC.Nodes and returns a dict
    exposing the row class (Node), the table class (Nodes), and the row
    class's 'primary_key', 'class_key', 'foreign_fields' and
    'foreign_xrefs' class attributes.
    """
    # __import__ on a dotted name returns the TOP-level package (PLC),
    # so the submodule itself is fetched from its __dict__ just below
    topmodule = __import__ ('PLC.%ss'%classname)
    module = topmodule.__dict__['%ss'%classname]
    # local row-like class, e.g. Node
    row_class = module.__dict__['%s'%classname]
    # local tab-like class, e.g. Nodes
    table_class = module.__dict__['%ss'%classname]
    # NOTE(review): the closing of this dict literal is elided from this chunk
    return {'row_class':row_class,
            'table_class':table_class,
            'primary_key': row_class.__dict__['primary_key'],
            'class_key': row_class.__dict__['class_key'],
            'foreign_fields': row_class.__dict__['foreign_fields'],
            'foreign_xrefs': row_class.__dict__['foreign_xrefs'],
34 This class is the core of the RefreshPeer method's implementation,
    that basically calls Cache.refresh_peer
37 It manages caching on a table-by-table basis, and performs required
38 transcoding from remote ids to local ids - that's the job of the
41 For the tables (classes) that it handles, it uses the following
43 (*) primary_key is the name of the field where db indexes are stored
44 (*) class_key is used to implement equality on objects
45 (*) foreign_fields is the list of fields that are copied verbatim from
47 (*) foreign_xrefs is the list of columns that are subject to transcoding
48 (.) when obj[field] is an int, the transcoded value is directly
50 (.) if it's a list, it is assumed that the relationship is maintained
51 in an external 2-column table. That's where the XrefTable class comes in
53 The relationship between slices, slice attribute types and slice attributes being
54 more complex, it is handled in a specifically customized version of update_table
    Of course the order in which the tables are managed is important: with a
    different order, several iterations might be needed for the update process to converge.
59 Of course the timers field was introduced for optimization and could safely be removed
    def __init__ (self, api, peer_id, peer_server):
        """Cache updater bound to one peer.

        api: the local PLCAPI instance
        peer_id: local id of the peer whose objects we mirror
        peer_server: an xmlrpc proxy to that peer's API
        """
        # NOTE(review): no visible 'self.api = api' here, yet update_table
        # reads self.api -- confirm the assignment exists in the elided lines
        self.peer_id = peer_id
        self.peer_server = peer_server
    def __init__ (self, api, classname, alien_objects):
        """Prepare the material to map a peer's ids onto local ids for one class.

        alien_objects: the peer's objects for this class, as raw structs
        decoded from xmlrpc (hence untyped).
        """
        attrs = class_attributes (classname)
        self.primary_key = attrs['primary_key']
        self.class_key = attrs['class_key']
        # cannot use dict, it's acquired by xmlrpc and is untyped
        # index the peer's objects by their (remote) primary key
        self.alien_objects_byid = dict( [ (x[self.primary_key],x) for x in alien_objects ] )
        # retrieve local objects
        local_objects = attrs['table_class'] (api)
        # index local objects by the class_key (e.g. hostname for nodes)
        self.local_objects_byname = local_objects.dict(self.class_key)
        verbose ('Transcoder init :',classname,
                 self.alien_objects_byid.keys(),
                 self.local_objects_byname.keys())
    def transcode (self, alien_id):
        """ transforms an alien id into a local one """
        # locate alien obj from alien_id; raises KeyError when the peer id
        # is unknown -- callers appear to rely on that to skip entries
        alien_object=self.alien_objects_byid[alien_id]
        # bridge the two id spaces through the class_key name
        name = alien_object [self.class_key]
        local_object=self.local_objects_byname[name]
        local_id=local_object[self.primary_key]
        # NOTE(review): the 'return local_id' statement is elided from this
        # chunk; callers use the transcoded value, so it must exist -- confirm
102 # for handling simple n-to-n relation tables, like e.g. slice_node
    def __init__ (self, api, tablename, class1, class2):
        """Helper around a 2-column n-to-n join table, e.g. slice_node.

        Column names are derived as '<lowercased class>_id'.
        """
        # NOTE(review): 'api' is accepted but not visibly stored, while the
        # delete/insert methods need a db handle -- confirm 'self.api = api'
        # exists in the elided line(s)
        self.tablename = tablename
        self.lowerclass1 = class1.lower()
        self.lowerclass2 = class2.lower()
    def delete_old_items (self, id1, id2_set):
        """Delete the join rows that link id1 to every id in id2_set."""
        # NOTE(review): the initialization of 'sql' and the statement that
        # executes it are elided from this chunk
        sql += "DELETE FROM %s WHERE %s_id=%d"%(self.tablename,self.lowerclass1,id1)
        sql += " AND %s_id IN ("%self.lowerclass2
        # ids are ints coming from the db, so string interpolation is safe here
        sql += ",".join([str(i) for i in id2_set])
    def insert_new_items (self, id1, id2_set):
        """Insert one join row per id in id2_set, each linking back to id1."""
        ### xxx needs to be optimized
        ### tried to figure a way to use a single sql statement
        ### like: insert into table (x,y) values (1,2),(3,4);
        ### but apparently this is not supported under postgresql
        # NOTE(review): multi-row VALUES lists have been supported since
        # PostgreSQL 8.2 -- the comment above is likely outdated
        # ('id2' is bound by a 'for id2 in id2_set:' loop elided from this chunk)
        sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
              (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
        # below is Tony's code but it's badly broken. I'm not sure we care much in fact.
        # sql = "INSERT INTO %s select %d, %d " % \
        #         self.tablename, id1, id2[0]
        # for id2 in id2_set[1:]:
        #     sql += " UNION ALL SELECT %d, %d " % \
139 def update_item (self, id1, old_id2s, new_id2s):
140 news = set (new_id2s)
141 olds = set (old_id2s)
142 to_delete = olds-news
143 self.delete_old_items (id1, to_delete)
144 to_create = news-olds
145 self.insert_new_items (id1, to_create)
    # classname: the type of objects we are talking about; e.g. 'Slice'
    # peer_object_list list of objects at a given peer - e.g. peer.GetSlices()
    # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
    # we need an entry for each class mentioned in the class's foreign_xrefs
    #
    # NOTE(review): several lines of this method are elided from this chunk
    # (the 'classname' and 'alien_object_list' parameters, the new_count /
    # old_count counters, try/except guards, and the needs_sync flag);
    # the visible statements are kept verbatim below.
    def update_table (self,
                      # NOTE(review): mutable default argument -- it is shared
                      # across calls; should be None plus a local default
                      alien_xref_objs_dict = {},
                      report_name_conflicts = True):
        # trace entry when in verbose mode
        verbose ("============================== entering update_table on",classname)
        # fetch the row/table classes and the caching metadata for this class
        attrs = class_attributes (classname)
        row_class = attrs['row_class']
        table_class = attrs['table_class']
        primary_key = attrs['primary_key']
        class_key = attrs['class_key']
        foreign_fields = attrs['foreign_fields']
        foreign_xrefs = attrs['foreign_xrefs']
        ## allocate transcoders and xreftables once, for each item in foreign_xrefs
        # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
        # NOTE(review): the '[ (xref['field'],' opener of this comprehension
        # is elided from this chunk
        xref_accessories = dict(
             {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
              'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
              for xref in foreign_xrefs ])
        # the fields that are direct references, like e.g. site_id in Node
        # determined lazily, we need an alien_object to do that, and we may have none here
        direct_ref_fields = None
        ### get current local table
        # get ALL local objects so as to cope with
        # (*) potential moves between plcs
        # (*) or naming conflicts
        local_objects = table_class (self.api)
        ### index upon class_key for future searches
        local_objects_index = local_objects.dict(class_key)
        ### mark entries for this peer outofdate
        for local_object in local_objects:
            if local_object['peer_id'] == peer_id:
                local_object.uptodate=False
                # NOTE(review): the next line belongs to an elided 'else:'
                # branch -- objects from other peers stay uptodate
                local_object.uptodate=True
        # dump the incoming objects when in verbose mode
        for alien_object in alien_object_list:
            verbose ('+++ Got alien object',alien_object)
        # scan the peer's local objects
        for alien_object in alien_object_list:
            object_name = alien_object[class_key]
            verbose ('----- update_table (%s) - considering'%classname,object_name)
            # optimizing : avoid doing useless syncs
            ### We know about this object already
            local_object = local_objects_index[object_name]
            # a None peer_id means the object is natively OURS: name conflict
            if local_object ['peer_id'] is None:
                if report_name_conflicts:
                    print '!!!!!!!!!! We are in trouble here'
                    print 'The %s object named %s is natively defined twice, '%(classname,object_name),
                    print 'once on this PLC and once on peer %d'%peer_id
                    print 'We dont raise an exception so that the remaining updates can still take place'
            if local_object['peer_id'] != peer_id:
                ### the object has changed its plc,
                ### Note, this is not problematic here because both definitions are remote
                ### we can assume the object just moved
                ### needs to update peer_id though
                local_object['peer_id'] = peer_id
            # update all fields as per foreign_fields
            for field in foreign_fields:
                if (local_object[field] != alien_object [field]):
                    local_object[field]=alien_object[field]
            verbose ('update_table FOUND',object_name)
            ### create a new entry
            local_object = row_class(self.api,
                                     {class_key :object_name,'peer_id':peer_id})
            # NOTE(review): this indexes by the literal string held in
            # class_key, not by object_name -- looks like a bug (later
            # lookups by name would miss newly created objects); confirm
            local_objects_index[class_key]=local_object
            verbose ('update_table CREATED',object_name)
            # update all fields as per foreign_fields
            for field in foreign_fields:
                local_object[field]=alien_object[field]
            # this is tricky; at this point we may have primary_key unspecified,
            # but we need it for handling xrefs below, so we'd like to sync to get one
            # on the other hand some required fields may be still missing so
            # the DB would refuse to sync in this case (e.g. site_id in Node)
            # so let's fill them with 1 so we can sync, this will be overridden below
            # lazily determine this set of fields now
            if direct_ref_fields is None:
                # NOTE(review): 'direct_ref_fields=[]' and 'field=xref['field']'
                # are elided from this chunk
                for xref in foreign_xrefs:
                    # an int-valued xref field is a direct (scalar) reference
                    if isinstance(alien_object[field],int):
                        direct_ref_fields.append(field)
                verbose("FOUND DIRECT REFS",direct_ref_fields)
            # fill direct refs with a dummy value so the early sync passes
            for field in direct_ref_fields:
                local_object[field]=1
            verbose('Early sync on',local_object)
            # this row is now valid
            local_object.uptodate=True
            # transcode every xref field from remote ids to local ids
            for xref in foreign_xrefs:
                alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
                alien_value = alien_object[field]
                transcoder = xref_accessories[xref['field']]['transcoder']
                if isinstance (alien_value,list):
                    # a list value means an external 2-column join table
                    for a in alien_value:
                        local_values.append(transcoder.transcode(a))
                        # could not transcode - might be from another peer that we dont know about..
                    xref_table = xref_accessories[xref['field']]['xref_table']
                    # newly created objects dont have xref fields set yet
                    former_xrefs=local_object[xref['field']]
                    xref_table.update_item (local_object[primary_key],
                elif isinstance (alien_value,int):
                    # scalar reference: transcode in place if it changed
                    new_value = transcoder.transcode(alien_value)
                    if local_object[field] != new_value:
                        local_object[field] = new_value
            ### this object is completely updated, let's save it
            verbose('FINAL sync on %s:'%object_name,local_object)
            local_object.sync(False)
        ### delete entries that are not uptodate
        for local_object in local_objects:
            if not local_object.uptodate:
                local_object.delete()
        ### return delta in number of objects
        return new_count-old_count
    # slice attributes exhibit a special behaviour
    # because there is no name we can use to retrieve/check for equality
    # this object is like a 3-part xref, linking slice_attribute_type, slice,
    # and potentially node, together with a value that can change over time.
    # extending the generic model to support a lambda rather than class_key
    # would clearly become overkill
    #
    # NOTE(review): the remaining parameters (presumably alien_nodes and
    # alien_slices), the closing of this signature, and the try/except
    # guards around each transcoding step are elided from this chunk.
    def update_slice_attributes (self,
                                 alien_slice_attributes,
        # local imports, presumably to avoid a cycle at module load -- confirm
        from PLC.SliceAttributeTypes import SliceAttributeTypes
        from PLC.SliceAttributes import SliceAttribute, SliceAttributes
        verbose ("============================== entering update_slice_attributes")
        peer_id = self.peer_id
        # transcoders for the two remote id spaces an attribute can reference
        node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
        slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
        # no need to transcode SliceAttributeTypes, we have a name in the result
        local_sat_dict = SliceAttributeTypes(self.api).dict('name')
        # only this peer's cached attributes are candidates for update/delete
        local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
        ### mark entries for this peer outofdate
        old_count=len(local_objects)
        for local_object in local_objects:
            local_object.uptodate=False
        for alien_object in alien_slice_attributes:
            verbose('----- update_slice_attributes: considering ...')
            verbose('   ',alien_object)
            # transcode the remote slice_id (an elided guard skips entries
            # whose slice cannot be located, with the verbose below)
            slice_id = slice_xcoder.transcode(alien_object['slice_id'])
            verbose('update_slice_attributes: unable to locate slice',
                    alien_object['slice_id'])
            # locate slice_attribute_type by its name
            sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
            verbose('update_slice_attributes: unable to locate slice attribute type',
                    alien_object['name'])
            # locate local node if specified
            alien_node_id = alien_object['node_id']
            if alien_node_id is not None:
                node_id = node_xcoder.transcode(alien_node_id)
            verbose('update_slice_attributes: unable to locate node',
                    alien_object['node_id'])
            # locate the local SliceAttribute if any
            verbose ('searching name=', alien_object['name'],
                     'slice_id',slice_id, 'node_id',node_id)
            local_object = SliceAttributes (self.api,
                                            {'name':alien_object['name'],
                                             'node_id':node_id})[0]
            # attributes cached from OTHER peers are left alone
            if local_object['peer_id'] != peer_id:
                verbose ('FOUND local sa - skipped')
            verbose('FOUND already cached sa - setting value')
            local_object['value'] = alien_object['value']
            # create it if missing
            # NOTE(review): the opening of this dict (peer_id, slice_id,
            # node_id entries) is elided from this chunk
            local_object = SliceAttribute(self.api,
                                          'attribute_type_id':sat_id,
                                          'value':alien_object['value']})
            verbose('CREATED new sa')
            local_object.uptodate=True
            local_object.sync(False)
        # remove entries the peer no longer reports
        for local_object in local_objects:
            if not local_object.uptodate:
                local_object.delete()
        ### return delta in number of objects
        return new_count-old_count
    def refresh_peer (self):
        """Fetch all of the peer's data in one GetPeerData call, then update
        each cached table in dependency order (sites, keys, nodes, persons,
        slice attribute types, slices, slice attributes).

        Returns (in the elided tail) a dict of 'new_*' deltas plus timers,
        consumed as-is by RefreshPeer.
        NOTE(review): the timer bookkeeping lines (timers init, t_start, t0,
        t, t_end) and a few loop headers are elided from this chunk.
        """
        # so as to minimize the number of requests
        # we get all objects in a single call and sort afterwards
        # xxx ideally get objects either local or the ones attached here
        # requires to know remote peer's peer_id for ourselves, mmhh..
        # does not make any difference in a 2-peer deployment though
        ### uses GetPeerData to gather all info in a single xmlrpc request
        # xxx see also GetPeerData - peer_id arg unused yet
        all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
        verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
        # (inside an elided loop over all_data's key/list pairs)
        verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
        t_acquired = time.time()
        # refresh sites -- '-local' entries are the peer's native objects and
        # get mirrored; the concatenated list serves xref transcoding below
        plocal_sites = all_data['Sites-local']
        all_sites = plocal_sites + all_data['Sites-peer']
        nb_new_sites = self.update_table('Site', plocal_sites)
        timers['process-sites']=t0-t_acquired
        # refresh keys
        plocal_keys = all_data['Keys-local']
        all_keys = plocal_keys + all_data['Keys-peer']
        nb_new_keys = self.update_table('Key', plocal_keys)
        timers['process-keys']=t-t0
        # refresh nodes, transcoding their site_id against all_sites
        plocal_nodes = all_data['Nodes-local']
        all_nodes = plocal_nodes + all_data['Nodes-peer']
        nb_new_nodes = self.update_table('Node', plocal_nodes,
                                         { 'Site' : all_sites } )
        timers['process-nodes']=t-t0
        # refresh persons, transcoding their key and site references
        plocal_persons = all_data['Persons-local']
        all_persons = plocal_persons + all_data['Persons-peer']
        nb_new_persons = self.update_table ('Person', plocal_persons,
                                            { 'Key': all_keys, 'Site' : all_sites } )
        timers['process-persons']=t-t0
        # refresh slice attribute types
        # NOTE(review): 'SliceAttibuteTypes-local' is misspelled (missing 'r')
        # but must stay byte-identical to the key GetPeerData emits -- confirm
        # before ever "fixing" it
        plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
        nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
                                                          plocal_slice_attribute_types,
                                                          report_name_conflicts = False)
        timers['process-sat']=t-t0
        # refresh slices
        plocal_slices = all_data['Slices-local']
        all_slices = plocal_slices + all_data['Slices-peer']
        # forget about ignoring remote system slices
        # just update them too, we'll be able to filter them out later in GetSlivers
        nb_new_slices = self.update_table ('Slice', plocal_slices,
                                           'Person': all_persons,
        timers['process-slices']=t-t0
        # refresh slice attributes
        plocal_slice_attributes = all_data ['SliceAttributes-local']
        nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
        timers['process-sa']=t-t0
        # overall timing: gathering happened on the peer ('ellapsed' is the
        # peer-reported key spelling), transmit is wire time, process is
        # local db work
        timers['time_gather'] = all_data['ellapsed']
        timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
        timers['time_process'] = t_end-t_acquired
        timers['time_all'] = t_end-t_start
        ### returned as-is by RefreshPeer
        # NOTE(review): the 'return {' opener of this result dict is elided
        'new_sites':nb_new_sites,
        'new_keys':nb_new_keys,
        'new_nodes':nb_new_nodes,
        'new_persons':nb_new_persons,
        'new_slice_attribute_types':nb_new_slice_attribute_types,
        'new_slices':nb_new_slices,
        'new_slice_attributes':nb_new_slice_attributes,