2 # Thierry Parmentelat - INRIA
12 from PLC.Debug import log
13 from PLC.Faults import *
14 from PLC.Method import Method
15 from PLC.Parameter import Parameter, Mixed
16 from PLC.Auth import Auth
18 from PLC.Peers import Peer, Peers
19 from PLC.Sites import Site, Sites
20 from PLC.Persons import Person, Persons
21 from PLC.KeyTypes import KeyType, KeyTypes
22 from PLC.Keys import Key, Keys
23 from PLC.BootStates import BootState, BootStates
24 from PLC.Nodes import Node, Nodes
25 from PLC.SliceInstantiations import SliceInstantiations
26 from PLC.Slices import Slice, Slices
30 # initial version was doing only one final commit
31 # * set commit_mode to False to get that behaviour
32 # * set commit_mode to True to get everything synced at once
35 def message (to_print=None,verbose_only=False):
# Timestamped logging helper writing to the imported 'log' stream.
# Relies on a module-level 'verbose' flag: when verbose_only is set and
# verbose mode is off, the message is skipped (the early-return line is
# not visible in this excerpt -- TODO confirm).
36 if verbose_only and not verbose:
38 print >> log, time.strftime("%m-%d-%H-%M-%S:"),
def message_verbose(to_print=None):
    """Log *to_print* only when the module-level verbose flag is on."""
    message(to_print, verbose_only=True)
50 def __init__(self, file_path, expire = 60 * 60 * 2):
# Lock-file wrapper around fcntl.flock. 'expire' (default 2 hours) is
# presumably the staleness threshold that lock() compares against the
# lock file's ctime -- TODO confirm: the 'self.expire = expire'
# assignment is not visible in this excerpt.
52 self.fpath = file_path
# Body of FileLock.lock (def line and try/except scaffolding are not
# visible in this excerpt): a pre-existing lock file older than
# self.expire is treated as stale; errors are logged via message().
56 if os.path.exists(self.fpath):
57 if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
61 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# Acquire an exclusive, non-blocking flock on the freshly opened file;
# LOCK_NB makes the call fail immediately if another process holds it.
64 self.fd = open(self.fpath, 'w')
65 fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
67 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# Body of FileLock.unlock (def line not visible in this excerpt):
# release the flock; failures are logged rather than raised.
73 fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
76 message('FileLock.unlock(%s) : %s' % (self.fpath, e))
79 class RefreshPeer(Method):
81 Fetches site, node, slice, person and key data from the specified peer
82 and caches it locally; also deletes stale entries.
83 Upon successful completion, returns a dict reporting various timers.
# The peer may be designated either by numeric id or by name.
91 Mixed(Peer.fields['peer_id'],
92 Peer.fields['peername']),
95 returns = Parameter(int, "1 if successful")
97 def call(self, auth, peer_id_or_peername):
# Entry point: serializes concurrent RefreshPeer runs with a /tmp lock
# file named after the peer, then delegates the work to real_call().
# (The unlock/return tail of this method is not visible in this excerpt.)
99 peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
100 file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
101 if not file_lock.lock():
102 raise Exception, "Another instance of RefreshPeer is running."
104 ret_val = self.real_call(auth, peer_id_or_peername)
112 def real_call(self, auth, peer_id_or_peername):
# Core of RefreshPeer: pulls all of the peer's tables in a single
# GetPeerData RPC, then synchronizes sites, keys, persons, nodes and
# slices into the local database, timing each phase in a 'timers' dict
# (per the class docstring this dict is presumably returned -- the
# return statement is not visible in this excerpt).
114 peers = Peers(self.api, [peer_id_or_peername])
116 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
118 peer_id = peer['peer_id']
120 # Connect to peer API
127 message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
128 message('Issuing GetPeerData')
# One bulk RPC: all remote tables come back in a single dict.
129 peer_tables = peer.GetPeerData()
130 # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
131 boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
132 'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
133 for node in peer_tables['Nodes']:
134 for key in ['nodenetwork_ids','dummybox_id']:
137 if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
138 for slice in peer_tables['Slices']:
139 for key in ['slice_attribute_ids']:
# transport time = total elapsed minus the time the peer spent in its own db
142 timers['transport'] = time.time() - start - peer_tables['db_time']
143 timers['peer_db'] = peer_tables['db_time']
144 message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
# Generic merge helper used for every table below: deletes local cached
# objects missing at the peer, creates/updates the rest, and returns the
# resulting dict of local objects keyed on foreign ids.
146 def sync(objects, peer_objects, classobj):
148 Synchronizes two dictionaries of objects. objects should
149 be a dictionary of local objects keyed on their foreign
150 identifiers. peer_objects should be a dictionary of
151 foreign objects keyed on their local (i.e., foreign to us)
152 identifiers. Returns a final dictionary of local objects
153 keyed on their foreign identifiers.
156 classname=classobj(self.api).__class__.__name__
157 message_verbose('Entering sync on %s'%classname)
161 # Delete stale objects
162 for peer_object_id, object in objects.iteritems():
163 if peer_object_id not in peer_objects:
164 object.delete(commit = commit_mode)
165 message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
167 total = len(peer_objects)
169 # set this to something realistic to trace down a given object(s)
173 if classname == trace_type and peer_object_id in trace_ids:
174 message_verbose('TRACE>>'+message)
176 # Add/update new/existing objects
177 for peer_object_id, peer_object in peer_objects.iteritems():
178 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
180 if peer_object_id in synced:
181 message("Warning: %s Skipping already added %s: %r"%(
182 peer['peername'], classname, peer_object))
184 if classname == 'Node':
185 message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
186 elif classname == "Slice":
187 message_verbose ('DBG>> slicename=%s'%peer_object['name'])
188 if peer_object_id in objects:
189 # Update existing object
190 object = objects[peer_object_id]
192 # Replace foreign identifier with existing local
193 # identifier temporarily for the purposes of
195 peer_object[object.primary_key] = object[object.primary_key]
197 # Must use __eq__() instead of == since
198 # peer_object may be a raw dict instead of a Peer
200 trace ("in objects : comparing")
201 if not object.__eq__(peer_object):
202 # Only update intrinsic fields
204 object.update(object.db_fields(peer_object))
213 # Restore foreign identifier
214 peer_object[object.primary_key] = peer_object_id
216 trace ("not in objects -- creating")
218 object = classobj(self.api, peer_object)
220 # Replace foreign identifier with new local identifier
221 del object[object.primary_key]
222 trace ("forced clean id")
227 message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
229 object.sync(commit = commit_mode)
230 except PLCInvalidArgument, err:
231 # Skip if validation fails
232 # XXX Log an event instead of printing to logfile
233 message("Warning: %s Skipping invalid %s %r : %r"%(\
234 peer['peername'], classname, peer_object, err))
237 synced[peer_object_id] = object
240 message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
242 message_verbose("Exiting sync on %s"%classname)
247 # Synchronize foreign sites
252 message('Dealing with Sites')
254 # Compare only the columns returned by the GetPeerData() call
255 if peer_tables['Sites']:
256 columns = peer_tables['Sites'][0].keys()
260 # Keyed on foreign site_id
261 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
262 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
264 # Synchronize new set (still keyed on foreign site_id)
265 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
267 for peer_site_id, site in peer_sites.iteritems():
268 # Bind any newly cached sites to peer
269 if peer_site_id not in old_peer_sites:
270 peer.add_site(site, peer_site_id, commit = commit_mode)
271 site['peer_id'] = peer_id
272 site['peer_site_id'] = peer_site_id
274 timers['site'] = time.time() - start
277 # XXX Synchronize foreign key types
280 message('Dealing with Keys')
282 key_types = KeyTypes(self.api).dict()
285 # Synchronize foreign keys
290 # Compare only the columns returned by the GetPeerData() call
291 if peer_tables['Keys']:
292 columns = peer_tables['Keys'][0].keys()
296 # Keyed on foreign key_id
297 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
298 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
300 # Fix up key_type references
# Note: .items() (a copy) is required here because entries are deleted
# from keys_at_peer during iteration.
301 for peer_key_id, key in keys_at_peer.items():
302 if key['key_type'] not in key_types:
303 # XXX Log an event instead of printing to logfile
304 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
305 del keys_at_peer[peer_key_id]
308 # Synchronize new set (still keyed on foreign key_id)
309 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
310 for peer_key_id, key in peer_keys.iteritems():
311 # Bind any newly cached keys to peer
312 if peer_key_id not in old_peer_keys:
313 peer.add_key(key, peer_key_id, commit = commit_mode)
314 key['peer_id'] = peer_id
315 key['peer_key_id'] = peer_key_id
317 timers['keys'] = time.time() - start
320 # Synchronize foreign users
325 message('Dealing with Persons')
327 # Compare only the columns returned by the GetPeerData() call
328 if peer_tables['Persons']:
329 columns = peer_tables['Persons'][0].keys()
333 # Keyed on foreign person_id
334 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
336 # artificially attach the persons returned by GetPeerData to the new peer
337 # this is because validate_email needs peer_id to be correct when checking for duplicates
338 for person in peer_tables['Persons']:
339 person['peer_id']=peer_id
340 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
341 for peer_person in peer_tables['Persons']])
343 # XXX Do we care about membership in foreign site(s)?
345 # Synchronize new set (still keyed on foreign person_id)
346 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
348 # transcoder : retrieve a local key_id from a peer_key_id
349 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
350 for peer_key_id,key in peer_keys.iteritems()])
352 for peer_person_id, person in peer_persons.iteritems():
353 # Bind any newly cached users to peer
354 if peer_person_id not in old_peer_persons:
355 peer.add_person(person, peer_person_id, commit = commit_mode)
356 person['peer_id'] = peer_id
357 person['peer_person_id'] = peer_person_id
358 person['key_ids'] = []
360 # User as viewed by peer
361 peer_person = persons_at_peer[peer_person_id]
363 # Foreign keys currently belonging to the user
364 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
365 if key_transcoder[key_id] in peer_keys]
367 # Foreign keys that should belong to the user
368 # this is basically peer_person['key_ids'], we just check it makes sense
369 # (e.g. we might have failed importing it)
370 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
372 # Remove stale keys from user
373 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
374 person.remove_key(peer_keys[key_id], commit = commit_mode)
375 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
377 # Add new keys to user
378 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
379 person.add_key(peer_keys[key_id], commit = commit_mode)
380 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
382 timers['persons'] = time.time() - start
385 # XXX Synchronize foreign boot states
388 boot_states = BootStates(self.api).dict()
391 # Synchronize foreign nodes
396 message('Dealing with Nodes (1)')
398 # Compare only the columns returned by the GetPeerData() call
399 if peer_tables['Nodes']:
400 columns = peer_tables['Nodes'][0].keys()
402 # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
# Note: this overwrites the columns computed above with the full local
# Node.fields list minus 4.2-incompatible fields.
403 columns = Node.fields
404 if 'interface_ids' in columns: columns.remove('interface_ids')
405 if 'dummybox_id' in columns: columns.remove('dummybox_id')
407 # Keyed on foreign node_id
408 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
409 nodes_at_peer = dict([(node['node_id'], node) \
410 for node in peer_tables['Nodes']])
412 # Fix up site_id and boot_states references
413 for peer_node_id, node in nodes_at_peer.items():
415 if node['site_id'] not in peer_sites:
416 errors.append("invalid site %d" % node['site_id'])
417 if node['boot_state'] not in boot_states:
418 errors.append("invalid boot state %s" % node['boot_state'])
420 # XXX Log an event instead of printing to logfile
421 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
423 del nodes_at_peer[peer_node_id]
# Rewrite the foreign site_id into the corresponding local site_id.
426 node['site_id'] = peer_sites[node['site_id']]['site_id']
428 # Synchronize new set
429 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
431 for peer_node_id, node in peer_nodes.iteritems():
432 # Bind any newly cached foreign nodes to peer
433 if peer_node_id not in old_peer_nodes:
434 peer.add_node(node, peer_node_id, commit = commit_mode)
435 node['peer_id'] = peer_id
436 node['peer_node_id'] = peer_node_id
438 timers['nodes'] = time.time() - start
441 # Synchronize local nodes
445 message('Dealing with Nodes (2)')
447 # Keyed on local node_id
448 local_nodes = Nodes(self.api).dict()
# PeerNodes lists OUR nodes as the peer sees them; fold the still-valid
# ones into peer_nodes so slice membership below can reference them.
450 for node in peer_tables['PeerNodes']:
451 # Foreign identifier for our node as maintained by peer
452 peer_node_id = node['node_id']
453 # Local identifier for our node as cached by peer
454 node_id = node['peer_node_id']
455 if node_id in local_nodes:
456 # Still a valid local node, add it to the synchronized
457 # set of local node objects keyed on foreign node_id.
458 peer_nodes[peer_node_id] = local_nodes[node_id]
460 timers['local_nodes'] = time.time() - start
463 # XXX Synchronize foreign slice instantiation states
466 slice_instantiations = SliceInstantiations(self.api).dict()
469 # Synchronize foreign slices
474 message('Dealing with Slices (1)')
476 # Compare only the columns returned by the GetPeerData() call
477 if peer_tables['Slices']:
478 columns = peer_tables['Slices'][0].keys()
482 # Keyed on foreign slice_id
483 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
484 slices_at_peer = dict([(slice['slice_id'], slice) \
485 for slice in peer_tables['Slices']])
487 # Fix up site_id, instantiation, and creator_person_id references
488 for peer_slice_id, slice in slices_at_peer.items():
490 if slice['site_id'] not in peer_sites:
491 errors.append("invalid site %d" % slice['site_id'])
492 if slice['instantiation'] not in slice_instantiations:
493 errors.append("invalid instantiation %s" % slice['instantiation'])
494 if slice['creator_person_id'] not in peer_persons:
496 slice['creator_person_id'] = None
498 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
500 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
502 del slices_at_peer[peer_slice_id]
505 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
507 # Synchronize new set
508 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
510 message('Dealing with Slices (2)')
511 # transcoder : retrieve a local node_id from a peer_node_id
512 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
513 for peer_node_id,node in peer_nodes.iteritems()])
514 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
515 for peer_person_id,person in peer_persons.iteritems()])
517 for peer_slice_id, slice in peer_slices.iteritems():
518 # Bind any newly cached foreign slices to peer
519 if peer_slice_id not in old_peer_slices:
520 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
521 slice['peer_id'] = peer_id
522 slice['peer_slice_id'] = peer_slice_id
523 slice['node_ids'] = []
524 slice['person_ids'] = []
526 # Slice as viewed by peer
527 peer_slice = slices_at_peer[peer_slice_id]
529 # Nodes that are currently part of the slice
530 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
531 if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
533 # Nodes that should be part of the slice
534 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
536 # Remove stale nodes from slice
537 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
538 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
539 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
541 # Add new nodes to slice
542 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
543 slice.add_node(peer_nodes[node_id], commit = commit_mode)
544 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
546 # N.B.: Local nodes that may have been added to the slice
547 # by hand, are removed. In other words, don't do this.
549 # Foreign users that are currently part of the slice
550 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
551 # if person_transcoder[person_id] in peer_persons]
552 # An issue occurred with a user who registered on both sites (same email)
553 # So the remote person could not get cached locally
554 # The one-line map/filter style is nicer but ineffective here
555 old_slice_person_ids = []
556 for person_id in slice['person_ids']:
557 if not person_transcoder.has_key(person_id):
558 message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
559 elif person_transcoder[person_id] not in peer_persons:
560 message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
562 old_slice_person_ids += [person_transcoder[person_id]]
564 # Foreign users that should be part of the slice
565 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
567 # Remove stale users from slice
568 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
569 slice.remove_person(peer_persons[person_id], commit = commit_mode)
570 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
572 # Add new users to slice
573 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
574 slice.add_person(peer_persons[person_id], commit = commit_mode)
575 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
577 # N.B.: Local users that may have been added to the slice
578 # by hand, are not touched.
580 timers['slices'] = time.time() - start
582 # Update peer itself and commit
# Final sync is always committed regardless of commit_mode.
583 peer.sync(commit = True)