This commit was manufactured by cvs2svn to create branch
[plcapi.git] / PLC / Cache.py
1 import time
2
3 from PLC.Faults import *
4 from PLC.Parameter import Parameter
5 from PLC.Filter import Filter
6 from PLC.Table import Row, Table
7
8 verbose_flag=False;
9 #verbose_flag=True;
10 def verbose (*args):
11     if verbose_flag:
12         print (args)
13
14 def class_attributes (classname):
15     """ locates various attributes defined in the row class """
16     topmodule = __import__ ('PLC.%ss'%classname)
17     module = topmodule.__dict__['%ss'%classname]
18     # local row-like class, e.g. Node
19     row_class = module.__dict__['%s'%classname]
20     # local tab-like class, e.g. Nodes
21     table_class = module.__dict__['%ss'%classname]
22
23     return {'row_class':row_class, 
24             'table_class':table_class,
25             'primary_key': row_class.__dict__['primary_key'],
26             'class_key': row_class.__dict__['class_key'],
27             'foreign_fields': row_class.__dict__['foreign_fields'],
28             'foreign_xrefs': row_class.__dict__['foreign_xrefs'],
29             }
30
31 class Cache:
32
33     """
34     This class is the core of the RefreshPeer method's implementation,
35     that basically calls Cache:refresh_peer
36
37     It manages caching on a table-by-table basis, and performs required
38     transcoding from remote ids to local ids - that's the job of the
39     Transcoder class
40
41     For the tables (classes) that it handles, it uses the following
42     attributes
43     (*) primary_key is the name of the field where db indexes are stored
44     (*) class_key is used to implement equality on objects
45     (*) foreign_fields is the list of fields that are copied verbatim from
46         foreign objects
47     (*) foreign_xrefs is the list of columns that are subject to transcoding
48         (.) when obj[field] is an int, the transcoded value is directly
49         inserted in the table
50         (.) if it's a list, it is assumed that the relationship is maintained
51             in an external 2-column table. That's where the XrefTable class comes in
52
53     The relationship between slices, slice attribute types and slice attributes being
54     more complex, it is handled in a specifically customized version of update_table
55
56     Of course the order in which the tables are managed is important, with different
57     orders several iterations might be needed for the update process to converge.
58
59     Of course the timers field was introduced for optimization and could safely be removed
60     """
61     
62     def __init__ (self, api, peer_id, peer_server):
63
64         self.api = api
65         self.peer_id = peer_id
66         self.peer_server = peer_server
67         
68     class Transcoder:
69
70         def __init__ (self, api, classname, alien_objects):
71             self.api = api
72             attrs = class_attributes (classname)
73             self.primary_key = attrs['primary_key']
74             self.class_key = attrs['class_key']
75
76             # cannot use dict, it's acquired by xmlrpc and is untyped
77             self.alien_objects_byid = dict( [ (x[self.primary_key],x) for x in alien_objects ] )
78
79             # retrieve local objects
80             local_objects = attrs['table_class'] (api)
81             self.local_objects_byname = local_objects.dict(self.class_key)
82
83             verbose ('Transcoder init :',classname,
84                      self.alien_objects_byid.keys(),
85                      self.local_objects_byname.keys())
86
87         def transcode (self, alien_id):
88             """ transforms an alien id into a local one """
89             # locate alien obj from alien_id
90             #verbose ('.entering transcode with alien_id',alien_id,)
91             alien_object=self.alien_objects_byid[alien_id]
92             #verbose ('..located alien_obj',)
93             name = alien_object [self.class_key]
94             #verbose ('...got name',name,)
95             local_object=self.local_objects_byname[name]
96             #verbose ('....found local obj')
97             local_id=local_object[self.primary_key]
98             #verbose ('.....and local_id',local_id)
99             return local_id
100             
101
102     # for handling simple n-to-n relation tables, like e.g. slice_node
103     class XrefTable: 
104
105         def __init__ (self, api, tablename, class1, class2):
106             self.api = api
107             self.tablename = tablename
108             self.lowerclass1 = class1.lower()
109             self.lowerclass2 = class2.lower()
110
111         def delete_old_items (self, id1, id2_set):
112             if id2_set:
113                 sql = ""
114                 sql += "DELETE FROM %s WHERE %s_id=%d"%(self.tablename,self.lowerclass1,id1)
115                 sql += " AND %s_id IN ("%self.lowerclass2
116                 sql += ",".join([str(i) for i in id2_set])
117                 sql += ")"
118                 self.api.db.do (sql)
119
120         def insert_new_items (self, id1, id2_set):
121             ### xxx needs to be optimized
122             ### tried to figure a way to use a single sql statement
123             ### like: insert into table (x,y) values (1,2),(3,4);
124             ### but apparently this is not supported under postgresql
125             for id2 in id2_set:
126                 sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
127                       (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
128
129 # below is Tony's code but it's badly broken. I'm not sure we care much in fact.
130 #           if id2_set:
131 #               sql = "INSERT INTO %s select %d, %d " % \
132 #                       self.tablename, id1, id2[0] 
133 #               for id2 in id2_set[1:]:
134 #                       sql += " UNION ALL SELECT %d, %d " % \
135 #                       (id1,id2)
136
137                 self.api.db.do (sql)
138
139         def update_item (self, id1, old_id2s, new_id2s):
140             news = set (new_id2s)
141             olds = set (old_id2s)
142             to_delete = olds-news
143             self.delete_old_items (id1, to_delete)
144             to_create = news-olds
145             self.insert_new_items (id1, to_create)
146             self.api.db.commit()
147             
148     # classname: the type of objects we are talking about;       e.g. 'Slice'
149     # peer_object_list list of objects at a given peer -         e.g. peer.GetSlices()
150     # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
151     #    we need an entry for each class mentioned in the class's foreign_xrefs
152     def update_table (self,
153                       classname,
154                       alien_object_list,
155                       alien_xref_objs_dict = {},
156                       report_name_conflicts = True):
157         
158         verbose ("============================== entering update_table on",classname)
159         peer_id=self.peer_id
160
161         attrs = class_attributes (classname)
162         row_class = attrs['row_class']
163         table_class = attrs['table_class']
164         primary_key = attrs['primary_key']
165         class_key = attrs['class_key']
166         foreign_fields = attrs['foreign_fields']
167         foreign_xrefs = attrs['foreign_xrefs']
168
169         ## allocate transcoders and xreftables once, for each item in foreign_xrefs
170         # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
171         xref_accessories = dict(
172             [ (xref['field'],
173                {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
174                 'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
175               for xref in foreign_xrefs ])
176
177         # the fields that are direct references, like e.g. site_id in Node
178         # determined lazily, we need an alien_object to do that, and we may have none here
179         direct_ref_fields = None
180
181         ### get current local table
182         # get ALL local objects so as to cope with
183         # (*) potential moves between plcs
184         # (*) or naming conflicts
185         local_objects = table_class (self.api)
186         ### index upon class_key for future searches
187         local_objects_index = local_objects.dict(class_key)
188
189         ### mark entries for this peer outofdate
190         new_count=0
191         old_count=0;
192         for local_object in local_objects:
193             if local_object['peer_id'] == peer_id:
194                 local_object.uptodate=False
195                 old_count += 1
196             else:
197                 local_object.uptodate=True
198
199         for alien_object in alien_object_list:
200             verbose ('+++ Got alien object',alien_object)
201
202         # scan the peer's local objects
203         for alien_object in alien_object_list:
204
205             object_name = alien_object[class_key]
206
207             verbose ('----- update_table (%s) - considering'%classname,object_name)
208
209             # optimizing : avoid doing useless syncs
210             needs_sync = False
211
212             # create or update
213             try:
214                 ### We know about this object already
215                 local_object = local_objects_index[object_name]
216                 if local_object ['peer_id'] is None:
217                     if report_name_conflicts:
218                         ### xxx send e-mail
219                         print '!!!!!!!!!! We are in trouble here'
220                         print 'The %s object named %s is natively defined twice, '%(classname,object_name),
221                         print 'once on this PLC and once on peer %d'%peer_id
222                         print 'We dont raise an exception so that the remaining updates can still take place'
223                         print '!!!!!!!!!!'
224                     continue
225                 if local_object['peer_id'] != peer_id:
226                     ### the object has changed its plc, 
227                     ### Note, this is not problematic here because both definitions are remote
228                     ### we can assume the object just moved
229                     ### needs to update peer_id though
230                     local_object['peer_id'] = peer_id
231                     needs_sync = True
232                 # update all fields as per foreign_fields
233                 for field in foreign_fields:
234                     if (local_object[field] != alien_object [field]):
235                         local_object[field]=alien_object[field]
236                         needs_sync = True
237                 verbose ('update_table FOUND',object_name)
238             except:
239                 ### create a new entry
240                 local_object = row_class(self.api,
241                                           {class_key :object_name,'peer_id':peer_id})
242                 # insert in index
243                 local_objects_index[class_key]=local_object
244                 verbose ('update_table CREATED',object_name)
245                 # update all fields as per foreign_fields
246                 for field in foreign_fields:
247                     local_object[field]=alien_object[field]
248                 # this is tricky; at this point we may have primary_key unspecified,
249                 # but we need it for handling xrefs below, so we'd like to sync to get one
250                 # on the other hand some required fields may be still missing so
251                 #  the DB would refuse to sync in this case (e.g. site_id in Node)
252                 # so let's fill them with 1 so we can sync, this will be overridden below
253                 # lazily determine this set of fields now
254                 if direct_ref_fields is None:
255                     direct_ref_fields=[]
256                     for xref in foreign_xrefs:
257                         field=xref['field']
258                         #verbose('checking field %s for direct_ref'%field)
259                         if isinstance(alien_object[field],int):
260                             direct_ref_fields.append(field)
261                     verbose("FOUND DIRECT REFS",direct_ref_fields)
262                 for field in direct_ref_fields:
263                     local_object[field]=1
264                 verbose('Early sync on',local_object)
265                 local_object.sync()
266                 needs_sync = False
267
268             # this row is now valid
269             local_object.uptodate=True
270             new_count += 1
271
272             # manage cross-refs
273             for xref in foreign_xrefs:
274                 field=xref['field']
275                 alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
276                 alien_value = alien_object[field]
277                 transcoder = xref_accessories[xref['field']]['transcoder']
278                 if isinstance (alien_value,list):
279                     #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
280                     local_values=[]
281                     for a in alien_value:
282                         try:
283                             local_values.append(transcoder.transcode(a))
284                         except:
285                             # could not transcode - might be from another peer that we dont know about..
286                             pass
287                     #verbose (" transcoded as ",local_values)
288                     xref_table = xref_accessories[xref['field']]['xref_table']
289                     # newly created objects dont have xref fields set yet
290                     try:
291                         former_xrefs=local_object[xref['field']]
292                     except:
293                         former_xrefs=[]
294                     xref_table.update_item (local_object[primary_key],
295                                             former_xrefs,
296                                             local_values)
297                 elif isinstance (alien_value,int):
298                     #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
299                     new_value = transcoder.transcode(alien_value)
300                     if local_object[field] != new_value:
301                         local_object[field] = new_value
302                         needs_sync = True
303
304             ### this object is completely updated, let's save it
305             if needs_sync:
306                 verbose('FINAL sync on %s:'%object_name,local_object)
307                 local_object.sync(False)
308                     
309         ### delete entries that are not uptodate
310         for local_object in local_objects:
311             if not local_object.uptodate:
312                 local_object.delete()
313
314         self.api.db.commit()
315
316         ### return delta in number of objects 
317         return new_count-old_count
318
319     # slice attributes exhibit a special behaviour
320     # because there is no name we can use to retrieve/check for equality
321     # this object is like a 3-part xref, linking slice_attribute_type, slice,
322     #    and potentially node, together with a value that can change over time.
323     # extending the generic model to support a lambda rather than class_key
324     #    would clearly become overkill
325     def update_slice_attributes (self,
326                                  alien_slice_attributes,
327                                  alien_nodes,
328                                  alien_slices):
329
330         from PLC.SliceAttributeTypes import SliceAttributeTypes
331         from PLC.SliceAttributes import SliceAttribute, SliceAttributes
332
333         verbose ("============================== entering update_slice_attributes")
334
335         # init
336         peer_id = self.peer_id
337         
338         # create transcoders
339         node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
340         slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
341         # no need to transcode SliceAttributeTypes, we have a name in the result
342         local_sat_dict = SliceAttributeTypes(self.api).dict('name')
343                
344         # load local objects
345         local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
346
347         ### mark entries for this peer outofdate
348         new_count = 0
349         old_count=len(local_objects)
350         for local_object in local_objects:
351             local_object.uptodate=False
352
353         for alien_object in alien_slice_attributes:
354
355             verbose('----- update_slice_attributes: considering ...')
356             verbose('   ',alien_object)
357
358             # locate local slice
359             try:
360                 slice_id = slice_xcoder.transcode(alien_object['slice_id'])
361             except:
362                 verbose('update_slice_attributes: unable to locate slice',
363                         alien_object['slice_id'])
364                 continue
365             # locate slice_attribute_type
366             try:
367                 sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
368             except:
369                 verbose('update_slice_attributes: unable to locate slice attribute type',
370                         alien_object['name'])
371                 continue
372             # locate local node if specified
373             try:
374                 alien_node_id = alien_object['node_id']
375                 if alien_node_id is not None:
376                     node_id = node_xcoder.transcode(alien_node_id)
377                 else:
378                     node_id=None
379             except:
380                 verbose('update_slice_attributes: unable to locate node',
381                         alien_object['node_id'])
382                 continue
383
384             # locate the local SliceAttribute if any
385             try:
386                 verbose ('searching name=', alien_object['name'],
387                          'slice_id',slice_id, 'node_id',node_id)
388                 local_object = SliceAttributes (self.api,
389                                                 {'name':alien_object['name'],
390                                                  'slice_id':slice_id,
391                                                  'node_id':node_id})[0]
392                 
393                 if local_object['peer_id'] != peer_id:
394                     verbose ('FOUND local sa - skipped')
395                     continue
396                 verbose('FOUND already cached sa - setting value')
397                 local_object['value'] = alien_object['value']
398             # create it if missing
399             except:
400                 local_object = SliceAttribute(self.api,
401                                               {'peer_id':peer_id,
402                                                'slice_id':slice_id,
403                                                'node_id':node_id,
404                                                'attribute_type_id':sat_id,
405                                                'value':alien_object['value']})
406                 verbose('CREATED new sa')
407             local_object.uptodate=True
408             new_count += 1
409             local_object.sync(False)
410
411         for local_object in local_objects:
412             if not local_object.uptodate:
413                 local_object.delete()
414
415         self.api.db.commit()
416         ### return delta in number of objects 
417         return new_count-old_count
418
419     def refresh_peer (self):
420         
421         # so as to minimize the numer of requests
422         # we get all objects in a single call and sort afterwards
423         # xxx ideally get objects either local or the ones attached here
424         # requires to know remote peer's peer_id for ourselves, mmhh..
425         # does not make any difference in a 2-peer deployment though
426
427         ### uses GetPeerData to gather all info in a single xmlrpc request
428
429         timers={}
430         t_start=time.time()
431         # xxx see also GetPeerData - peer_id arg unused yet
432         all_data = self.peer_server.GetPeerData (self.api.config.PLC_NAME)
433
434         verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
435         sks=all_data.keys()
436         sks.sort()
437         for k in sks:
438             f=all_data[k]
439             try:
440                 verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
441             except:
442                 pass
443
444         t_acquired = time.time()
445         # refresh sites
446         plocal_sites = all_data['Sites-local']
447         all_sites = plocal_sites + all_data['Sites-peer']
448         nb_new_sites = self.update_table('Site', plocal_sites)
449
450         t0 = time.time()
451         timers['process-sites']=t0-t_acquired
452         
453
454         # refresh keys
455         plocal_keys = all_data['Keys-local']
456         all_keys = plocal_keys + all_data['Keys-peer']
457         nb_new_keys = self.update_table('Key', plocal_keys)
458
459         t=time.time()
460         timers['process-keys']=t-t0
461         t0=t
462
463         # refresh nodes
464         plocal_nodes = all_data['Nodes-local']
465         all_nodes = plocal_nodes + all_data['Nodes-peer']
466         nb_new_nodes = self.update_table('Node', plocal_nodes,
467                                          { 'Site' : all_sites } )
468
469         t=time.time()
470         timers['process-nodes']=t-t0
471         t0=t
472
473         # refresh persons
474         plocal_persons = all_data['Persons-local']
475         all_persons = plocal_persons + all_data['Persons-peer']
476         nb_new_persons = self.update_table ('Person', plocal_persons,
477                                             { 'Key': all_keys, 'Site' : all_sites } )
478
479         t=time.time()
480         timers['process-persons']=t-t0
481         t0=t
482
483         # refresh slice attribute types
484         plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
485         nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
486                                                           plocal_slice_attribute_types,
487                                                           report_name_conflicts = False)
488
489         t=time.time()
490         timers['process-sat']=t-t0
491         t0=t
492
493         # refresh slices
494         plocal_slices = all_data['Slices-local']
495         all_slices = plocal_slices + all_data['Slices-peer']
496
497         # forget about ignoring remote system slices
498         # just update them too, we'll be able to filter them out later in GetSlivers
499
500         nb_new_slices = self.update_table ('Slice', plocal_slices,
501                                            {'Node': all_nodes,
502                                             'Person': all_persons,
503                                             'Site': all_sites})
504
505         t=time.time()
506         timers['process-slices']=t-t0
507         t0=t
508
509         # refresh slice attributes
510         plocal_slice_attributes = all_data ['SliceAttributes-local']
511         nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
512                                                                 all_nodes,
513                                                                 all_slices)
514         t=time.time()
515         timers['process-sa']=t-t0
516         t0=t
517         
518         t_end=time.time()
519
520         timers['time_gather']   = all_data['ellapsed']
521         timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
522         timers['time_process']  = t_end-t_acquired
523         timers['time_all']      = t_end-t_start
524         
525         ### returned as-is by RefreshPeer
526         return {
527                 'new_sites':nb_new_sites,
528                 'new_keys':nb_new_keys,
529                 'new_nodes':nb_new_nodes,
530                 'new_persons':nb_new_persons,
531                 'new_slice_attribute_types':nb_new_slice_attribute_types,
532                 'new_slices':nb_new_slices,
533                 'new_slice_attributes':nb_new_slice_attributes,
534                 'timers':timers,
535                 }
536