2 # Thierry Parmentelat - INRIA
11 from PLC.Debug import log
12 from PLC.Faults import *
13 from PLC.Method import Method
14 from PLC.Parameter import Parameter, Mixed
15 from PLC.Auth import Auth
17 from PLC.Peers import Peer, Peers
18 from PLC.Sites import Site, Sites
19 from PLC.Persons import Person, Persons
20 from PLC.KeyTypes import KeyType, KeyTypes
21 from PLC.Keys import Key, Keys
22 from PLC.BootStates import BootState, BootStates
23 from PLC.Nodes import Node, Nodes
24 from PLC.SliceInstantiations import SliceInstantiations
25 from PLC.Slices import Slice, Slices
29 # initial version was doing only one final commit
30 # * set commit_mode to False to get that behaviour
31 # * set commit_mode to True to get everything synced at once
# Log helper: emit a timestamped line to the PLC log stream.
# NOTE(review): interior lines are missing from this excerpt -- presumably the
# function returns early when verbose_only is set but the module-level
# 'verbose' flag is off, then prints 'to_print'; confirm against full source.
34 def message (to_print=None,verbose_only=False):
35     if verbose_only and not verbose:
37         print >> log, time.strftime("%m-%d-%H-%M-%S:"),
# Convenience wrapper around message(): logs only when verbose mode is on.
41 def message_verbose(to_print=None):
42     message(to_print,verbose_only=True)
# FileLock: advisory inter-process lock built on a lock file plus
# fcntl.flock(LOCK_EX | LOCK_NB), used to prevent two concurrent
# RefreshPeer runs against the same peer.
# NOTE(review): the 'class FileLock' header and the 'lock()' / 'unlock()'
# def lines are missing from this excerpt; the lines below are fragments
# of __init__, lock and unlock -- confirm structure against full source.
49 def __init__(self, file_path, expire = 60 * 60 * 2):
# fpath: path of the lock file; expire (seconds, default 2h) presumably
# bounds how long a stale lock file is honored -- TODO confirm.
51 self.fpath = file_path
# If a lock file already exists and is older than 'expire', it is treated
# as stale (the missing lines presumably remove it).
55 if os.path.exists(self.fpath):
56 if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
60 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# lock() fragment: open the lock file and take an exclusive, non-blocking
# flock; the exception handler below logs the failure.
63 self.fd = open(self.fpath, 'w')
64 fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
66 message('FileLock.lock(%s) : %s' % (self.fpath, e))
# unlock() fragment: release the flock (LOCK_UN); failures are logged only.
72 fcntl.flock(self.fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
75 message('FileLock.unlock(%s) : %s' % (self.fpath, e))
# API method: pull all federated data from one peer and mirror it locally.
78 class RefreshPeer(Method):
80     Fetches site, node, slice, person and key data from the specified peer
81     and caches it locally; also deletes stale entries.
82     Upon successful completion, returns a dict reporting various timers.
# accepts: auth struct plus either a numeric peer_id or a peername string.
90 Mixed(Peer.fields['peer_id'],
91 Peer.fields['peername']),
# NOTE(review): docstring says a dict of timers is returned, while the
# declared return type below says int -- the excerpt is missing lines, so
# confirm the actual return value against the full source.
94 returns = Parameter(int, "1 if successful")
# Entry point invoked by the API dispatcher.
# Serializes RefreshPeer runs per peer via a /tmp lock file, then delegates
# the actual work to real_call().
# NOTE(review): lines are missing from this excerpt -- presumably a
# try/finally that unlocks file_lock after real_call; confirm before editing.
96 def call(self, auth, peer_id_or_peername):
# Resolve the peername first so the lock file name is stable whether the
# caller passed an id or a name.
98 peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
99 file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
100 if not file_lock.lock():
101 raise Exception, "Another instance of RefreshPeer is running."
103 self.real_call(auth, peer_id_or_peername)
# Core of RefreshPeer: fetch the peer's full data dump (GetPeerData) and
# synchronize local caches of Sites, Keys, Persons, Nodes and Slices,
# deleting stale entries and binding newly cached objects to the peer.
# Timing of each phase is recorded in the 'timers' dict.
# NOTE(review): many interior lines are missing from this excerpt (peer API
# connection, commit_mode/start/timers initialization, several try/except
# and else branches); comments below only describe what the visible code
# shows -- confirm details against the full source.
110 def real_call(self, auth, peer_id_or_peername):
# Look up the target peer; the missing lines presumably check emptiness
# and bind 'peer' before raising on a miss.
112 peers = Peers(self.api, [peer_id_or_peername])
114 raise PLCInvalidArgument, "No such peer '%s'" % unicode(peer_id_or_peername)
116 peer_id = peer['peer_id']
118 # Connect to peer API
125 message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
126 message('Issuing GetPeerData')
# Single bulk RPC returning all tables ('Sites', 'Keys', 'Persons',
# 'Nodes', 'Slices', 'PeerNodes') plus the peer-side 'db_time'.
127 peer_tables = peer.GetPeerData()
128 # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
# Map legacy 4.2 boot states onto the current vocabulary.
129 boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
130 'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
# Strip fields a 4.2 peer sends that we do not cache (the deletion line
# itself is missing from this excerpt).
131 for node in peer_tables['Nodes']:
132 for key in ['nodenetwork_ids','dummybox_id']:
135 if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
136 for slice in peer_tables['Slices']:
137 for key in ['slice_attribute_ids']:
# 'transport' = wall time of the RPC minus the peer's own DB time.
140 timers['transport'] = time.time() - start - peer_tables['db_time']
141 timers['peer_db'] = peer_tables['db_time']
142 message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
# Generic reconciliation helper shared by all five table syncs below.
144 def sync(objects, peer_objects, classobj):
146         Synchronizes two dictionaries of objects. objects should
147         be a dictionary of local objects keyed on their foreign
148         identifiers. peer_objects should be a dictionary of
149         foreign objects keyed on their local (i.e., foreign to us)
150         identifiers. Returns a final dictionary of local objects
151         keyed on their foreign identifiers.
# Derive a human-readable class name ('Site', 'Node', ...) for logging.
154 classname=classobj(self.api).__class__.__name__
155 message_verbose('Entering sync on %s'%classname)
159 # Delete stale objects
# Anything cached locally that the peer no longer reports is deleted.
160 for peer_object_id, object in objects.iteritems():
161 if peer_object_id not in peer_objects:
162 object.delete(commit = commit_mode)
163 message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
165 total = len(peer_objects)
167 # set this to something realistic to trace down a given object(s)
# NOTE(review): 'trace' helper fragment -- its def line and the
# trace_type/trace_ids settings are missing from this excerpt. Also
# 'TRACE>>'+message concatenates the module-level message *function*;
# presumably the original trace() had a shadowing parameter -- verify.
171 if classname == trace_type and peer_object_id in trace_ids:
172 message_verbose('TRACE>>'+message)
174 # Add/update new/existing objects
175 for peer_object_id, peer_object in peer_objects.iteritems():
176 message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
# Guard against duplicates within a single run ('synced' is the
# accumulator dict; its initialization is missing from this excerpt).
178 if peer_object_id in synced:
179 message("Warning: %s Skipping already added %s: %r"%(
180 peer['peername'], classname, peer_object))
182 if classname == 'Node':
183 message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
184 elif classname == "Slice":
185 message_verbose ('DBG>> slicename=%s'%peer_object['name'])
186 if peer_object_id in objects:
187 # Update existing object
188 object = objects[peer_object_id]
190 # Replace foreign identifier with existing local
191 # identifier temporarily for the purposes of
193 peer_object[object.primary_key] = object[object.primary_key]
195 # Must use __eq__() instead of == since
196 # peer_object may be a raw dict instead of a Peer
198 trace ("in objects : comparing")
199 if not object.__eq__(peer_object):
200 # Only update intrinsic fields
202 object.update(object.db_fields(peer_object))
211 # Restore foreign identifier
212 peer_object[object.primary_key] = peer_object_id
# Not cached yet: build a brand-new local object from the peer's dict.
214 trace ("not in objects -- creating")
216 object = classobj(self.api, peer_object)
218 # Replace foreign identifier with new local identifier
# Drop the peer's primary key so the local DB assigns a fresh one.
219 del object[object.primary_key]
220 trace ("forced clean id")
225 message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
227 object.sync(commit = commit_mode)
228 except PLCInvalidArgument, err:
229 # Skip if validation fails
230 # XXX Log an event instead of printing to logfile
231 message("Warning: %s Skipping invalid %s %r : %r"%(\
232 peer['peername'], classname, peer_object, err))
# Record the successfully synced object under its foreign id.
235 synced[peer_object_id] = object
238 message("%s: %s %d %s"%(peer['peername'], classname, object[object.primary_key], dbg))
240 message_verbose("Exiting sync on %s"%classname)
245 # Synchronize foreign sites
250 message('Dealing with Sites')
252 # Compare only the columns returned by the GetPeerData() call
253 if peer_tables['Sites']:
254 columns = peer_tables['Sites'][0].keys()
258 # Keyed on foreign site_id
259 old_peer_sites = Sites(self.api, {'peer_id': peer_id}, columns).dict('peer_site_id')
260 sites_at_peer = dict([(site['site_id'], site) for site in peer_tables['Sites']])
262 # Synchronize new set (still keyed on foreign site_id)
263 peer_sites = sync(old_peer_sites, sites_at_peer, Site)
265 for peer_site_id, site in peer_sites.iteritems():
266 # Bind any newly cached sites to peer
267 if peer_site_id not in old_peer_sites:
268 peer.add_site(site, peer_site_id, commit = commit_mode)
269 site['peer_id'] = peer_id
270 site['peer_site_id'] = peer_site_id
272 timers['site'] = time.time() - start
275 # XXX Synchronize foreign key types
# Key types are assumed identical across peers; only used for validation.
278 message('Dealing with Keys')
280 key_types = KeyTypes(self.api).dict()
283 # Synchronize foreign keys
288 # Compare only the columns returned by the GetPeerData() call
289 if peer_tables['Keys']:
290 columns = peer_tables['Keys'][0].keys()
294 # Keyed on foreign key_id
295 old_peer_keys = Keys(self.api, {'peer_id': peer_id}, columns).dict('peer_key_id')
296 keys_at_peer = dict([(key['key_id'], key) for key in peer_tables['Keys']])
298 # Fix up key_type references
# Drop any peer key whose key_type is unknown locally (iterate over
# .items() copy since we delete while looping).
299 for peer_key_id, key in keys_at_peer.items():
300 if key['key_type'] not in key_types:
301 # XXX Log an event instead of printing to logfile
302 message("Warning: Skipping invalid %s key %r" % ( peer['peername'], key))
303 del keys_at_peer[peer_key_id]
306 # Synchronize new set (still keyed on foreign key_id)
307 peer_keys = sync(old_peer_keys, keys_at_peer, Key)
308 for peer_key_id, key in peer_keys.iteritems():
309 # Bind any newly cached keys to peer
310 if peer_key_id not in old_peer_keys:
311 peer.add_key(key, peer_key_id, commit = commit_mode)
312 key['peer_id'] = peer_id
313 key['peer_key_id'] = peer_key_id
315 timers['keys'] = time.time() - start
318 # Synchronize foreign users
323 message('Dealing with Persons')
325 # Compare only the columns returned by the GetPeerData() call
326 if peer_tables['Persons']:
327 columns = peer_tables['Persons'][0].keys()
331 # Keyed on foreign person_id
332 old_peer_persons = Persons(self.api, {'peer_id': peer_id}, columns).dict('peer_person_id')
334 # artificially attach the persons returned by GetPeerData to the new peer
335 # this is because validate_email needs peer_id to be correct when checking for duplicates
336 for person in peer_tables['Persons']:
337 person['peer_id']=peer_id
338 persons_at_peer = dict([(peer_person['person_id'], peer_person) \
339 for peer_person in peer_tables['Persons']])
341 # XXX Do we care about membership in foreign site(s)?
343 # Synchronize new set (still keyed on foreign person_id)
344 peer_persons = sync(old_peer_persons, persons_at_peer, Person)
346 # transcoder : retrieve a local key_id from a peer_key_id
347 key_transcoder = dict ( [ (key['key_id'],peer_key_id) \
348 for peer_key_id,key in peer_keys.iteritems()])
350 for peer_person_id, person in peer_persons.iteritems():
351 # Bind any newly cached users to peer
352 if peer_person_id not in old_peer_persons:
353 peer.add_person(person, peer_person_id, commit = commit_mode)
354 person['peer_id'] = peer_id
355 person['peer_person_id'] = peer_person_id
356 person['key_ids'] = []
358 # User as viewed by peer
359 peer_person = persons_at_peer[peer_person_id]
361 # Foreign keys currently belonging to the user
362 old_person_key_ids = [key_transcoder[key_id] for key_id in person['key_ids'] \
363 if key_transcoder[key_id] in peer_keys]
365 # Foreign keys that should belong to the user
366 # this is basically peer_person['key_ids'], we just check it makes sense
367 # (e.g. we might have failed importing it)
368 person_key_ids = [ key_id for key_id in peer_person['key_ids'] if key_id in peer_keys]
370 # Remove stale keys from user
371 for key_id in (set(old_person_key_ids) - set(person_key_ids)):
372 person.remove_key(peer_keys[key_id], commit = commit_mode)
373 message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
375 # Add new keys to user
376 for key_id in (set(person_key_ids) - set(old_person_key_ids)):
377 person.add_key(peer_keys[key_id], commit = commit_mode)
378 message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
380 timers['persons'] = time.time() - start
383 # XXX Synchronize foreign boot states
# Boot states, like key types, are assumed globally identical.
386 boot_states = BootStates(self.api).dict()
389 # Synchronize foreign nodes
394 message('Dealing with Nodes (1)')
396 # Compare only the columns returned by the GetPeerData() call
397 if peer_tables['Nodes']:
398 columns = peer_tables['Nodes'][0].keys()
400 # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
# NOTE(review): this unconditionally overrides 'columns' computed above
# with the full Node.fields list (minus two fields) -- confirm intent.
401 columns = Node.fields
402 if 'interface_ids' in columns: columns.remove('interface_ids')
403 if 'dummybox_id' in columns: columns.remove('dummybox_id')
405 # Keyed on foreign node_id
406 old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
407 nodes_at_peer = dict([(node['node_id'], node) \
408 for node in peer_tables['Nodes']])
410 # Fix up site_id and boot_states references
# Validate each peer node; nodes referencing unknown sites/boot states
# are dropped, otherwise site_id is transcoded to the local id.
411 for peer_node_id, node in nodes_at_peer.items():
413 if node['site_id'] not in peer_sites:
414 errors.append("invalid site %d" % node['site_id'])
415 if node['boot_state'] not in boot_states:
416 errors.append("invalid boot state %s" % node['boot_state'])
418 # XXX Log an event instead of printing to logfile
419 message ("Warning: Skipping invalid %s node %r : " % (peer['peername'], node)\
421 del nodes_at_peer[peer_node_id]
424 node['site_id'] = peer_sites[node['site_id']]['site_id']
426 # Synchronize new set
427 peer_nodes = sync(old_peer_nodes, nodes_at_peer, Node)
429 for peer_node_id, node in peer_nodes.iteritems():
430 # Bind any newly cached foreign nodes to peer
431 if peer_node_id not in old_peer_nodes:
432 peer.add_node(node, peer_node_id, commit = commit_mode)
433 node['peer_id'] = peer_id
434 node['peer_node_id'] = peer_node_id
436 timers['nodes'] = time.time() - start
439 # Synchronize local nodes
# Reverse direction: fold OUR nodes, as cached by the peer, into
# peer_nodes so slice membership below can reference them.
443 message('Dealing with Nodes (2)')
445 # Keyed on local node_id
446 local_nodes = Nodes(self.api).dict()
448 for node in peer_tables['PeerNodes']:
449 # Foreign identifier for our node as maintained by peer
450 peer_node_id = node['node_id']
451 # Local identifier for our node as cached by peer
452 node_id = node['peer_node_id']
453 if node_id in local_nodes:
454 # Still a valid local node, add it to the synchronized
455 # set of local node objects keyed on foreign node_id.
456 peer_nodes[peer_node_id] = local_nodes[node_id]
458 timers['local_nodes'] = time.time() - start
461 # XXX Synchronize foreign slice instantiation states
464 slice_instantiations = SliceInstantiations(self.api).dict()
467 # Synchronize foreign slices
472 message('Dealing with Slices (1)')
474 # Compare only the columns returned by the GetPeerData() call
475 if peer_tables['Slices']:
476 columns = peer_tables['Slices'][0].keys()
480 # Keyed on foreign slice_id
481 old_peer_slices = Slices(self.api, {'peer_id': peer_id}, columns).dict('peer_slice_id')
482 slices_at_peer = dict([(slice['slice_id'], slice) \
483 for slice in peer_tables['Slices']])
485 # Fix up site_id, instantiation, and creator_person_id references
486 for peer_slice_id, slice in slices_at_peer.items():
488 if slice['site_id'] not in peer_sites:
489 errors.append("invalid site %d" % slice['site_id'])
490 if slice['instantiation'] not in slice_instantiations:
491 errors.append("invalid instantiation %s" % slice['instantiation'])
# An unknown creator is tolerated (set to None) rather than an error.
492 if slice['creator_person_id'] not in peer_persons:
494 slice['creator_person_id'] = None
496 slice['creator_person_id'] = peer_persons[slice['creator_person_id']]['person_id']
498 message("Warning: Skipping invalid %s slice %r : " % (peer['peername'], slice) \
500 del slices_at_peer[peer_slice_id]
503 slice['site_id'] = peer_sites[slice['site_id']]['site_id']
505 # Synchronize new set
506 peer_slices = sync(old_peer_slices, slices_at_peer, Slice)
508 message('Dealing with Slices (2)')
509 # transcoder : retrieve a local node_id from a peer_node_id
510 node_transcoder = dict ( [ (node['node_id'],peer_node_id) \
511 for peer_node_id,node in peer_nodes.iteritems()])
512 person_transcoder = dict ( [ (person['person_id'],peer_person_id) \
513 for peer_person_id,person in peer_persons.iteritems()])
515 for peer_slice_id, slice in peer_slices.iteritems():
516 # Bind any newly cached foreign slices to peer
517 if peer_slice_id not in old_peer_slices:
518 peer.add_slice(slice, peer_slice_id, commit = commit_mode)
519 slice['peer_id'] = peer_id
520 slice['peer_slice_id'] = peer_slice_id
521 slice['node_ids'] = []
522 slice['person_ids'] = []
524 # Slice as viewed by peer
525 peer_slice = slices_at_peer[peer_slice_id]
527 # Nodes that are currently part of the slice
528 old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
529 if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
531 # Nodes that should be part of the slice
532 slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
534 # Remove stale nodes from slice
535 for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
536 slice.remove_node(peer_nodes[node_id], commit = commit_mode)
537 message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
539 # Add new nodes to slice
540 for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
541 slice.add_node(peer_nodes[node_id], commit = commit_mode)
542 message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
544 # N.B.: Local nodes that may have been added to the slice
545 # by hand, are removed. In other words, don't do this.
547 # Foreign users that are currently part of the slice
548 #old_slice_person_ids = [ person_transcoder[person_id] for person_id in slice['person_ids'] \
549 # if person_transcoder[person_id] in peer_persons]
550 # An issue occurred with a user who registered on both sites (same email)
551 # So the remote person could not get cached locally
552 # The one-line map/filter style is nicer but ineffective here
553 old_slice_person_ids = []
554 for person_id in slice['person_ids']:
555 if not person_transcoder.has_key(person_id):
556 message ('WARNING : person_id %d in %s not transcodable (1) - skipped'%(person_id,slice['name']))
557 elif person_transcoder[person_id] not in peer_persons:
558 message('WARNING : person_id %d in %s not transcodable (2) - skipped'%(person_id,slice['name']))
560 old_slice_person_ids += [person_transcoder[person_id]]
562 # Foreign users that should be part of the slice
563 slice_person_ids = [ person_id for person_id in peer_slice['person_ids'] if person_id in peer_persons ]
565 # Remove stale users from slice
566 for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
567 slice.remove_person(peer_persons[person_id], commit = commit_mode)
568 message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
570 # Add new users to slice
571 for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
572 slice.add_person(peer_persons[person_id], commit = commit_mode)
573 message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
575 # N.B.: Local users that may have been added to the slice
576 # by hand, are not touched.
578 timers['slices'] = time.time() - start
580 # Update peer itself and commit
# Final sync always commits, regardless of commit_mode.
581 peer.sync(commit = True)