2 # Thierry Parmentelat - INRIA
11 from PLC.Debug import log
12 from PLC.Faults import *
13 from PLC.Method import Method
14 from PLC.Parameter import Parameter, Mixed
15 from PLC.Auth import Auth
17 from PLC.Peers import Peer, Peers
18 from PLC.Sites import Site, Sites
19 from PLC.Persons import Person, Persons
20 from PLC.KeyTypes import KeyType, KeyTypes
21 from PLC.Keys import Key, Keys
22 from PLC.BootStates import BootState, BootStates
23 from PLC.Nodes import Node, Nodes
24 from PLC.SliceInstantiations import SliceInstantiations
25 from PLC.Slices import Slice, Slices
29 # initial version was doing only one final commit
30 # * set commit_mode to False to get that behaviour
31 # * set commit_mode to True to get everything synced at once
# Timestamped logger for the RefreshPeer run; writes to the PLC log file.
# NOTE(review): this listing is a sampled excerpt -- the early-return line
# for the verbose_only case and the line printing `to_print` itself
# (original lines 36/38+) are not visible here.
34 def message (to_print=None,verbose_only=False):
# Suppress verbose-only messages unless the module-level `verbose` flag
# (defined outside this excerpt) is set.
35 if verbose_only and not verbose:
# Python-2 chevron print into `log`; the trailing comma suppresses the
# newline so the payload follows the "%m-%d-%H-%M-%S:" timestamp prefix.
37 print >> log, time.strftime("%m-%d-%H-%M-%S:"),
def message_verbose(to_print=None):
    """Log *to_print* only when verbose mode is enabled (see message())."""
    message(to_print, verbose_only=True)
# FileLock constructor: remember the lock-file path and the staleness
# threshold (`expire`, in seconds; default two hours) after which an
# existing lock file is treated as abandoned.
# NOTE(review): sampled excerpt -- the assignments of self.expire (and
# presumably self.fd) on the elided original lines are not visible here.
49 def __init__(self, file_path, expire = 60 * 60 * 2):
51 self.fpath = file_path
# FileLock.lock (def line elided in this excerpt): take a non-blocking
# exclusive flock on the lock file, discarding a stale lock file first.
55 if os.path.exists(self.fpath):
# A lock file older than self.expire seconds is assumed left over from a
# dead run.  NOTE(review): st_ctime is inode-change time, not creation
# time -- an approximation of the file's age; confirm acceptable.
56 if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
# Stale-file removal failed -- log and carry on (the surrounding
# try/except lines are elided in this excerpt).
60 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# (Re)create the lock file and grab an exclusive, non-blocking lock so a
# concurrent RefreshPeer fails fast instead of blocking.
63 self.fd = open(self.fpath, 'w')
64 fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
# flock failed (most likely the file is already locked) -- log the error.
66 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# FileLock.unlock (def line elided in this excerpt): release the flock;
# errors are logged rather than raised.
72 fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
75 message('FileLock.unlock(%s) : %s' % (self.fpath, e))
# PLCAPI method: pull all federation data from one remote peer and mirror
# it in the local database.
78 class RefreshPeer(Method):
# Method docstring (opening/closing triple quotes elided in this excerpt).
80 Fetches site, node, slice, person and key data from the specified peer
81 and caches it locally; also deletes stale entries.
82 Upon successful completion, returns a dict reporting various timers.
# `accepts` fragment: the peer may be designated by numeric id or by name.
90 Mixed(Peer.fields['peer_id'],
91 Peer.fields['peername']),
# NOTE(review): the declared return type (int, "1 if successful")
# disagrees with the docstring above, which promises a dict of timers --
# worth reconciling against what real_call() actually returns.
94 returns = Parameter(int, "1 if successful")
# Entry point: serialize RefreshPeer runs per peer via a lock file, then
# delegate the actual work to real_call().
96 def call(self, auth, peer_id_or_peername):
# Resolve the peer's name first so the lock file is per-peer.
# NOTE(review): [0] raises IndexError (not PLCInvalidArgument) when the
# peer does not exist -- real_call() performs the proper existence check.
98 peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
# Predictable path under /tmp -- acceptable only because the API server
# runs as a single trusted user; noted as a symlink-attack surface.
99 file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
100 if not file_lock.lock():
101 raise Exception, "Another instance of RefreshPeer is running."
# (The unlock and the final return are elided in this excerpt.)
103 ret_val = self.real_call(auth, peer_id_or_peername)
# Core implementation: fetch every federation table from the remote peer
# with one GetPeerData() RPC, then synchronize each table into the local
# database, timing each phase.  (Heavily elided in this excerpt; the
# function continues past the last visible line.)
111 def real_call(self, auth, peer_id_or_peername):
# Look the peer up locally; an empty result means the argument is bogus.
113 peers = Peers(self.api, [peer_id_or_peername])
115 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
117 peer_id = peer['peer_id']
119 # Connect to peer API
126 message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
127 message('Issuing GetPeerData')
# Single bulk RPC: dicts of Sites/Keys/Persons/Nodes/Slices plus the time
# the peer spent in its own database ('db_time').
128 peer_tables = peer.GetPeerData()
129 # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
# Map legacy (pre-4.2) boot-state codes onto the current vocabulary.
130 boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
131 'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
132 for node in peer_tables['Nodes']:
# Strip fields the local schema does not carry (the deletion statement is
# elided in this excerpt).
133 for key in ['nodenetwork_ids','dummybox_id']:
136 if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
137 for slice in peer_tables['Slices']:
138 for key in ['slice_attribute_ids']:
# Split elapsed time into peer-side DB time and network transport time.
141 timers['transport'] = time.time() - start - peer_tables['db_time']
142 timers['peer_db'] = peer_tables['db_time']
143 message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
# Nested helper shared by all phases below: reconcile one table of cached
# foreign objects against the peer's authoritative copy (delete stale,
# update changed, create new).  Closes over commit_mode and peer.
145 def sync(objects, peer_objects, classobj):
147 Synchronizes two dictionaries of objects. objects should
148 be a dictionary of local objects keyed on their foreign
149 identifiers. peer_objects should be a dictionary of
150 foreign objects keyed on their local (i.e., foreign to us)
151 identifiers. Returns a final dictionary of local objects
152 keyed on their foreign identifiers.
# Instantiate once just to recover the human-readable class name.
155 classname=classobj(self.api).__class__.__name__
156 message_verbose('Entering sync on %s'%classname)
160 # Delete stale objects
161 for peer_object_id, object in objects.iteritems():
162 if peer_object_id not in peer_objects:
163 object.delete(commit = commit_mode)
164 message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
166 total = len(peer_objects)
168 # set this to something realistic to trace down a given object(s)
# NOTE(review): these two lines are the body of a nested `trace(message)`
# helper whose def line is elided -- here `message` is trace's string
# parameter, shadowing the module-level message() function.
172 if classname == trace_type and peer_object_id in trace_ids:
173 message_verbose('TRACE>>'+message)
175 # Add/update new/existing objects
176 for peer_object_id, peer_object in peer_objects.iteritems():
177 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
# Guard against duplicate ids in the peer's table.
179 if peer_object_id in synced:
180 message("Warning: %s Skipping already added %s: %r"%(
181 peer['peername'], classname, peer_object))
183 if classname == 'Node':
184 message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
185 elif classname == "Slice":
186 message_verbose ('DBG>> slicename=%s'%peer_object['name'])
187 if peer_object_id in objects:
188 # Update existing object
189 object = objects[peer_object_id]
191 # Replace foreign identifier with existing local
192 # identifier temporarily for the purposes of
194 peer_object[object.primary_key] = object[object.primary_key]
196 # Must use __eq__() instead of == since
197 # peer_object may be a raw dict instead of a Peer
199 trace ("in objects : comparing")
200 if not object.__eq__(peer_object):
201 # Only update intrinsic fields
203 object.update(object.db_fields(peer_object))
212 # Restore foreign identifier
213 peer_object[object.primary_key] = peer_object_id
215 trace ("not in objects -- creating")
# Brand-new object: build it from the peer's dict, then clear the foreign
# primary key so the local DB assigns a fresh one on sync().
217 object = classobj(self.api, peer_object)
219 # Replace foreign identifier with new local identifier
220 del object[object.primary_key]
221 trace ("forced clean id")
226 message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
# Persist; validation failures skip the object instead of aborting the
# whole refresh.
228 object.sync(commit = commit_mode)
229 except PLCInvalidArgument, err:
230 # Skip if validation fails
231 # XXX Log an event instead of printing to logfile
232 message("Warning: %s Skipping invalid %s %r : %r"%(\
233 peer['peername'], classname, peer_object, err))
236 synced[peer_object_id] = object
239 message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
241 message_verbose("Exiting sync on %s"%classname)
# --- Phase 1: sites -------------------------------------------------
246 # Synchronize foreign sites
251 message('Dealing with Sites')
253 # Compare only the columns returned by the GetPeerData() call
254 if peer_tables['Sites']:
255 columns = peer_tables['Sites'][0].keys()
259 # Keyed on foreign site_id
260 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
261 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
263 # Synchronize new set (still keyed on foreign site_id)
264 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
# Newly cached sites must also be registered in the peer binding table.
266 for peer_site_id, site in peer_sites.iteritems():
267 # Bind any newly cached sites to peer
268 if peer_site_id not in old_peer_sites:
269 peer.add_site(site, peer_site_id, commit = commit_mode)
270 site['peer_id'] = peer_id
271 site['peer_site_id'] = peer_site_id
273 timers['site'] = time.time() - start
# --- Phase 2: key types and keys ------------------------------------
276 # XXX Synchronize foreign key types
# Key types are only read here, to validate incoming keys; they are not
# synchronized from the peer.
279 message('Dealing with Keys')
281 key_types = KeyTypes(self.api).dict()
284 # Synchronize foreign keys
289 # Compare only the columns returned by the GetPeerData() call
290 if peer_tables['Keys']:
291 columns = peer_tables['Keys'][0].keys()
295 # Keyed on foreign key_id
296 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
297 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
299 # Fix up key_type references
# .items() (a copy in Python 2) so entries can be deleted while iterating.
300 for peer_key_id, key in keys_at_peer.items():
301 if key['key_type'] not in key_types:
302 # XXX Log an event instead of printing to logfile
303 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
304 del keys_at_peer[peer_key_id]
307 # Synchronize new set (still keyed on foreign key_id)
308 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
309 for peer_key_id, key in peer_keys.iteritems():
310 # Bind any newly cached keys to peer
311 if peer_key_id not in old_peer_keys:
312 peer.add_key(key, peer_key_id, commit = commit_mode)
313 key['peer_id'] = peer_id
314 key['peer_key_id'] = peer_key_id
316 timers['keys'] = time.time() - start
# --- Phase 3: persons (and their key bindings) ----------------------
319 # Synchronize foreign users
324 message('Dealing with Persons')
326 # Compare only the columns returned by the GetPeerData() call
327 if peer_tables['Persons']:
328 columns = peer_tables['Persons'][0].keys()
332 # Keyed on foreign person_id
333 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
335 # artificially attach the persons returned by GetPeerData to the new peer
336 # this is because validate_email needs peer_id to be correct when checking for duplicates
337 for person in peer_tables['Persons']:
338 person['peer_id']=peer_id
339 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
340 for peer_person in peer_tables['Persons']])
342 # XXX Do we care about membership in foreign site(s)?
344 # Synchronize new set (still keyed on foreign person_id)
345 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
347 # transcoder : retrieve a local key_id from a peer_key_id
348 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
349 for peer_key_id,key in peer_keys.iteritems()])
351 for peer_person_id, person in peer_persons.iteritems():
352 # Bind any newly cached users to peer
353 if peer_person_id not in old_peer_persons:
354 peer.add_person(person, peer_person_id, commit = commit_mode)
355 person['peer_id'] = peer_id
356 person['peer_person_id'] = peer_person_id
357 person['key_ids'] = []
359 # User as viewed by peer
360 peer_person = persons_at_peer[peer_person_id]
362 # Foreign keys currently belonging to the user
# NOTE(review): key_transcoder[key_id] is unguarded here -- a key_id
# missing from the transcoder raises KeyError, unlike the guarded
# node/person handling in the slices phase below.  Worth confirming.
363 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
364 if key_transcoder[key_id] in peer_keys]
366 # Foreign keys that should belong to the user
367 # this is basically peer_person['key_ids'], we just check it makes sense
368 # (e.g. we might have failed importing it)
369 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
371 # Remove stale keys from user
372 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
373 person.remove_key(peer_keys[key_id], commit = commit_mode)
374 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
376 # Add new keys to user
377 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
378 person.add_key(peer_keys[key_id], commit = commit_mode)
379 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
381 timers['persons'] = time.time() - start
# --- Phase 4: nodes (foreign nodes, then our own as seen by peer) ---
384 # XXX Synchronize foreign boot states
# Boot states are read-only here, used to validate incoming nodes.
387 boot_states = BootStates(self.api).dict()
390 # Synchronize foreign nodes
395 message('Dealing with Nodes (1)')
397 # Compare only the columns returned by the GetPeerData() call
398 if peer_tables['Nodes']:
399 columns = peer_tables['Nodes'][0].keys()
401 # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
# NOTE(review): this overwrites the columns computed just above, and
# `columns = Node.fields` aliases the class attribute, so the two
# remove() calls below mutate Node.fields in place -- should probably
# operate on a copy.  Left as-is pending confirmation.
402 columns = Node.fields
403 if 'interface_ids' in columns: columns.remove('interface_ids')
404 if 'dummybox_id' in columns: columns.remove('dummybox_id')
406 # Keyed on foreign node_id
407 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
408 nodes_at_peer = dict([(node['node_id'], node) \
409 for node in peer_tables['Nodes']])
411 # Fix up site_id and boot_states references
# .items() copy so invalid entries can be deleted during iteration.
412 for peer_node_id, node in nodes_at_peer.items():
414 if node['site_id'] not in peer_sites:
415 errors.append("invalid site %d" % node['site_id'])
416 if node['boot_state'] not in boot_states:
417 errors.append("invalid boot state %s" % node['boot_state'])
419 # XXX Log an event instead of printing to logfile
420 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
422 del nodes_at_peer[peer_node_id]
# Transcode the peer's site_id into our local cached site_id.
425 node['site_id'] = peer_sites[node['site_id']]['site_id']
427 # Synchronize new set
428 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
430 for peer_node_id, node in peer_nodes.iteritems():
431 # Bind any newly cached foreign nodes to peer
432 if peer_node_id not in old_peer_nodes:
433 peer.add_node(node, peer_node_id, commit = commit_mode)
434 node['peer_id'] = peer_id
435 node['peer_node_id'] = peer_node_id
437 timers['nodes'] = time.time() - start
440 # Synchronize local nodes
# 'PeerNodes' lists OUR nodes as cached by the peer; fold the still-valid
# ones into peer_nodes so slice synchronization below can reference them.
444 message('Dealing with Nodes (2)')
446 # Keyed on local node_id
447 local_nodes = Nodes(self.api).dict()
449 for node in peer_tables['PeerNodes']:
450 # Foreign identifier for our node as maintained by peer
451 peer_node_id = node['node_id']
452 # Local identifier for our node as cached by peer
453 node_id = node['peer_node_id']
454 if node_id in local_nodes:
455 # Still a valid local node, add it to the synchronized
456 # set of local node objects keyed on foreign node_id.
457 peer_nodes[peer_node_id] = local_nodes[node_id]
459 timers['local_nodes'] = time.time() - start
# --- Phase 5: slices (plus node/person membership), then finalize ---
462 # XXX Synchronize foreign slice instantiation states
465 slice_instantiations = SliceInstantiations(self.api).dict()
468 # Synchronize foreign slices
473 message('Dealing with Slices (1)')
475 # Compare only the columns returned by the GetPeerData() call
476 if peer_tables['Slices']:
477 columns = peer_tables['Slices'][0].keys()
481 # Keyed on foreign slice_id
482 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
483 slices_at_peer = dict([(slice['slice_id'], slice) \
484 for slice in peer_tables['Slices']])
486 # Fix up site_id, instantiation, and creator_person_id references
# .items() copy so invalid entries can be deleted during iteration.
487 for peer_slice_id, slice in slices_at_peer.items():
489 if slice['site_id'] not in peer_sites:
490 errors.append("invalid site %d" % slice['site_id'])
491 if slice['instantiation'] not in slice_instantiations:
492 errors.append("invalid instantiation %s" % slice['instantiation'])
# An unknown creator is tolerated (set to None), not treated as an error.
493 if slice['creator_person_id'] not in peer_persons:
495 slice['creator_person_id'] = None
497 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
499 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
501 del slices_at_peer[peer_slice_id]
504 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
506 # Synchronize new set
507 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
509 message('Dealing with Slices (2)')
510 # transcoder : retrieve a local node_id from a peer_node_id
511 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
512 for peer_node_id,node in peer_nodes.iteritems()])
513 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
514 for peer_person_id,person in peer_persons.iteritems()])
516 for peer_slice_id, slice in peer_slices.iteritems():
517 # Bind any newly cached foreign slices to peer
518 if peer_slice_id not in old_peer_slices:
519 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
520 slice['peer_id'] = peer_id
521 slice['peer_slice_id'] = peer_slice_id
522 slice['node_ids'] = []
523 slice['person_ids'] = []
525 # Slice as viewed by peer
526 peer_slice = slices_at_peer[peer_slice_id]
528 # Nodes that are currently part of the slice
529 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
530 if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
532 # Nodes that should be part of the slice
533 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
535 # Remove stale nodes from slice
536 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
537 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
538 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
540 # Add new nodes to slice
541 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
542 slice.add_node(peer_nodes[node_id], commit = commit_mode)
543 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
545 # N.B.: Local nodes that may have been added to the slice
546 # by hand, are removed. In other words, don't do this.
548 # Foreign users that are currently part of the slice
549 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
550 # if person_transcoder[person_id] in peer_persons]
551 # An issue occurred with a user who registered on both sites (same email)
552 # So the remote person could not get cached locally
553 # The one-line map/filter style is nicer but ineffective here
# Explicit loop so both failure modes (no transcoding; transcoded but not
# cached) are logged instead of raising.
554 old_slice_person_ids = []
555 for person_id in slice['person_ids']:
556 if not person_transcoder.has_key(person_id):
557 message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
558 elif person_transcoder[person_id] not in peer_persons:
559 message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
561 old_slice_person_ids += [person_transcoder[person_id]]
563 # Foreign users that should be part of the slice
564 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
566 # Remove stale users from slice
567 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
568 slice.remove_person(peer_persons[person_id], commit = commit_mode)
569 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
571 # Add new users to slice
572 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
573 slice.add_person(peer_persons[person_id], commit = commit_mode)
574 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
576 # N.B.: Local users that may have been added to the slice
577 # by hand, are not touched.
579 timers['slices'] = time.time() - start
581 # Update peer itself and commit
# Final commit is unconditional (commit=True) regardless of commit_mode.
# (The function continues past this excerpt -- presumably returning the
# timers dict; not visible here.)
582 peer.sync(commit = True)