(*) Peer has new fields person_ids and site_ids
[plcapi.git] / PLC / Cache.py
1 import time
2
3 from PLC.Faults import *
4 from PLC.Parameter import Parameter
5 from PLC.Filter import Filter
6 from PLC.Table import Row, Table
7
8 verbose_flag=False;
9 #verbose_flag=True;
10 def verbose (*args):
11     if verbose_flag:
12         print (args)
13
14 def class_attributes (classname):
15     """ locates various attributes defined in the row class """
16     topmodule = __import__ ('PLC.%ss'%classname)
17     module = topmodule.__dict__['%ss'%classname]
18     # local row-like class, e.g. Node
19     row_class = module.__dict__['%s'%classname]
20     # local tab-like class, e.g. Nodes
21     table_class = module.__dict__['%ss'%classname]
22
23     return {'row_class':row_class, 
24             'table_class':table_class,
25             'primary_key': row_class.__dict__['primary_key'],
26             'class_key': row_class.__dict__['class_key'],
27             'foreign_fields': row_class.__dict__['foreign_fields'],
28             'foreign_xrefs': row_class.__dict__['foreign_xrefs'],
29             }
30
31 class Cache:
32
33     """
34     This class is the core of the RefreshPeer method's implementation,
35     that basically calls Cache:refresh_peer
36
37     It manages caching on a table-by-table basis, and performs required
38     transcoding from remote ids to local ids - that's the job of the
39     Transcoder class
40
41     For the tables (classes) that it handles, it uses the following
42     attributes
43     (*) primary_key is the name of the field where db indexes are stored
44     (*) class_key is used to implement equality on objects
45     (*) foreign_fields is the list of fields that are copied verbatim from
46         foreign objects
47     (*) foreign_xrefs is the list of columns that are subject to transcoding
48         (.) when obj[field] is an int, the transcoded value is directly
49         inserted in the table
50         (.) if it's a list, it is assumed that the relationship is maintained
51             in an external 2-column table. That's where the XrefTable class comes in
52
53     The relationship between slices, slice attribute types and slice attributes being
54     more complex, it is handled in a specifically customized version of update_table
55
56     Of course the order in which the tables are managed is important, with different
57     orders several iterations might be needed for the update process to converge.
58
59     Of course the timers field was introduced for optimization and could safely be removed
60     """
61     
62     def __init__ (self, api, peer_id, peer_server, auth):
63
64         self.api = api
65         self.peer_id = peer_id
66         self.peer_server = peer_server
67         self.auth = auth
68         
69     class Transcoder:
70
71         def __init__ (self, api, classname, alien_objects):
72             self.api = api
73             attrs = class_attributes (classname)
74             self.primary_key = attrs['primary_key']
75             self.class_key = attrs['class_key']
76
77             # cannot use dict, it's acquired by xmlrpc and is untyped
78             self.alien_objects_byid = dict( [ (x[self.primary_key],x) for x in alien_objects ] )
79
80             # retrieve local objects
81             local_objects = attrs['table_class'] (api)
82             self.local_objects_byname = local_objects.dict(self.class_key)
83
84             verbose ('Transcoder init :',classname,
85                      self.alien_objects_byid.keys(),
86                      self.local_objects_byname.keys())
87
88         def transcode (self, alien_id):
89             """ transforms an alien id into a local one """
90             # locate alien obj from alien_id
91             #verbose ('.entering transcode with alien_id',alien_id,)
92             alien_object=self.alien_objects_byid[alien_id]
93             #verbose ('..located alien_obj',)
94             name = alien_object [self.class_key]
95             #verbose ('...got name',name,)
96             local_object=self.local_objects_byname[name]
97             #verbose ('....found local obj')
98             local_id=local_object[self.primary_key]
99             #verbose ('.....and local_id',local_id)
100             return local_id
101             
102
103     # for handling simple n-to-n relation tables, like e.g. slice_node
104     class XrefTable: 
105
106         def __init__ (self, api, tablename, class1, class2):
107             self.api = api
108             self.tablename = tablename
109             self.lowerclass1 = class1.lower()
110             self.lowerclass2 = class2.lower()
111
112         def delete_old_items (self, id1, id2_set):
113             if id2_set:
114                 sql = ""
115                 sql += "DELETE FROM %s WHERE %s_id=%d"%(self.tablename,self.lowerclass1,id1)
116                 sql += " AND %s_id IN ("%self.lowerclass2
117                 sql += ",".join([str(i) for i in id2_set])
118                 sql += ")"
119                 self.api.db.do (sql)
120
121         def insert_new_items (self, id1, id2_set):
122             ### xxx needs to be optimized
123             ### tried to figure a way to use a single sql statement
124             ### like: insert into table (x,y) values (1,2),(3,4);
125             ### but apparently this is not supported under postgresql
126             for id2 in id2_set:
127                 sql = "INSERT INTO %s (%s_id,%s_id) VALUES (%d,%d)"% \
128                       (self.tablename,self.lowerclass1,self.lowerclass2,id1,id2)
129
130 # below is Tony's code but it's badly broken. I'm not sure we care much in fact.
131 #           if id2_set:
132 #               sql = "INSERT INTO %s select %d, %d " % \
133 #                       self.tablename, id1, id2[0] 
134 #               for id2 in id2_set[1:]:
135 #                       sql += " UNION ALL SELECT %d, %d " % \
136 #                       (id1,id2)
137
138                 self.api.db.do (sql)
139
140         def update_item (self, id1, old_id2s, new_id2s):
141             news = set (new_id2s)
142             olds = set (old_id2s)
143             to_delete = olds-news
144             self.delete_old_items (id1, to_delete)
145             to_create = news-olds
146             self.insert_new_items (id1, to_create)
147             self.api.db.commit()
148             
149     # classname: the type of objects we are talking about;       e.g. 'Slice'
150     # peer_object_list list of objects at a given peer -         e.g. peer.GetSlices()
151     # alien_xref_objs_dict : a dict {'classname':alien_obj_list} e.g. {'Node':peer.GetNodes()}
152     #    we need an entry for each class mentioned in the class's foreign_xrefs
153     def update_table (self,
154                       classname,
155                       alien_object_list,
156                       alien_xref_objs_dict = {},
157                       report_name_conflicts = True):
158         
159         verbose ("============================== entering update_table on",classname)
160         peer_id=self.peer_id
161
162         attrs = class_attributes (classname)
163         row_class = attrs['row_class']
164         table_class = attrs['table_class']
165         primary_key = attrs['primary_key']
166         class_key = attrs['class_key']
167         foreign_fields = attrs['foreign_fields']
168         foreign_xrefs = attrs['foreign_xrefs']
169
170         ## allocate transcoders and xreftables once, for each item in foreign_xrefs
171         # create a dict 'classname' -> {'transcoder' : ..., 'xref_table' : ...}
172         xref_accessories = dict(
173             [ (xref['field'],
174                {'transcoder' : Cache.Transcoder (self.api,xref['class'],alien_xref_objs_dict[xref['class']]),
175                 'xref_table' : Cache.XrefTable (self.api,xref['table'],classname,xref['class'])})
176               for xref in foreign_xrefs ])
177
178         # the fields that are direct references, like e.g. site_id in Node
179         # determined lazily, we need an alien_object to do that, and we may have none here
180         direct_ref_fields = None
181
182         ### get current local table
183         # get ALL local objects so as to cope with
184         # (*) potential moves between plcs
185         # (*) or naming conflicts
186         local_objects = table_class (self.api)
187         ### index upon class_key for future searches
188         local_objects_index = local_objects.dict(class_key)
189
190         ### mark entries for this peer outofdate
191         new_count=0
192         old_count=0;
193         for local_object in local_objects:
194             if local_object['peer_id'] == peer_id:
195                 local_object.uptodate=False
196                 old_count += 1
197             else:
198                 local_object.uptodate=True
199
200         for alien_object in alien_object_list:
201             verbose ('+++ Got alien object',alien_object)
202
203         # scan the peer's local objects
204         for alien_object in alien_object_list:
205
206             object_name = alien_object[class_key]
207
208             verbose ('----- update_table (%s) - considering'%classname,object_name)
209
210             # optimizing : avoid doing useless syncs
211             needs_sync = False
212
213             # create or update
214             try:
215                 ### We know about this object already
216                 local_object = local_objects_index[object_name]
217                 if local_object ['peer_id'] is None:
218                     if report_name_conflicts:
219                         ### xxx send e-mail
220                         print '!!!!!!!!!! We are in trouble here'
221                         print 'The %s object named %s is natively defined twice, '%(classname,object_name),
222                         print 'once on this PLC and once on peer %d'%peer_id
223                         print 'We dont raise an exception so that the remaining updates can still take place'
224                         print '!!!!!!!!!!'
225                     continue
226                 if local_object['peer_id'] != peer_id:
227                     ### the object has changed its plc, 
228                     ### Note, this is not problematic here because both definitions are remote
229                     ### we can assume the object just moved
230                     ### needs to update peer_id though
231                     local_object['peer_id'] = peer_id
232                     needs_sync = True
233                 # update all fields as per foreign_fields
234                 for field in foreign_fields:
235                     if (local_object[field] != alien_object [field]):
236                         local_object[field]=alien_object[field]
237                         needs_sync = True
238                 verbose ('update_table FOUND',object_name)
239             except:
240                 ### create a new entry
241                 local_object = row_class(self.api,
242                                           {class_key :object_name,'peer_id':peer_id})
243                 # insert in index
244                 local_objects_index[class_key]=local_object
245                 verbose ('update_table CREATED',object_name)
246                 # update all fields as per foreign_fields
247                 for field in foreign_fields:
248                     local_object[field]=alien_object[field]
249                 # this is tricky; at this point we may have primary_key unspecified,
250                 # but we need it for handling xrefs below, so we'd like to sync to get one
251                 # on the other hand some required fields may be still missing so
252                 #  the DB would refuse to sync in this case (e.g. site_id in Node)
253                 # so let's fill them with 1 so we can sync, this will be overridden below
254                 # lazily determine this set of fields now
255                 if direct_ref_fields is None:
256                     direct_ref_fields=[]
257                     for xref in foreign_xrefs:
258                         field=xref['field']
259                         #verbose('checking field %s for direct_ref'%field)
260                         if isinstance(alien_object[field],int):
261                             direct_ref_fields.append(field)
262                     verbose("FOUND DIRECT REFS",direct_ref_fields)
263                 for field in direct_ref_fields:
264                     local_object[field]=1
265                 verbose('Early sync on',local_object)
266                 local_object.sync()
267                 needs_sync = False
268
269             # this row is now valid
270             local_object.uptodate=True
271             new_count += 1
272
273             # manage cross-refs
274             for xref in foreign_xrefs:
275                 field=xref['field']
276                 alien_xref_obj_list = alien_xref_objs_dict[xref['class']]
277                 alien_value = alien_object[field]
278                 transcoder = xref_accessories[xref['field']]['transcoder']
279                 if isinstance (alien_value,list):
280                     #verbose ('update_table list-transcoding ',xref['class'],' aliens=',alien_value,)
281                     local_values=[]
282                     for a in alien_value:
283                         try:
284                             local_values.append(transcoder.transcode(a))
285                         except:
286                             # could not transcode - might be from another peer that we dont know about..
287                             pass
288                     #verbose (" transcoded as ",local_values)
289                     xref_table = xref_accessories[xref['field']]['xref_table']
290                     # newly created objects dont have xref fields set yet
291                     try:
292                         former_xrefs=local_object[xref['field']]
293                     except:
294                         former_xrefs=[]
295                     xref_table.update_item (local_object[primary_key],
296                                             former_xrefs,
297                                             local_values)
298                 elif isinstance (alien_value,int):
299                     #verbose ('update_table atom-transcoding ',xref['class'],' aliens=',alien_value,)
300                     new_value = transcoder.transcode(alien_value)
301                     if local_object[field] != new_value:
302                         local_object[field] = new_value
303                         needs_sync = True
304
305             ### this object is completely updated, let's save it
306             if needs_sync:
307                 verbose('FINAL sync on %s:'%object_name,local_object)
308                 local_object.sync(False)
309                     
310         ### delete entries that are not uptodate
311         for local_object in local_objects:
312             if not local_object.uptodate:
313                 local_object.delete()
314
315         self.api.db.commit()
316
317         ### return delta in number of objects 
318         return new_count-old_count
319
320     # slice attributes exhibit a special behaviour
321     # because there is no name we can use to retrieve/check for equality
322     # this object is like a 3-part xref, linking slice_attribute_type, slice,
323     #    and potentially node, together with a value that can change over time.
324     # extending the generic model to support a lambda rather than class_key
325     #    would clearly become overkill
326     def update_slice_attributes (self,
327                                  alien_slice_attributes,
328                                  alien_nodes,
329                                  alien_slices):
330
331         from PLC.SliceAttributeTypes import SliceAttributeTypes
332         from PLC.SliceAttributes import SliceAttribute, SliceAttributes
333
334         verbose ("============================== entering update_slice_attributes")
335
336         # init
337         peer_id = self.peer_id
338         
339         # create transcoders
340         node_xcoder = Cache.Transcoder (self.api, 'Node', alien_nodes)
341         slice_xcoder= Cache.Transcoder (self.api, 'Slice', alien_slices)
342         # no need to transcode SliceAttributeTypes, we have a name in the result
343         local_sat_dict = SliceAttributeTypes(self.api).dict('name')
344                
345         # load local objects
346         local_objects = SliceAttributes (self.api,{'peer_id':peer_id})
347
348         ### mark entries for this peer outofdate
349         new_count = 0
350         old_count=len(local_objects)
351         for local_object in local_objects:
352             local_object.uptodate=False
353
354         for alien_object in alien_slice_attributes:
355
356             verbose('----- update_slice_attributes: considering ...')
357             verbose('   ',alien_object)
358
359             # locate local slice
360             try:
361                 slice_id = slice_xcoder.transcode(alien_object['slice_id'])
362             except:
363                 verbose('update_slice_attributes: unable to locate slice',
364                         alien_object['slice_id'])
365                 continue
366             # locate slice_attribute_type
367             try:
368                 sat_id = local_sat_dict[alien_object['name']]['attribute_type_id']
369             except:
370                 verbose('update_slice_attributes: unable to locate slice attribute type',
371                         alien_object['name'])
372                 continue
373             # locate local node if specified
374             try:
375                 alien_node_id = alien_object['node_id']
376                 if alien_node_id is not None:
377                     node_id = node_xcoder.transcode(alien_node_id)
378                 else:
379                     node_id=None
380             except:
381                 verbose('update_slice_attributes: unable to locate node',
382                         alien_object['node_id'])
383                 continue
384
385             # locate the local SliceAttribute if any
386             try:
387                 verbose ('searching name=', alien_object['name'],
388                          'slice_id',slice_id, 'node_id',node_id)
389                 local_object = SliceAttributes (self.api,
390                                                 {'name':alien_object['name'],
391                                                  'slice_id':slice_id,
392                                                  'node_id':node_id})[0]
393                 
394                 if local_object['peer_id'] != peer_id:
395                     verbose ('FOUND local sa - skipped')
396                     continue
397                 verbose('FOUND already cached sa - setting value')
398                 local_object['value'] = alien_object['value']
399             # create it if missing
400             except:
401                 local_object = SliceAttribute(self.api,
402                                               {'peer_id':peer_id,
403                                                'slice_id':slice_id,
404                                                'node_id':node_id,
405                                                'attribute_type_id':sat_id,
406                                                'value':alien_object['value']})
407                 verbose('CREATED new sa')
408             local_object.uptodate=True
409             new_count += 1
410             local_object.sync(False)
411
412         for local_object in local_objects:
413             if not local_object.uptodate:
414                 local_object.delete()
415
416         self.api.db.commit()
417         ### return delta in number of objects 
418         return new_count-old_count
419
420     def refresh_peer (self):
421         
422         # so as to minimize the numer of requests
423         # we get all objects in a single call and sort afterwards
424         # xxx ideally get objects either local or the ones attached here
425         # requires to know remote peer's peer_id for ourselves, mmhh..
426         # does not make any difference in a 2-peer deployment though
427
428         ### uses GetPeerData to gather all info in a single xmlrpc request
429
430         timers={}
431         t_start=time.time()
432         # xxx see also GetPeerData - peer_id arg unused yet
433         all_data = self.peer_server.GetPeerData (self.auth,self.api.config.PLC_NAME)
434
435         verbose ('Passed GetPeerData the name',self.api.config.PLC_NAME)
436         sks=all_data.keys()
437         sks.sort()
438         for k in sks:
439             f=all_data[k]
440             try:
441                 verbose ('GetPeerData[%s] -> %d'%(k,len(f)))
442             except:
443                 pass
444
445         t_acquired = time.time()
446         # refresh sites
447         plocal_sites = all_data['Sites-local']
448         all_sites = plocal_sites + all_data['Sites-peer']
449         nb_new_sites = self.update_table('Site', plocal_sites)
450
451         t0 = time.time()
452         timers['process-sites']=t0-t_acquired
453         
454
455         # refresh keys
456         plocal_keys = all_data['Keys-local']
457         all_keys = plocal_keys + all_data['Keys-peer']
458         nb_new_keys = self.update_table('Key', plocal_keys)
459
460         t=time.time()
461         timers['process-keys']=t-t0
462         t0=t
463
464         # refresh nodes
465         plocal_nodes = all_data['Nodes-local']
466         all_nodes = plocal_nodes + all_data['Nodes-peer']
467         nb_new_nodes = self.update_table('Node', plocal_nodes,
468                                          { 'Site' : all_sites } )
469
470         t=time.time()
471         timers['process-nodes']=t-t0
472         t0=t
473
474         # refresh persons
475         plocal_persons = all_data['Persons-local']
476         all_persons = plocal_persons + all_data['Persons-peer']
477         nb_new_persons = self.update_table ('Person', plocal_persons,
478                                             { 'Key': all_keys, 'Site' : all_sites } )
479
480         t=time.time()
481         timers['process-persons']=t-t0
482         t0=t
483
484         # refresh slice attribute types
485         plocal_slice_attribute_types = all_data ['SliceAttibuteTypes-local']
486         nb_new_slice_attribute_types = self.update_table ('SliceAttributeType',
487                                                           plocal_slice_attribute_types,
488                                                           report_name_conflicts = False)
489
490         t=time.time()
491         timers['process-sat']=t-t0
492         t0=t
493
494         # refresh slices
495         plocal_slices = all_data['Slices-local']
496         all_slices = plocal_slices + all_data['Slices-peer']
497
498         # forget about ignoring remote system slices
499         # just update them too, we'll be able to filter them out later in GetSlivers
500
501         nb_new_slices = self.update_table ('Slice', plocal_slices,
502                                            {'Node': all_nodes,
503                                             'Person': all_persons,
504                                             'Site': all_sites})
505
506         t=time.time()
507         timers['process-slices']=t-t0
508         t0=t
509
510         # refresh slice attributes
511         plocal_slice_attributes = all_data ['SliceAttributes-local']
512         nb_new_slice_attributes = self.update_slice_attributes (plocal_slice_attributes,
513                                                                 all_nodes,
514                                                                 all_slices)
515         t=time.time()
516         timers['process-sa']=t-t0
517         t0=t
518         
519         t_end=time.time()
520
521         timers['time_gather']   = all_data['ellapsed']
522         timers['time_transmit'] = t_acquired-t_start-all_data['ellapsed']
523         timers['time_process']  = t_end-t_acquired
524         timers['time_all']      = t_end-t_start
525         
526         ### returned as-is by RefreshPeer
527         return {
528                 'new_sites':nb_new_sites,
529                 'new_keys':nb_new_keys,
530                 'new_nodes':nb_new_nodes,
531                 'new_persons':nb_new_persons,
532                 'new_slice_attribute_types':nb_new_slice_attribute_types,
533                 'new_slices':nb_new_slices,
534                 'new_slice_attributes':nb_new_slice_attributes,
535                 'timers':timers,
536                 }
537