implement file lock based on peername to prevent running two RefreshPeer at a time.
[plcapi.git] / PLC / Methods / RefreshPeer.py
1 #
2 # Thierry Parmentelat - INRIA
3
4 # $Id$
5
6 import os
7 import sys
8 import fcntl
9 import time
10
11 from PLC.Debug import log
12 from PLC.Faults import *
13 from PLC.Method import Method
14 from PLC.Parameter import Parameter, Mixed
15 from PLC.Auth import Auth
16
17 from PLC.Peers import Peer, Peers
18 from PLC.Sites import Site, Sites
19 from PLC.Persons import Person, Persons
20 from PLC.KeyTypes import KeyType, KeyTypes
21 from PLC.Keys import Key, Keys
22 from PLC.BootStates import BootState, BootStates
23 from PLC.Nodes import Node, Nodes
24 from PLC.SliceInstantiations import SliceInstantiations
25 from PLC.Slices import Slice, Slices
26
27 verbose=False
28
29 # initial version was doing only one final commit
30 # * set commit_mode to False to get that behaviour
31 # * set comit_mode to True to get everything synced at once
32 commit_mode=True
33
34 def message (to_print=None,verbose_only=False):
35     if verbose_only and not verbose:
36         return
37     print >> log, time.strftime("%m-%d-%H-%M-%S:"),
38     if to_print:
39         print >>log, to_print
40
41 def message_verbose(to_print=None):
42     message(to_print,verbose_only=True)
43
44
45 class FileLock:
46     """
47     Lock/Unlock file
48     """
49     def __init__(self, file_path, expire = 60 * 60 * 2):
50         self.expire = expire
51         self.fpath = file_path
52         self.fd = None
53
54     def lock(self):
55         if os.path.exists(self.fpath):
56             if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
57                 try:
58                     os.unlink(self.fpath)
59                 except Exception, e:
60                     message('FileLock.lock(%s) : %s' % (self.fpath, e))
61                     return False
62         try:
63             self.fd = open(self.fpath, 'w')
64             fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
65         except IOError, e:
66             message('FileLock.lock(%s) : %s' % (self.fpath, e))
67             return False
68         return True
69
70     def unlock(self):
71         try:
72             fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
73             self.fd.close()
74         except IOError, e:
75             message('FileLock.unlock(%s) : %s' % (self.fpath, e))
76
77
78 class RefreshPeer(Method):
79     """
80     Fetches site, node, slice, person and key data from the specified peer
81     and caches it locally; also deletes stale entries.
82     Upon successful completion, returns a dict reporting various timers.
83     Faults otherwise.
84     """
85
86     roles = ['admin']
87
88     accepts = [
89         Auth(),
90         Mixed(Peer.fields['peer_id'],
91               Peer.fields['peername']),
92         ]
93
94     returns = Parameter(int, "1 if successful")
95
96     def call(self, auth, peer_id_or_peername):
97         
98         peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
99         file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
100         if not file_lock.lock():
101             raise Exception, "Another instance of RefreshPeer is running."
102         try:
103             self.real_call(auth, peer_id_or_peername)
104         except Exception, e:
105             file_lock.unlock()
106             raise Exception, e
107         file_lock.unlock()
108
109         
110     def real_call(self, auth, peer_id_or_peername):
111         # Get peer
112         peers = Peers(self.api, [peer_id_or_peername])
113         if not peers:
114             raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
115         peer = peers[0]
116         peer_id = peer['peer_id']
117
118         # Connect to peer API
119         peer.connect()
120
121         timers = {}
122
123         # Get peer data
124         start = time.time()
125         message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
126         message('Issuing GetPeerData')
127         peer_tables = peer.GetPeerData()
128         # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
129         boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
130                             'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
131         for node in peer_tables['Nodes']:
132             for key in ['nodenetwork_ids','dummybox_id']:
133                 if key in node:
134                     del node[key]
135             if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
136         for slice in peer_tables['Slices']:
137             for key in ['slice_attribute_ids']:
138                 if key in slice:
139                     del slice[key]
140         timers['transport'] = time.time() - start - peer_tables['db_time']
141         timers['peer_db'] = peer_tables['db_time']
142         message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
143
144         def sync(objects, peer_objects, classobj):
145             """
146             Synchronizes two dictionaries of objects. objects should
147             be a dictionary of local objects keyed on their foreign
148             identifiers. peer_objects should be a dictionary of
149             foreign objects keyed on their local (i.e., foreign to us)
150             identifiers. Returns a final dictionary of local objects
151             keyed on their foreign identifiers.
152             """
153
154             classname=classobj(self.api).__class__.__name__
155             message_verbose('Entering sync on %s'%classname)
156
157             synced = {}
158
159             # Delete stale objects
160             for peer_object_id, object in objects.iteritems():
161                 if peer_object_id not in peer_objects:
162                     object.delete(commit = commit_mode)
163                     message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
164
165             total = len(peer_objects)
166             count=1
167             # set this to something realistic to trace down a given object(s)
168             trace_type="Node"
169             trace_ids=[]
170             def trace (message):
171                 if classname == trace_type and peer_object_id in trace_ids:
172                     message_verbose('TRACE>>'+message)
173                 
174             # Add/update new/existing objects
175             for peer_object_id, peer_object in peer_objects.iteritems():
176                 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
177                 count += 1
178                 if peer_object_id in synced:
179                     message("Warning: %s Skipping already added %s: %r"%(
180                             peer['peername'], classname, peer_object))
181                     continue
182                 if classname == 'Node':
183                     message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
184                 elif classname == "Slice":
185                     message_verbose ('DBG>> slicename=%s'%peer_object['name'])
186                 if peer_object_id in objects:
187                     # Update existing object
188                     object = objects[peer_object_id]
189
190                     # Replace foreign identifier with existing local
191                     # identifier temporarily for the purposes of
192                     # comparison.
193                     peer_object[object.primary_key] = object[object.primary_key]
194
195                     # Must use __eq__() instead of == since
196                     # peer_object may be a raw dict instead of a Peer
197                     # object.
198                     trace ("in objects : comparing")
199                     if not object.__eq__(peer_object):
200                         # Only update intrinsic fields
201                         trace ("updating")
202                         object.update(object.db_fields(peer_object))
203                         trace ("updated")
204                         sync = True
205                         dbg = "changed"
206                     else:
207                         trace ("intact")
208                         sync = False
209                         dbg = None
210
211                     # Restore foreign identifier
212                     peer_object[object.primary_key] = peer_object_id
213                 else:
214                     trace ("not in objects -- creating")
215                     # Add new object
216                     object = classobj(self.api, peer_object)
217                     trace ("created")
218                     # Replace foreign identifier with new local identifier
219                     del object[object.primary_key]
220                     trace ("forced clean id")
221                     sync = True
222                     dbg = "added"
223
224                 if sync:
225                     message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
226                     try:
227                         object.sync(commit = commit_mode)
228                     except PLCInvalidArgument, err:
229                         # Skip if validation fails
230                         # XXX Log an event instead of printing to logfile
231                         message("Warning: %s Skipping invalid %s %r : %r"%(\
232                                 peer['peername'], classname, peer_object, err))
233                         continue
234
235                 synced[peer_object_id] = object
236
237                 if dbg:
238                     message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
239
240             message_verbose("Exiting sync on %s"%classname)
241
242             return synced
243
244         #
245         # Synchronize foreign sites
246         #
247
248         start = time.time()
249
250         message('Dealing with Sites')
251
252         # Compare only the columns returned by the GetPeerData() call
253         if peer_tables['Sites']:
254             columns = peer_tables['Sites'][0].keys()
255         else:
256             columns = None
257
258         # Keyed on foreign site_id
259         old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
260         sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
261
262         # Synchronize new set (still keyed on foreign site_id)
263         peer_sites = sync(old_peer_sites, sites_at_peer, Site)
264
265         for peer_site_id, site in peer_sites.iteritems():
266             # Bind any newly cached sites to peer
267             if peer_site_id not in old_peer_sites:
268                 peer.add_site(site, peer_site_id, commit = commit_mode)
269                 site['peer_id'] = peer_id
270                 site['peer_site_id'] = peer_site_id
271
272         timers['site'] = time.time() - start
273
274         #
275         # XXX Synchronize foreign key types
276         #
277
278         message('Dealing with Keys')
279
280         key_types = KeyTypes(self.api).dict()
281
282         #
283         # Synchronize foreign keys
284         #
285
286         start = time.time()
287
288         # Compare only the columns returned by the GetPeerData() call
289         if peer_tables['Keys']:
290             columns = peer_tables['Keys'][0].keys()
291         else:
292             columns = None
293
294         # Keyed on foreign key_id
295         old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
296         keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
297
298         # Fix up key_type references
299         for peer_key_id, key in keys_at_peer.items():
300             if key['key_type'] not in key_types:
301                 # XXX Log an event instead of printing to logfile
302                 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
303                 del keys_at_peer[peer_key_id]
304                 continue
305
306         # Synchronize new set (still keyed on foreign key_id)
307         peer_keys = sync(old_peer_keys, keys_at_peer, Key)
308         for peer_key_id, key in peer_keys.iteritems():
309             # Bind any newly cached keys to peer
310             if peer_key_id not in old_peer_keys:
311                 peer.add_key(key, peer_key_id, commit = commit_mode)
312                 key['peer_id'] = peer_id
313                 key['peer_key_id'] = peer_key_id
314
315         timers['keys'] = time.time() - start
316
317         #
318         # Synchronize foreign users
319         #
320
321         start = time.time()
322
323         message('Dealing with Persons')
324
325         # Compare only the columns returned by the GetPeerData() call
326         if peer_tables['Persons']:
327             columns = peer_tables['Persons'][0].keys()
328         else:
329             columns = None
330
331         # Keyed on foreign person_id
332         old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
333
334         # artificially attach the persons returned by GetPeerData to the new peer 
335         # this is because validate_email needs peer_id to be correct when checking for duplicates 
336         for person in peer_tables['Persons']: 
337             person['peer_id']=peer_id
338         persons_at_peer = dict([(peer_person['person_id'], peer_person) \
339                                 for peer_person in peer_tables['Persons']])
340
341         # XXX Do we care about membership in foreign site(s)?
342
343         # Synchronize new set (still keyed on foreign person_id)
344         peer_persons = sync(old_peer_persons, persons_at_peer, Person)
345
346         # transcoder : retrieve a local key_id from a peer_key_id
347         key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
348                                   for peer_key_id,key in peer_keys.iteritems()])
349
350         for peer_person_id, person in peer_persons.iteritems():
351             # Bind any newly cached users to peer
352             if peer_person_id not in old_peer_persons:
353                 peer.add_person(person, peer_person_id, commit = commit_mode)
354                 person['peer_id'] = peer_id
355                 person['peer_person_id'] = peer_person_id
356                 person['key_ids'] = []
357
358             # User as viewed by peer
359             peer_person = persons_at_peer[peer_person_id]
360             
361             # Foreign keys currently belonging to the user
362             old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
363                                   if key_transcoder[key_id] in peer_keys]
364
365             # Foreign keys that should belong to the user
366             # this is basically peer_person['key_ids'], we just check it makes sense 
367             # (e.g. we might have failed importing it)
368             person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
369
370             # Remove stale keys from user
371             for key_id in (set(old_person_key_ids) - set(person_key_ids)):
372                 person.remove_key(peer_keys[key_id], commit = commit_mode)
373                 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
374
375             # Add new keys to user
376             for key_id in (set(person_key_ids) - set(old_person_key_ids)):
377                 person.add_key(peer_keys[key_id], commit = commit_mode)
378                 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
379
380         timers['persons'] = time.time() - start
381
382         #
383         # XXX Synchronize foreign boot states
384         #
385
386         boot_states = BootStates(self.api).dict()
387
388         #
389         # Synchronize foreign nodes
390         #
391
392         start = time.time()
393
394         message('Dealing with Nodes (1)')
395
396         # Compare only the columns returned by the GetPeerData() call
397         if peer_tables['Nodes']:
398             columns = peer_tables['Nodes'][0].keys()
399         else:
400             # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
401             columns = Node.fields
402             if 'interface_ids' in columns: columns.remove('interface_ids')
403             if 'dummybox_id' in columns: columns.remove('dummybox_id')
404
405         # Keyed on foreign node_id
406         old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
407         nodes_at_peer = dict([(node['node_id'], node) \
408                               for node in peer_tables['Nodes']])
409
410         # Fix up site_id and boot_states references
411         for peer_node_id, node in nodes_at_peer.items():
412             errors = []
413             if node['site_id'] not in peer_sites:
414                 errors.append("invalid site %d" % node['site_id'])
415             if node['boot_state'] not in boot_states:
416                 errors.append("invalid boot state %s" % node['boot_state'])
417             if errors:
418                 # XXX Log an event instead of printing to logfile
419                 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
420                              + ", ".join(errors))
421                 del nodes_at_peer[peer_node_id]
422                 continue
423             else:
424                 node['site_id'] = peer_sites[node['site_id']]['site_id']
425
426         # Synchronize new set
427         peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
428
429         for peer_node_id, node in peer_nodes.iteritems():
430             # Bind any newly cached foreign nodes to peer
431             if peer_node_id not in old_peer_nodes:
432                 peer.add_node(node, peer_node_id, commit = commit_mode)
433                 node['peer_id'] = peer_id
434                 node['peer_node_id'] = peer_node_id
435
436         timers['nodes'] = time.time() - start
437
438         #
439         # Synchronize local nodes
440         #
441
442         start = time.time()
443         message('Dealing with Nodes (2)')
444
445         # Keyed on local node_id
446         local_nodes = Nodes(self.api).dict()
447
448         for node in peer_tables['PeerNodes']:
449             # Foreign identifier for our node as maintained by peer
450             peer_node_id = node['node_id']
451             # Local identifier for our node as cached by peer
452             node_id = node['peer_node_id']
453             if node_id in local_nodes:
454                 # Still a valid local node, add it to the synchronized
455                 # set of local node objects keyed on foreign node_id.
456                 peer_nodes[peer_node_id] = local_nodes[node_id]
457
458         timers['local_nodes'] = time.time() - start
459
460         #
461         # XXX Synchronize foreign slice instantiation states
462         #
463
464         slice_instantiations = SliceInstantiations(self.api).dict()
465
466         #
467         # Synchronize foreign slices
468         #
469
470         start = time.time()
471
472         message('Dealing with Slices (1)')
473
474         # Compare only the columns returned by the GetPeerData() call
475         if peer_tables['Slices']:
476             columns = peer_tables['Slices'][0].keys()
477         else:
478             columns = None
479
480         # Keyed on foreign slice_id
481         old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
482         slices_at_peer = dict([(slice['slice_id'], slice) \
483                                for slice in peer_tables['Slices']])
484
485         # Fix up site_id, instantiation, and creator_person_id references
486         for peer_slice_id, slice in slices_at_peer.items():
487             errors = []
488             if slice['site_id'] not in peer_sites:
489                 errors.append("invalid site %d" % slice['site_id'])
490             if slice['instantiation'] not in slice_instantiations:
491                 errors.append("invalid instantiation %s" % slice['instantiation'])
492             if slice['creator_person_id'] not in peer_persons:
493                 # Just NULL it out
494                 slice['creator_person_id'] = None
495             else:
496                 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
497             if errors:
498                 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
499                             + ", ".join(errors))
500                 del slices_at_peer[peer_slice_id]
501                 continue
502             else:
503                 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
504
505         # Synchronize new set
506         peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
507
508         message('Dealing with Slices (2)')
509         # transcoder : retrieve a local node_id from a peer_node_id
510         node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
511                                    for peer_node_id,node in peer_nodes.iteritems()])
512         person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
513                                      for peer_person_id,person in peer_persons.iteritems()])
514
515         for peer_slice_id, slice in peer_slices.iteritems():
516             # Bind any newly cached foreign slices to peer
517             if peer_slice_id not in old_peer_slices:
518                 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
519                 slice['peer_id'] = peer_id
520                 slice['peer_slice_id'] = peer_slice_id
521                 slice['node_ids'] = []
522                 slice['person_ids'] = []
523
524             # Slice as viewed by peer
525             peer_slice = slices_at_peer[peer_slice_id]
526
527             # Nodes that are currently part of the slice
528             old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
529                                    if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
530
531             # Nodes that should be part of the slice
532             slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
533
534             # Remove stale nodes from slice
535             for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
536                 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
537                 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
538
539             # Add new nodes to slice
540             for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
541                 slice.add_node(peer_nodes[node_id], commit = commit_mode)
542                 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
543
544             # N.B.: Local nodes that may have been added to the slice
545             # by hand, are removed. In other words, don't do this.
546
547             # Foreign users that are currently part of the slice
548             #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
549             #                if person_transcoder[person_id] in peer_persons]
550             # An issue occurred with a user who registered on both sites (same email)
551             # So the remote person could not get cached locally
552             # The one-line map/filter style is nicer but ineffective here
553             old_slice_person_ids = []
554             for person_id in slice['person_ids']:
555                 if not person_transcoder.has_key(person_id):
556                     message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
557                 elif person_transcoder[person_id] not in peer_persons:
558                     message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
559                 else:
560                     old_slice_person_ids += [person_transcoder[person_id]]
561
562             # Foreign users that should be part of the slice
563             slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
564
565             # Remove stale users from slice
566             for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
567                 slice.remove_person(peer_persons[person_id], commit = commit_mode)
568                 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
569
570             # Add new users to slice
571             for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
572                 slice.add_person(peer_persons[person_id], commit = commit_mode)
573                 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
574
575             # N.B.: Local users that may have been added to the slice
576             # by hand, are not touched.
577
578         timers['slices'] = time.time() - start
579
580         # Update peer itself and commit
581         peer.sync(commit = True)
582         
583         return timers