Cache:
[plcapi.git] / PLC / Cache.py
1 import time
2
3 from PLC.Faults import *
4 from PLC.Parameter import Parameter
5 from PLC.Filter import Filter
6 from PLC.Table import Row, Table
7
8 verbose_flag=False;
9 #verbose_flag=True;
10 def verbose (*args):
11     if verbose_flag:
12         print (args)
13
14 def class_attributes (classname):
15     """ locates various attributes defined in the row class """
16     topmodule = __import__ ('PLC.%ss'%classname)
17     module = topmodule.__dict__['%ss'%classname]
18     # local row-like class, e.g. Node
19     row_class = module.__dict__['%s'%classname]
20     # local tab-like class, e.g. Nodes
21     table_class = module.__dict__['%ss'%classname]
22
23     return {'row_class':row_class, 
24             'table_class':table_class,
25             'primary_key': row_class.__dict__['primary_key'],
26             'class_key': row_class.__dict__['class_key'],
27             'foreign_fields': row_class.__dict__['foreign_fields'],
28             'foreign_xrefs': row_class.__dict__['foreign_xrefs'],
29             }
30
31 class Cache:
32
33     # an attempt to provide genericity in the caching algorithm
34     
35     def __init__ (self, api, peer_id, peer_server, auth):
36
37         self.api = api
38         self.peer_id = peer_id
39         self.peer_server = peer_server
40         self.auth = auth
41         
42     class Transcoder:
43
44         def __init__ (self, api, classname, alien_objects):
45             self.api = api
46             attrs = class_attributes (classname)
47             self.primary_key = attrs['primary_key']
48             self.class_key = attrs['class_key']
49
50             # cannot use dict, it's acquired by xmlrpc and is untyped
51             self.alien_objects_byid = dict( [ (x[self.primary_key],x) for x in alien_objects ] )
52
53             # retrieve local objects
54             local_objects = attrs['table_class'] (api)
55             self.local_objects_byname = local_objects.dict(self.class_key)
56
57             verbose ('Transcoder init :',classname,
58                      self.alien_objects_byid.keys(),
59                      self.local_objects_byname.keys())
60
61         def transcode (self, alien_id):
62             """ transforms an alien id into a local one """
63             # locate alien obj from alien_id
64             #verbose ('.entering transcode with alien_id',alien_id,)
65             alien_object=self.alien_objects_byid[alien_id]
66             #verbose ('..located alien_obj',)
67             name = alien_object [self.class_key]
68             #verbose ('...got name',name,)
69             local_object=self.local_objects_byname[name]
70             #verbose ('....found local obj')
71             local_id=local_object[self.primary_key]
72             #verbose ('.....and local_id',local_id)
73             return local_id
74             
75
76     # for handling simple n-to-n relation tables, like e.g. slice_node
77     class XrefTable: 
78
79         def __init__ (self, api, tablename, class1, class2):
80             self.api = api
81             self.tablename = tablename
82             self.lowerclass1 = class1.lower()
83             self.lowerclass2 = class2.lower()
84
85         def delete_old_items (self, id1, id2_set):
86             if id2_set:
87                 sql = ""
88                 sql += "DELETE FROM %s WHERE %s_id=%d"%(self.tablename,self.lowerclass1,id1)
89                 sql += " AND %s_id IN ("%self.lowerclass2
90                 sql += ",".join([str(i) for i in id2_set])
91                 sql += ")"
92                 self.api.db.do (sql)
93
94         def insert_new_items (self, id1, id2_set):
95             ### xxx needs to be optimized
96             ### tried to figure a way to use a single sql statement
97             ### like: insert into table (x,y) values (1,2),(3,4);
98             ### but apparently this is not supported under postgresql
99             for id2 in id2_set:
100                 sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
101                       (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
102
103 # below is Tony's code but it's badly broken. I'm not sure we care in fact.
104 #           if id2_set:
105 #               sql = "INSERT INTO %s select %d, %d " % \
106 #                       self.tablename, id1, id2[0] 
107 #               for id2 in id2_set[1:]:
108 #                       sql += " UNION ALL SELECT %d, %d " % \
109 #                       (id1,id2)
110
111                 self.api.db.do (sql)
112
113         def update_item (self, id1, old_id2s, new_id2s):
114             news = set (new_id2s)
115             olds = set (old_id2s)
116             to_delete = olds-news
117             self.delete_old_items (id1, to_delete)
118             to_create = news-olds
119             self.insert_new_items (id1, to_create)
120             self.api.db.commit()
121             
122     # classname: the type of objects we are talking about;       e.g. 'Slice'
123     # peer_object_list list of objects at a given peer -         e.g. peer.GetSlices()
124     # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
125     #    we need an entry for each class mentioned in the class's foreign_xrefs
126     # lambda_ignore : the alien objects are ignored if this returns true
127     def update_table (self,
128                       classname,
129                       alien_object_list,
130                       alien_xref_objs_dict = {},
131                       lambda_ignore=lambda x:False,
132                       report_name_conflicts = True):
133         
134         verbose ("============================== entering update_table on",classname)
135         peer_id=self.peer_id
136
137         attrs = class_attributes (classname)
138         row_class = attrs['row_class']
139         table_class = attrs['table_class']
140         primary_key = attrs['primary_key']
141         class_key = attrs['class_key']
142         foreign_fields = attrs['foreign_fields']
143         foreign_xrefs = attrs['foreign_xrefs']
144
145         ## allocate transcoders and xreftables once, for each item in foreign_xrefs
146         # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
147         xref_accessories = dict(
148             [ (xref['field'],
149                {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
150                 'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
151               for xref in foreign_xrefs ])
152
153         # the fields that are direct references, like e.g. site_id in Node
154         # determined lazily, we need an alien_object to do that, and we may have none here
155         direct_ref_fields = None
156
157         ### get current local table
158         # get ALL local objects so as to cope with
159         # (*) potential moves between plcs
160         # (*) or naming conflicts
161         local_objects = table_class (self.api)
162         ### index upon class_key for future searches
163         local_objects_index = local_objects.dict(class_key)
164
165         ### mark entries for this peer outofdate
166         new_count=0
167         old_count=0;
168         for local_object in local_objects:
169             if local_object['peer_id'] == peer_id:
170                 local_object.uptodate=False
171                 old_count += 1
172             else:
173                 local_object.uptodate=True
174
175         # scan the peer's local objects
176         for alien_object in alien_object_list:
177
178             object_name = alien_object[class_key]
179
180             ### ignore, e.g. system-wide slices
181             if lambda_ignore(alien_object):
182                 verbose('Ignoring',object_name)
183                 continue
184
185             verbose ('update_table (%s) - Considering'%classname,object_name)
186                 
187             # optimizing : avoid doing useless syncs
188             needs_sync = False
189
190             # create or update
191             try:
192                 ### We know about this object already
193                 local_object = local_objects_index[object_name]
194                 if local_object ['peer_id'] is None:
195                     if report_name_conflicts:
196                         ### xxx send e-mail
197                         print '!!!!!!!!!! We are in trouble here'
198                         print 'The %s object named %s is natively defined twice, '%(classname,object_name),
199                         print 'once on this PLC and once on peer %d'%peer_id
200                         print 'We dont raise an exception so that the remaining updates can still take place'
201                         print '!!!!!!!!!!'
202                     continue
203                 if local_object['peer_id'] != peer_id:
204                     ### the object has changed its plc, 
205                     ### Note, this is not problematic here because both definitions are remote
206                     ### we can assume the object just moved
207                     ### needs to update peer_id though
208                     local_object['peer_id'] = peer_id
209                     needs_sync = True
210                 # update all fields as per foreign_fields
211                 for field in foreign_fields:
212                     if (local_object[field] != alien_object [field]):
213                         local_object[field]=alien_object[field]
214                         needs_sync = True
215                 verbose ('update_table FOUND',object_name)
216             except:
217                 ### create a new entry
218                 local_object = row_class(self.api,
219                                           {class_key :object_name,'peer_id':peer_id})
220                 # insert in index
221                 local_objects_index[class_key]=local_object
222                 verbose ('update_table CREATED',object_name)
223                 # update all fields as per foreign_fields
224                 for field in foreign_fields:
225                     local_object[field]=alien_object[field]
226                 # this is tricky; at this point we may have primary_key unspecified,
227                 # but we need it for handling xrefs below, so we'd like to sync to get one
228                 # on the other hand some required fields may be still missing so
229                 #  the DB would refuse to sync in this case (e.g. site_id in Node)
230                 # so let's fill them with 1 so we can sync, this will be overridden below
231                 # lazily determine this set of fields now
232                 if direct_ref_fields is None:
233                     direct_ref_fields=[]
234                     for xref in foreign_xrefs:
235                         field=xref['field']
236                         #verbose('checking field %s for direct_ref'%field)
237                         if isinstance(alien_object[field],int):
238                             direct_ref_fields.append(field)
239                     verbose("FOUND DIRECT REFS",direct_ref_fields)
240                 for field in direct_ref_fields:
241                     local_object[field]=1
242                 verbose('Early sync on',local_object)
243                 local_object.sync()
244                 needs_sync = False
245
246             # this row is now valid
247             local_object.uptodate=True
248             new_count += 1
249
250             # manage cross-refs
251             for xref in foreign_xrefs:
252                 field=xref['field']
253                 alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
254                 alien_value = alien_object[field]
255                 transcoder = xref_accessories[xref['field']]['transcoder']
256                 if isinstance (alien_value,list):
257                     #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
258                     local_values=[]
259                     for a in alien_value:
260                         try:
261                             local_values.append(transcoder.transcode(a))
262                         except:
263                             # could not transcode - might be from another peer that we dont know about..
264                             pass
265                     #verbose (" transcoded as ",local_values)
266                     xref_table = xref_accessories[xref['field']]['xref_table']
267                     # newly created objects dont have xref fields set yet
268                     try:
269                         former_xrefs=local_object[xref['field']]
270                     except:
271                         former_xrefs=[]
272                     xref_table.update_item (local_object[primary_key],
273                                             former_xrefs,
274                                             local_values)
275                 elif isinstance (alien_value,int):
276                     #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
277                     new_value = transcoder.transcode(alien_value)
278                     if local_object[field] != new_value:
279                         local_object[field] = new_value
280                         needs_sync = True
281
282             ### this object is completely updated, let's save it
283             if needs_sync:
284                 verbose('FINAL sync on %s:'%object_name,local_object)
285                 local_object.sync(False)
286                     
287         ### delete entries that are not uptodate
288         for local_object in local_objects:
289             if not local_object.uptodate:
290                 local_object.delete()
291
292         self.api.db.commit()
293
294         ### return delta in number of objects 
295         return new_count-old_count
296
297     # slice attributes exhibit a special behaviour
298     # because there is no name we can use to retrieve/check for equality
299     # this object is like a 3-part xref, linking slice_attribute_type, slice,
300     #    and potentially node, together with a value that can change over time.
301     # extending the generic model to support a lambda rather than class_key
302     #    would clearly become overkill
303     def update_slice_attributes (self,
304                                  alien_slice_attributes,
305                                  alien_nodes,
306                                  alien_slices):
307
308         from PLC.SliceAttributeTypes import SliceAttributeTypes
309         from PLC.SliceAttributes import SliceAttribute, SliceAttributes
310
311         # init
312         peer_id = self.peer_id
313         
314         # create transcoders
315         node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
316         slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
317         # no need to transcode SliceAttributeTypes, we have a name in the result
318         local_sat_dict = SliceAttributeTypes(self.api).dict('name')
319                
320         # load local objects
321         local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
322
323         ### mark entries for this peer outofdate
324         new_count = 0
325         old_count=len(local_objects)
326         for local_object in local_objects:
327             local_object.uptodate=False
328
329         for alien_object in alien_slice_attributes:
330
331             verbose('----- update_slice_attributes: considering ...')
332             verbose('   ',alien_object)
333
334             # locate local slice
335             try:
336                 slice_id = slice_xcoder.transcode(alien_object['slice_id'])
337             except:
338                 verbose('update_slice_attributes: unable to locate slice',
339                         alien_object['slice_id'])
340                 continue
341             # locate slice_attribute_type
342             try:
343                 sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
344             except:
345                 verbose('update_slice_attributes: unable to locate slice attribute type',
346                         alien_object['name'])
347                 continue
348             # locate local node if specified
349             try:
350                 alien_node_id = alien_object['node_id']
351                 if alien_node_id is not None:
352                     node_id = node_xcoder.transcode(alien_node_id)
353                 else:
354                     node_id=None
355             except:
356                 verbose('update_slice_attributes: unable to locate node',
357                         alien_object['node_id'])
358                 continue
359
360             # locate the local SliceAttribute if any
361             try:
362                 verbose ('searching name=', alien_object['name'],
363                          'slice_id',slice_id, 'node_id',node_id)
364                 local_object = SliceAttributes (self.api,
365                                                 {'name':alien_object['name'],
366                                                  'slice_id':slice_id,
367                                                  'node_id':node_id})[0]
368                 
369                 if local_object['peer_id'] != peer_id:
370                     verbose ('FOUND local sa - skipped')
371                     continue
372                 verbose('FOUND already cached sa - setting value')
373                 local_object['value'] = alien_object['value']
374             # create it if missing
375             except:
376                 local_object = SliceAttribute(self.api,
377                                               {'peer_id':peer_id,
378                                                'slice_id':slice_id,
379                                                'node_id':node_id,
380                                                'attribute_type_id':sat_id,
381                                                'value':alien_object['value']})
382                 verbose('CREATED new sa')
383             local_object.uptodate=True
384             new_count += 1
385             local_object.sync(False)
386
387         for local_object in local_objects:
388             if not local_object.uptodate:
389                 local_object.delete()
390
391         self.api.db.commit()
392         ### return delta in number of objects 
393         return new_count-old_count
394
395     def refresh_peer (self):
396         
397         # so as to minimize the numer of requests
398         # we get all objects in a single call and sort afterwards
399         # xxx ideally get objects either local or the ones attached here
400         # requires to know remote peer's peer_id for ourselves, mmhh..
401         # does not make any difference in a 2-peer deployment though
402
403         ### uses GetPeerData to gather all info in a single xmlrpc request
404
405         timers={}
406         t_start=time.time()
407         # xxx see also GetPeerData - peer_id arg unused yet
408         all_data = self.peer_server.GetPeerData (self.auth,0)
409
410         t_acquired = time.time()
411         # refresh sites
412         plocal_sites = all_data['Sites-local']
413         all_sites = plocal_sites + all_data['Sites-peer']
414         nb_new_sites = self.update_table('Site', plocal_sites)
415
416         t0 = time.time()
417         timers['process-sites']=t0-t_acquired
418         
419
420         # refresh keys
421         plocal_keys = all_data['Keys-local']
422         all_keys = plocal_keys + all_data['Keys-peer']
423         nb_new_keys = self.update_table('Key', plocal_keys)
424
425         t=time.time()
426         timers['process-keys']=t-t0
427         t0=t
428
429         # refresh nodes
430         plocal_nodes = all_data['Nodes-local']
431         all_nodes = plocal_nodes + all_data['Nodes-peer']
432         nb_new_nodes = self.update_table('Node', plocal_nodes,
433                                          { 'Site' : all_sites } )
434
435         t=time.time()
436         timers['process-nodes']=t-t0
437         t0=t
438
439         # refresh persons
440         plocal_persons = all_data['Persons-local']
441         all_persons = plocal_persons + all_data['Persons-peer']
442         nb_new_persons = self.update_table ('Person', plocal_persons,
443                                             { 'Key': all_keys, 'Site' : all_sites } )
444
445         t=time.time()
446         timers['process-persons']=t-t0
447         t0=t
448
449         # refresh slice attribute types
450         plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
451         nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
452                                                           plocal_slice_attribute_types,
453                                                           report_name_conflicts = False)
454
455         t=time.time()
456         timers['process-sat']=t-t0
457         t0=t
458
459         # refresh slices
460         plocal_slices = all_data['Slices-local']
461         all_slices = plocal_slices + all_data['Slices-peer']
462
463         def is_system_slice (slice):
464             return slice['creator_person_id'] == 1
465
466         nb_new_slices = self.update_table ('Slice', plocal_slices,
467                                            {'Node': all_nodes,
468                                             'Person': all_persons,
469                                             'Site': all_sites},
470                                            is_system_slice)
471
472         t=time.time()
473         timers['process-slices']=t-t0
474         t0=t
475
476         # refresh slice attributes
477         plocal_slice_attributes = all_data ['SliceAttributes-local']
478         nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
479                                                                 all_nodes,
480                                                                 all_slices)
481         t=time.time()
482         timers['process-sa']=t-t0
483         t0=t
484         
485         t_end=time.time()
486
487         timers['time_gather']   = all_data['ellapsed']
488         timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
489         timers['time_process']  = t_end-t_acquired
490         timers['time_all']      = t_end-t_start
491         
492         ### returned as-is by RefreshPeer
493         return {'plcname':self.api.config.PLC_NAME,
494                 'new_sites':nb_new_sites,
495                 'new_keys':nb_new_keys,
496                 'new_nodes':nb_new_nodes,
497                 'new_persons':nb_new_persons,
498                 'new_slice_attribute_types':nb_new_slice_attribute_types,
499                 'new_slices':nb_new_slices,
500                 'new_slice_attributes':nb_new_slice_attributes,
501                 'timers':timers,
502                 }
503