- Change .py files to use 4-space indents and no hard tab characters.
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3 #
4 # $Id$
5 # $URL$
6
7 import os
8 import sys
9 import fcntl
10 import time
11
12 from PLC.Debug import log
13 from PLC.Faults import *
14 from PLC.Method import Method
15 from PLC.Parameter import Parameter, Mixed
16 from PLC.Auth import Auth
17
18 from PLC.Peers import Peer, Peers
19 from PLC.Sites import Site, Sites
20 from PLC.Persons import Person, Persons
21 from PLC.KeyTypes import KeyType, KeyTypes
22 from PLC.Keys import Key, Keys
23 from PLC.BootStates import BootState, BootStates
24 from PLC.Nodes import Node, Nodes
25 from PLC.SliceInstantiations import SliceInstantiations
26 from PLC.Slices import Slice, Slices
27
28 verbose=False
29
30 # initial version was doing only one final commit
31 # * set commit_mode to False to get that behaviour
32 # * set comit_mode to True to get everything synced at once
33 commit_mode=True
34
35 def message (to_print=None,verbose_only=False):
36     if verbose_only and not verbose:
37         return
38     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
39     if to_print:
40         print >>log, to_print
41
42 def message_verbose(to_print=None):
43     message(to_print,verbose_only=True)
44
45
46 class FileLock:
47     """
48     Lock/Unlock file
49     """
50     def __init__(self, file_path, expire = 60 * 60 * 2):
51         self.expire = expire
52         self.fpath = file_path
53         self.fd = None
54
55     def lock(self):
56         if os.path.exists(self.fpath):
57             if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
58                 try:
59                     os.unlink(self.fpath)
60                 except Exception, e:
61                     message('FileLock.lock(%s) : %s' % (self.fpath, e))
62                     return False
63         try:
64             self.fd = open(self.fpath, 'w')
65             fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
66         except IOError, e:
67             message('FileLock.lock(%s) : %s' % (self.fpath, e))
68             return False
69         return True
70
71     def unlock(self):
72         try:
73             fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
74             self.fd.close()
75         except IOError, e:
76             message('FileLock.unlock(%s) : %s' % (self.fpath, e))
77
78
79 class RefreshPeer(Method):
80     """
81     Fetches site, node, slice, person and key data from the specified peer
82     and caches it locally; also deletes stale entries.
83     Upon successful completion, returns a dict reporting various timers.
84     Faults otherwise.
85     """
86
87     roles = ['admin']
88
89     accepts = [
90         Auth(),
91         Mixed(Peer.fields['peer_id'],
92               Peer.fields['peername']),
93         ]
94
95     returns = Parameter(int, "1 if successful")
96
97     def call(self, auth, peer_id_or_peername):
98         ret_val = None
99         peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
100         file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
101         if not file_lock.lock():
102             raise Exception, "Another instance of RefreshPeer is running."
103         try:
104             ret_val = self.real_call(auth, peer_id_or_peername)
105         except Exception, e:
106             file_lock.unlock()
107             raise Exception, e
108         file_lock.unlock()
109         return ret_val
110
111
112     def real_call(self, auth, peer_id_or_peername):
113         # Get peer
114         peers = Peers(self.api, [peer_id_or_peername])
115         if not peers:
116             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
117         peer = peers[0]
118         peer_id = peer['peer_id']
119
120         # Connect to peer API
121         peer.connect()
122
123         timers = {}
124
125         # Get peer data
126         start = time.time()
127         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
128         message('Issuing GetPeerData')
129         peer_tables = peer.GetPeerData()
130         # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
131         boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
132                             'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
133         for node in peer_tables['Nodes']:
134             for key in ['nodenetwork_ids','dummybox_id']:
135                 if key in node:
136                     del node[key]
137             if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
138         for slice in peer_tables['Slices']:
139             for key in ['slice_attribute_ids']:
140                 if key in slice:
141                     del slice[key]
142         timers['transport'] = time.time() - start - peer_tables['db_time']
143         timers['peer_db'] = peer_tables['db_time']
144         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
145
146         def sync(objects, peer_objects, classobj):
147             """
148             Synchronizes two dictionaries of objects. objects should
149             be a dictionary of local objects keyed on their foreign
150             identifiers. peer_objects should be a dictionary of
151             foreign objects keyed on their local (i.e., foreign to us)
152             identifiers. Returns a final dictionary of local objects
153             keyed on their foreign identifiers.
154             """
155
156             classname=classobj(self.api).__class__.__name__
157             message_verbose('Entering sync on %s'%classname)
158
159             synced = {}
160
161             # Delete stale objects
162             for peer_object_id, object in objects.iteritems():
163                 if peer_object_id not in peer_objects:
164                     object.delete(commit = commit_mode)
165                     message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
166
167             total = len(peer_objects)
168             count=1
169             # set this to something realistic to trace down a given object(s)
170             trace_type="Node"
171             trace_ids=[]
172             def trace (message):
173                 if classname == trace_type and peer_object_id in trace_ids:
174                     message_verbose('TRACE>>'+message)
175
176             # Add/update new/existing objects
177             for peer_object_id, peer_object in peer_objects.iteritems():
178                 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
179                 count += 1
180                 if peer_object_id in synced:
181                     message("Warning: %s Skipping already added %s: %r"%(
182                             peer['peername'], classname, peer_object))
183                     continue
184                 if classname == 'Node':
185                     message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
186                 elif classname == "Slice":
187                     message_verbose ('DBG>> slicename=%s'%peer_object['name'])
188                 if peer_object_id in objects:
189                     # Update existing object
190                     object = objects[peer_object_id]
191
192                     # Replace foreign identifier with existing local
193                     # identifier temporarily for the purposes of
194                     # comparison.
195                     peer_object[object.primary_key] = object[object.primary_key]
196
197                     # Must use __eq__() instead of == since
198                     # peer_object may be a raw dict instead of a Peer
199                     # object.
200                     trace ("in objects : comparing")
201                     if not object.__eq__(peer_object):
202                         # Only update intrinsic fields
203                         trace ("updating")
204                         object.update(object.db_fields(peer_object))
205                         trace ("updated")
206                         sync = True
207                         dbg = "changed"
208                     else:
209                         trace ("intact")
210                         sync = False
211                         dbg = None
212
213                     # Restore foreign identifier
214                     peer_object[object.primary_key] = peer_object_id
215                 else:
216                     trace ("not in objects -- creating")
217                     # Add new object
218                     object = classobj(self.api, peer_object)
219                     trace ("created")
220                     # Replace foreign identifier with new local identifier
221                     del object[object.primary_key]
222                     trace ("forced clean id")
223                     sync = True
224                     dbg = "added"
225
226                 if sync:
227                     message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
228                     try:
229                         object.sync(commit = commit_mode)
230                     except PLCInvalidArgument, err:
231                         # Skip if validation fails
232                         # XXX Log an event instead of printing to logfile
233                         message("Warning: %s Skipping invalid %s %r : %r"%(\
234                                 peer['peername'], classname, peer_object, err))
235                         continue
236
237                 synced[peer_object_id] = object
238
239                 if dbg:
240                     message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
241
242             message_verbose("Exiting sync on %s"%classname)
243
244             return synced
245
246         #
247         # Synchronize foreign sites
248         #
249
250         start = time.time()
251
252         message('Dealing with Sites')
253
254         # Compare only the columns returned by the GetPeerData() call
255         if peer_tables['Sites']:
256             columns = peer_tables['Sites'][0].keys()
257         else:
258             columns = None
259
260         # Keyed on foreign site_id
261         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
262         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
263
264         # Synchronize new set (still keyed on foreign site_id)
265         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
266
267         for peer_site_id, site in peer_sites.iteritems():
268             # Bind any newly cached sites to peer
269             if peer_site_id not in old_peer_sites:
270                 peer.add_site(site, peer_site_id, commit = commit_mode)
271                 site['peer_id'] = peer_id
272                 site['peer_site_id'] = peer_site_id
273
274         timers['site'] = time.time() - start
275
276         #
277         # XXX Synchronize foreign key types
278         #
279
280         message('Dealing with Keys')
281
282         key_types = KeyTypes(self.api).dict()
283
284         #
285         # Synchronize foreign keys
286         #
287
288         start = time.time()
289
290         # Compare only the columns returned by the GetPeerData() call
291         if peer_tables['Keys']:
292             columns = peer_tables['Keys'][0].keys()
293         else:
294             columns = None
295
296         # Keyed on foreign key_id
297         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
298         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
299
300         # Fix up key_type references
301         for peer_key_id, key in keys_at_peer.items():
302             if key['key_type'] not in key_types:
303                 # XXX Log an event instead of printing to logfile
304                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
305                 del keys_at_peer[peer_key_id]
306                 continue
307
308         # Synchronize new set (still keyed on foreign key_id)
309         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
310         for peer_key_id, key in peer_keys.iteritems():
311             # Bind any newly cached keys to peer
312             if peer_key_id not in old_peer_keys:
313                 peer.add_key(key, peer_key_id, commit = commit_mode)
314                 key['peer_id'] = peer_id
315                 key['peer_key_id'] = peer_key_id
316
317         timers['keys'] = time.time() - start
318
319         #
320         # Synchronize foreign users
321         #
322
323         start = time.time()
324
325         message('Dealing with Persons')
326
327         # Compare only the columns returned by the GetPeerData() call
328         if peer_tables['Persons']:
329             columns = peer_tables['Persons'][0].keys()
330         else:
331             columns = None
332
333         # Keyed on foreign person_id
334         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
335
336         # artificially attach the persons returned by GetPeerData to the new peer
337         # this is because validate_email needs peer_id to be correct when checking for duplicates
338         for person in peer_tables['Persons']:
339             person['peer_id']=peer_id
340         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
341                                 for peer_person in peer_tables['Persons']])
342
343         # XXX Do we care about membership in foreign site(s)?
344
345         # Synchronize new set (still keyed on foreign person_id)
346         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
347
348         # transcoder : retrieve a local key_id from a peer_key_id
349         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
350                                   for peer_key_id,key in peer_keys.iteritems()])
351
352         for peer_person_id, person in peer_persons.iteritems():
353             # Bind any newly cached users to peer
354             if peer_person_id not in old_peer_persons:
355                 peer.add_person(person, peer_person_id, commit = commit_mode)
356                 person['peer_id'] = peer_id
357                 person['peer_person_id'] = peer_person_id
358                 person['key_ids'] = []
359
360             # User as viewed by peer
361             peer_person = persons_at_peer[peer_person_id]
362
363             # Foreign keys currently belonging to the user
364             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
365                                   if key_transcoder[key_id] in peer_keys]
366
367             # Foreign keys that should belong to the user
368             # this is basically peer_person['key_ids'], we just check it makes sense
369             # (e.g. we might have failed importing it)
370             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
371
372             # Remove stale keys from user
373             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
374                 person.remove_key(peer_keys[key_id], commit = commit_mode)
375                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
376
377             # Add new keys to user
378             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
379                 person.add_key(peer_keys[key_id], commit = commit_mode)
380                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
381
382         timers['persons'] = time.time() - start
383
384         #
385         # XXX Synchronize foreign boot states
386         #
387
388         boot_states = BootStates(self.api).dict()
389
390         #
391         # Synchronize foreign nodes
392         #
393
394         start = time.time()
395
396         message('Dealing with Nodes (1)')
397
398         # Compare only the columns returned by the GetPeerData() call
399         if peer_tables['Nodes']:
400             columns = peer_tables['Nodes'][0].keys()
401         else:
402             # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
403             columns = Node.fields
404             if 'interface_ids' in columns: columns.remove('interface_ids')
405             if 'dummybox_id' in columns: columns.remove('dummybox_id')
406
407         # Keyed on foreign node_id
408         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
409         nodes_at_peer = dict([(node['node_id'], node) \
410                               for node in peer_tables['Nodes']])
411
412         # Fix up site_id and boot_states references
413         for peer_node_id, node in nodes_at_peer.items():
414             errors = []
415             if node['site_id'] not in peer_sites:
416                 errors.append("invalid site %d" % node['site_id'])
417             if node['boot_state'] not in boot_states:
418                 errors.append("invalid boot state %s" % node['boot_state'])
419             if errors:
420                 # XXX Log an event instead of printing to logfile
421                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
422                              + ", ".join(errors))
423                 del nodes_at_peer[peer_node_id]
424                 continue
425             else:
426                 node['site_id'] = peer_sites[node['site_id']]['site_id']
427
428         # Synchronize new set
429         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
430
431         for peer_node_id, node in peer_nodes.iteritems():
432             # Bind any newly cached foreign nodes to peer
433             if peer_node_id not in old_peer_nodes:
434                 peer.add_node(node, peer_node_id, commit = commit_mode)
435                 node['peer_id'] = peer_id
436                 node['peer_node_id'] = peer_node_id
437
438         timers['nodes'] = time.time() - start
439
440         #
441         # Synchronize local nodes
442         #
443
444         start = time.time()
445         message('Dealing with Nodes (2)')
446
447         # Keyed on local node_id
448         local_nodes = Nodes(self.api).dict()
449
450         for node in peer_tables['PeerNodes']:
451             # Foreign identifier for our node as maintained by peer
452             peer_node_id = node['node_id']
453             # Local identifier for our node as cached by peer
454             node_id = node['peer_node_id']
455             if node_id in local_nodes:
456                 # Still a valid local node, add it to the synchronized
457                 # set of local node objects keyed on foreign node_id.
458                 peer_nodes[peer_node_id] = local_nodes[node_id]
459
460         timers['local_nodes'] = time.time() - start
461
462         #
463         # XXX Synchronize foreign slice instantiation states
464         #
465
466         slice_instantiations = SliceInstantiations(self.api).dict()
467
468         #
469         # Synchronize foreign slices
470         #
471
472         start = time.time()
473
474         message('Dealing with Slices (1)')
475
476         # Compare only the columns returned by the GetPeerData() call
477         if peer_tables['Slices']:
478             columns = peer_tables['Slices'][0].keys()
479         else:
480             columns = None
481
482         # Keyed on foreign slice_id
483         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
484         slices_at_peer = dict([(slice['slice_id'], slice) \
485                                for slice in peer_tables['Slices']])
486
487         # Fix up site_id, instantiation, and creator_person_id references
488         for peer_slice_id, slice in slices_at_peer.items():
489             errors = []
490             if slice['site_id'] not in peer_sites:
491                 errors.append("invalid site %d" % slice['site_id'])
492             if slice['instantiation'] not in slice_instantiations:
493                 errors.append("invalid instantiation %s" % slice['instantiation'])
494             if slice['creator_person_id'] not in peer_persons:
495                 # Just NULL it out
496                 slice['creator_person_id'] = None
497             else:
498                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
499             if errors:
500                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
501                             + ", ".join(errors))
502                 del slices_at_peer[peer_slice_id]
503                 continue
504             else:
505                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
506
507         # Synchronize new set
508         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
509
510         message('Dealing with Slices (2)')
511         # transcoder : retrieve a local node_id from a peer_node_id
512         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
513                                    for peer_node_id,node in peer_nodes.iteritems()])
514         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
515                                      for peer_person_id,person in peer_persons.iteritems()])
516
517         for peer_slice_id, slice in peer_slices.iteritems():
518             # Bind any newly cached foreign slices to peer
519             if peer_slice_id not in old_peer_slices:
520                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
521                 slice['peer_id'] = peer_id
522                 slice['peer_slice_id'] = peer_slice_id
523                 slice['node_ids'] = []
524                 slice['person_ids'] = []
525
526             # Slice as viewed by peer
527             peer_slice = slices_at_peer[peer_slice_id]
528
529             # Nodes that are currently part of the slice
530             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
531                                    if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
532
533             # Nodes that should be part of the slice
534             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
535
536             # Remove stale nodes from slice
537             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
538                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
539                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
540
541             # Add new nodes to slice
542             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
543                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
544                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
545
546             # N.B.: Local nodes that may have been added to the slice
547             # by hand, are removed. In other words, don't do this.
548
549             # Foreign users that are currently part of the slice
550             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
551             #                if person_transcoder[person_id] in peer_persons]
552             # An issue occurred with a user who registered on both sites (same email)
553             # So the remote person could not get cached locally
554             # The one-line map/filter style is nicer but ineffective here
555             old_slice_person_ids = []
556             for person_id in slice['person_ids']:
557                 if not person_transcoder.has_key(person_id):
558                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
559                 elif person_transcoder[person_id] not in peer_persons:
560                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
561                 else:
562                     old_slice_person_ids += [person_transcoder[person_id]]
563
564             # Foreign users that should be part of the slice
565             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
566
567             # Remove stale users from slice
568             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
569                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
570                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
571
572             # Add new users to slice
573             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
574                 slice.add_person(peer_persons[person_id], commit = commit_mode)
575                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
576
577             # N.B.: Local users that may have been added to the slice
578             # by hand, are not touched.
579
580         timers['slices'] = time.time() - start
581
582         # Update peer itself and commit
583         peer.sync(commit = True)
584
585         return timers