3 from PLC.Faults import *
4 from PLC.Parameter import Parameter
5 from PLC.Filter import Filter
6 from PLC.Table import Row, Table
def class_attributes (classname):
    """Locate the row/table classes and their metadata for a PLC entity.

    classname is the singular entity name, e.g. 'Node'; the module
    PLC.Nodes is imported and is expected to define both the row-like
    class (Node) and the table-like class (Nodes).  Returns a dict
    exposing those two classes together with the row class's
    'primary_key', 'class_key', 'foreign_fields' and 'foreign_xrefs'
    class attributes.
    """
    # __import__('PLC.Nodes') returns the top-level package (PLC),
    # hence the extra lookup to reach the submodule itself
    topmodule = __import__ ('PLC.%ss'%classname)
    module = topmodule.__dict__['%ss'%classname]
    # local row-like class, e.g. Node
    row_class = module.__dict__['%s'%classname]
    # local tab-like class, e.g. Nodes
    table_class = module.__dict__['%ss'%classname]
    return {'row_class':row_class,
            'table_class':table_class,
            'primary_key': row_class.__dict__['primary_key'],
            'class_key': row_class.__dict__['class_key'],
            'foreign_fields': row_class.__dict__['foreign_fields'],
            'foreign_xrefs': row_class.__dict__['foreign_xrefs'],
    # an attempt to provide genericity in the caching algorithm
    def __init__ (self, api, peer_id, peer_server, auth):
        """Cache refresher bound to one remote peer.

        api:         the local PLCAPI instance
        peer_id:     primary key of the peer whose objects we mirror
        peer_server: xmlrpc proxy to the remote peer's API
        auth:        auth struct passed on every call to peer_server
        """
        self.peer_id = peer_id
        self.peer_server = peer_server
        def __init__ (self, api, classname, alien_objects):
            """Index remote and local objects of one class for id transcoding.

            alien_objects: the remote objects of this class, as returned
            by the peer; indexed here by their remote primary key.  The
            local objects are read from the DB and indexed by class_key
            (their name), which is the field common to both sides.
            """
            attrs = class_attributes (classname)
            self.primary_key = attrs['primary_key']
            self.class_key = attrs['class_key']

            # cannot use dict, it's acquired by xmlrpc and is untyped
            self.alien_objects_byid = dict( [ (x[self.primary_key],x) for x in alien_objects ] )

            # retrieve local objects
            local_objects = attrs['table_class'] (api)
            self.local_objects_byname = local_objects.dict(self.class_key)

            verbose ('Transcoder init :',classname,
                     self.alien_objects_byid.keys(),
                     self.local_objects_byname.keys())
        def transcode (self, alien_id):
            """ transforms an alien id into a local one """
            # locate alien obj from alien_id
            #verbose ('.entering transcode with alien_id',alien_id,)
            alien_object=self.alien_objects_byid[alien_id]
            #verbose ('..located alien_obj',)
            # the name (class_key) is what both sides have in common
            name = alien_object [self.class_key]
            #verbose ('...got name',name,)
            # raises KeyError if the name is unknown locally;
            # callers rely on this to skip un-transcodable references
            local_object=self.local_objects_byname[name]
            #verbose ('....found local obj')
            local_id=local_object[self.primary_key]
            #verbose ('.....and local_id',local_id)
    # for handling simple n-to-n relation tables, like e.g. slice_node
        def __init__ (self, api, tablename, class1, class2):
            """Helper around one n-to-n cross-reference table.

            tablename: the SQL relation table, e.g. 'slice_node'
            class1/class2: the two row-class names; the column names are
            derived as '<lowercased class>_id', e.g. slice_id / node_id.
            """
            self.tablename = tablename
            self.lowerclass1 = class1.lower()
            self.lowerclass2 = class2.lower()
        def delete_old_items (self, id1, id2_set):
            """Delete the xref rows linking id1 to every id in id2_set.

            Both ids are integers interpolated via %d / str(), so no
            SQL-quoting issue arises here.
            """
            sql += "DELETE FROM %s WHERE %s_id=%d"%(self.tablename,self.lowerclass1,id1)
            sql += " AND %s_id IN ("%self.lowerclass2
            sql += ",".join([str(i) for i in id2_set])
        def insert_new_items (self, id1, id2_set):
            """Insert one xref row per id2 in id2_set, linking it to id1."""
            ### xxx needs to be optimized
            ### tried to figure a way to use a single sql statement
            ### like: insert into table (x,y) values (1,2),(3,4);
            ### but apparently this is not supported under postgresql
            # NOTE(review): multi-row VALUES lists ARE supported since
            # PostgreSQL 8.2 - this could now be done in a single statement
            sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
                  (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)

            # below is Tony's code but it's badly broken. I'm not sure we care in fact.
            # sql = "INSERT INTO %s select %d, %d " % \
            #         self.tablename, id1, id2[0]
            # for id2 in id2_set[1:]:
            #     sql += " UNION ALL SELECT %d, %d " % \
113 def update_item (self, id1, old_id2s, new_id2s):
114 news = set (new_id2s)
115 olds = set (old_id2s)
116 to_delete = olds-news
117 self.delete_old_items (id1, to_delete)
118 to_create = news-olds
119 self.insert_new_items (id1, to_create)
    # classname: the type of objects we are talking about; e.g. 'Slice'
    # peer_object_list list of objects at a given peer - e.g. peer.GetSlices()
    # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
    # we need an entry for each class mentioned in the class's foreign_xrefs
    # lambda_ignore : the alien objects are ignored if this returns true
    def update_table (self,
                      alien_xref_objs_dict = {},
                      lambda_ignore=lambda x:False,
                      report_name_conflicts = True):
        """Generic cache refresh for one class of objects.

        Mirrors the peer's objects locally: updates objects we already
        cache, creates missing ones, transcodes cross-references via the
        per-xref Transcoder/XrefTable accessories, and deletes cached
        objects the peer no longer reports.  Returns the delta
        (new_count - old_count) in number of cached objects.
        NOTE(review): the mutable default {} for alien_xref_objs_dict is
        shared across calls - harmless while never mutated, but fragile.
        """

        verbose ("============================== entering update_table on",classname)

        # row class, table class and key metadata for this entity
        attrs = class_attributes (classname)
        row_class = attrs['row_class']
        table_class = attrs['table_class']
        primary_key = attrs['primary_key']
        class_key = attrs['class_key']
        foreign_fields = attrs['foreign_fields']
        foreign_xrefs = attrs['foreign_xrefs']

        ## allocate transcoders and xreftables once, for each item in foreign_xrefs
        # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
        xref_accessories = dict(
               {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
                'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
              for xref in foreign_xrefs ])

        # the fields that are direct references, like e.g. site_id in Node
        # determined lazily, we need an alien_object to do that, and we may have none here
        direct_ref_fields = None

        ### get current local table
        # get ALL local objects so as to cope with
        # (*) potential moves between plcs
        # (*) or naming conflicts
        local_objects = table_class (self.api)
        ### index upon class_key for future searches
        local_objects_index = local_objects.dict(class_key)

        ### mark entries for this peer outofdate
        # objects still marked outofdate at the end of the scan get deleted
        for local_object in local_objects:
            if local_object['peer_id'] == peer_id:
                local_object.uptodate=False
                local_object.uptodate=True

        # scan the peer's local objects
        for alien_object in alien_object_list:

            object_name = alien_object[class_key]

            ### ignore, e.g. system-wide slices
            if lambda_ignore(alien_object):
                verbose('Ignoring',object_name)

            verbose ('update_table (%s) - Considering'%classname,object_name)

            # optimizing : avoid doing useless syncs
                ### We know about this object already
                local_object = local_objects_index[object_name]
                # peer_id None means the object is natively local
                if local_object ['peer_id'] is None:
                    if report_name_conflicts:
                        print '!!!!!!!!!! We are in trouble here'
                        print 'The %s object named %s is natively defined twice, '%(classname,object_name),
                        print 'once on this PLC and once on peer %d'%peer_id
                        print 'We dont raise an exception so that the remaining updates can still take place'
                if local_object['peer_id'] != peer_id:
                    ### the object has changed its plc,
                    ### Note, this is not problematic here because both definitions are remote
                    ### we can assume the object just moved
                    ### needs to update peer_id though
                    local_object['peer_id'] = peer_id

                # update all fields as per foreign_fields
                for field in foreign_fields:
                    if (local_object[field] != alien_object [field]):
                        local_object[field]=alien_object[field]

                verbose ('update_table FOUND',object_name)
                ### create a new entry
                local_object = row_class(self.api,
                                         {class_key :object_name,'peer_id':peer_id})
                # NOTE(review): indexing by the constant class_key rather
                # than object_name looks suspicious - confirm intent
                local_objects_index[class_key]=local_object
                verbose ('update_table CREATED',object_name)
                # update all fields as per foreign_fields
                for field in foreign_fields:
                    local_object[field]=alien_object[field]
                # this is tricky; at this point we may have primary_key unspecified,
                # but we need it for handling xrefs below, so we'd like to sync to get one
                # on the other hand some required fields may be still missing so
                # the DB would refuse to sync in this case (e.g. site_id in Node)
                # so let's fill them with 1 so we can sync, this will be overridden below
                # lazily determine this set of fields now
                if direct_ref_fields is None:
                    for xref in foreign_xrefs:
                        #verbose('checking field %s for direct_ref'%field)
                        # an int value (not a list) marks a direct reference
                        if isinstance(alien_object[field],int):
                            direct_ref_fields.append(field)
                    verbose("FOUND DIRECT REFS",direct_ref_fields)
                for field in direct_ref_fields:
                    # dummy value, overridden by the real transcoded id below
                    local_object[field]=1
                verbose('Early sync on',local_object)

            # this row is now valid
            local_object.uptodate=True

            # handle the cross-references to other classes
            for xref in foreign_xrefs:
                alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
                alien_value = alien_object[field]
                transcoder = xref_accessories[xref['field']]['transcoder']
                # a list value is an n-to-n relation (e.g. node_ids in Slice)
                if isinstance (alien_value,list):
                    #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
                    for a in alien_value:
                            local_values.append(transcoder.transcode(a))
                        # could not transcode - might be from another peer that we dont know about..
                    #verbose (" transcoded as ",local_values)
                    xref_table = xref_accessories[xref['field']]['xref_table']
                    # newly created objects dont have xref fields set yet
                        former_xrefs=local_object[xref['field']]
                    xref_table.update_item (local_object[primary_key],
                # an int value is a direct reference (e.g. site_id in Node)
                elif isinstance (alien_value,int):
                    #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
                    new_value = transcoder.transcode(alien_value)
                    if local_object[field] != new_value:
                        local_object[field] = new_value

            ### this object is completely updated, let's save it
                verbose('FINAL sync on %s:'%object_name,local_object)
                local_object.sync(False)

        ### delete entries that are not uptodate
        for local_object in local_objects:
            if not local_object.uptodate:
                local_object.delete()

        ### return delta in number of objects
        return new_count-old_count
    # slice attributes exhibit a special behaviour
    # because there is no name we can use to retrieve/check for equality
    # this object is like a 3-part xref, linking slice_attribute_type, slice,
    # and potentially node, together with a value that can change over time.
    # extending the generic model to support a lambda rather than class_key
    # would clearly become overkill
    def update_slice_attributes (self,
                                 alien_slice_attributes,
        """Mirror the peer's slice attributes locally.

        Unlike update_table, matching is done on the
        (name, slice_id, node_id) triple rather than on a single name
        field.  Returns the delta in number of cached slice attributes.
        """

        # only needed here, hence the local imports
        from PLC.SliceAttributeTypes import SliceAttributeTypes
        from PLC.SliceAttributes import SliceAttribute, SliceAttributes

        peer_id = self.peer_id

        # transcoders for the two cross-referenced classes
        node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
        slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
        # no need to transcode SliceAttributeTypes, we have a name in the result
        local_sat_dict = SliceAttributeTypes(self.api).dict('name')

        # only attributes already cached from this peer are deletion candidates
        local_objects = SliceAttributes (self.api,{'peer_id':peer_id})

        ### mark entries for this peer outofdate
        old_count=len(local_objects)
        for local_object in local_objects:
            local_object.uptodate=False

        for alien_object in alien_slice_attributes:

            verbose('----- update_slice_attributes: considering ...')
            verbose('   ',alien_object)

            # locate local slice; un-transcodable slices skip this attribute
                slice_id = slice_xcoder.transcode(alien_object['slice_id'])
                verbose('update_slice_attributes: unable to locate slice',
                        alien_object['slice_id'])

            # locate slice_attribute_type
                sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
                verbose('update_slice_attributes: unable to locate slice attribute type',
                        alien_object['name'])

            # locate local node if specified
                alien_node_id = alien_object['node_id']
                if alien_node_id is not None:
                    node_id = node_xcoder.transcode(alien_node_id)
                    verbose('update_slice_attributes: unable to locate node',
                            alien_object['node_id'])

            # locate the local SliceAttribute if any
                verbose ('searching name=', alien_object['name'],
                         'slice_id',slice_id, 'node_id',node_id)
                local_object = SliceAttributes (self.api,
                                                {'name':alien_object['name'],
                                                 'node_id':node_id})[0]

                # a natively-local attribute with the same triple: hands off
                if local_object['peer_id'] != peer_id:
                    verbose ('FOUND local sa - skipped')
                verbose('FOUND already cached sa - setting value')
                local_object['value'] = alien_object['value']
            # create it if missing
                local_object = SliceAttribute(self.api,
                                              'attribute_type_id':sat_id,
                                              'value':alien_object['value']})
                verbose('CREATED new sa')
            local_object.uptodate=True
            local_object.sync(False)

        # delete the entries the peer no longer reports
        for local_object in local_objects:
            if not local_object.uptodate:
                local_object.delete()

        ### return delta in number of objects
        return new_count-old_count
    def refresh_peer (self):
        """Fetch everything from the remote peer and refresh the local cache.

        Issues a single GetPeerData xmlrpc call, then updates each class
        in dependency order (sites, keys, nodes, persons, slice attribute
        types, slices, slice attributes) so that cross-references can be
        transcoded.  Returns a summary dict (per-class deltas, timers)
        that RefreshPeer hands back as-is.
        """
        # so as to minimize the number of requests
        # we get all objects in a single call and sort afterwards
        # xxx ideally get objects either local or the ones attached here
        # requires to know remote peer's peer_id for ourselves, mmhh..
        # does not make any difference in a 2-peer deployment though

        ### uses GetPeerData to gather all info in a single xmlrpc request

        # xxx see also GetPeerData - peer_id arg unused yet
        all_data = self.peer_server.GetPeerData (self.auth,0)

        t_acquired = time.time()

        # refresh sites
        plocal_sites = all_data['Sites-local']
        all_sites = plocal_sites + all_data['Sites-peer']
        nb_new_sites = self.update_table('Site', plocal_sites)

        timers['process-sites']=t0-t_acquired

        # refresh keys
        plocal_keys = all_data['Keys-local']
        all_keys = plocal_keys + all_data['Keys-peer']
        nb_new_keys = self.update_table('Key', plocal_keys)

        timers['process-keys']=t-t0

        # refresh nodes - sites are needed to transcode site_id
        plocal_nodes = all_data['Nodes-local']
        all_nodes = plocal_nodes + all_data['Nodes-peer']
        nb_new_nodes = self.update_table('Node', plocal_nodes,
                                         { 'Site' : all_sites } )

        timers['process-nodes']=t-t0

        # refresh persons - keys and sites are cross-referenced
        plocal_persons = all_data['Persons-local']
        all_persons = plocal_persons + all_data['Persons-peer']
        nb_new_persons = self.update_table ('Person', plocal_persons,
                                            { 'Key': all_keys, 'Site' : all_sites } )

        timers['process-persons']=t-t0

        # refresh slice attribute types
        # NOTE(review): key spelled 'SliceAttibuteTypes' (missing 'r') -
        # confirm it matches what GetPeerData actually returns
        plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
        nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
                                                          plocal_slice_attribute_types,
                                                          report_name_conflicts = False)

        timers['process-sat']=t-t0

        # refresh slices
        plocal_slices = all_data['Slices-local']
        all_slices = plocal_slices + all_data['Slices-peer']

        # system slices (created by person_id 1) are filtered out
        def is_system_slice (slice):
            return slice['creator_person_id'] == 1

        nb_new_slices = self.update_table ('Slice', plocal_slices,
                                           'Person': all_persons,

        timers['process-slices']=t-t0

        # refresh slice attributes
        plocal_slice_attributes = all_data ['SliceAttributes-local']
        nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,

        timers['process-sa']=t-t0

        # timing summary
        # NOTE(review): 'ellapsed' spelling must match the key in the
        # peer's GetPeerData reply - verify before renaming
        timers['time_gather'] = all_data['ellapsed']
        timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
        timers['time_process'] = t_end-t_acquired
        timers['time_all'] = t_end-t_start

        ### returned as-is by RefreshPeer
        return {'plcname':self.api.config.PLC_NAME,
                'new_sites':nb_new_sites,
                'new_keys':nb_new_keys,
                'new_nodes':nb_new_nodes,
                'new_persons':nb_new_persons,
                'new_slice_attribute_types':nb_new_slice_attribute_types,
                'new_slices':nb_new_slices,
                'new_slice_attributes':nb_new_slice_attributes,