#
# $Id$
+import os
+import sys
+import fcntl
import time
from PLC.Debug import log
verbose=False
+# initial version was doing only one final commit
+# * set commit_mode to False to get that behaviour
+# * set commit_mode to True to get everything synced at once
+commit_mode=True
+
def message (to_print=None,verbose_only=False):
# logging helper: verbose_only messages are suppressed unless the
# module-level 'verbose' flag is set
# NOTE(review): the actual output statement is elided from this chunk view
if verbose_only and not verbose:
return
def message_verbose(to_print=None):
    """Convenience wrapper: forward *to_print* to message() flagged as
    verbose-only, so it is emitted only when verbose mode is enabled."""
    message(to_print, verbose_only=True)
+
class FileLock:
    """
    Advisory file lock based on flock(2), used to serialize runs.

    A pre-existing lock file older than `expire` seconds is considered
    stale (e.g. leaked by a crashed process) and is removed before the
    lock is attempted.
    """
    def __init__(self, file_path, expire = 60 * 60 * 2):
        # expire: age in seconds after which an existing lock file is
        # treated as stale (default: 2 hours)
        self.expire = expire
        self.fpath = file_path
        # open file object holding the flock; None while unlocked
        self.fd = None

    def lock(self):
        """Try to acquire the lock; return True on success, False otherwise."""
        if os.path.exists(self.fpath):
            # NOTE(review): st_ctime (metadata-change time) is used as the
            # lock's age here -- confirm st_mtime is not intended
            if (time.time() - os.stat(self.fpath).st_ctime) > self.expire:
                try:
                    os.unlink(self.fpath)
                except Exception as e:
                    message('FileLock.lock(%s) : %s' % (self.fpath, e))
                    return False
        try:
            self.fd = open(self.fpath, 'w')
            fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as e:
            message('FileLock.lock(%s) : %s' % (self.fpath, e))
            # close the descriptor we may have opened before flock failed,
            # so a failed lock() does not leak an open file
            if self.fd is not None:
                self.fd.close()
                self.fd = None
            return False
        return True

    def unlock(self):
        """Release the lock and close the underlying file object."""
        if self.fd is None:
            # lock() never succeeded (or already unlocked) -- nothing to do
            return
        try:
            # LOCK_NB is meaningless when releasing, so plain LOCK_UN is used
            fcntl.flock(self.fd, fcntl.LOCK_UN)
            self.fd.close()
        except IOError as e:
            message('FileLock.unlock(%s) : %s' % (self.fpath, e))
        self.fd = None
+
+
class RefreshPeer(Method):
"""
Fetches site, node, slice, person and key data from the specified peer
returns = Parameter(int, "1 if successful")
def call(self, auth, peer_id_or_peername):
+ ret_val = None
+ peername = Peers(self.api, [peer_id_or_peername], ['peername'])[0]['peername']
+ file_lock = FileLock("/tmp/refresh-peer-%s.lock" % peername)
+ if not file_lock.lock():
+ raise Exception, "Another instance of RefreshPeer is running."
+ try:
+ ret_val = self.real_call(auth, peer_id_or_peername)
+ except Exception, e:
+ file_lock.unlock()
+ raise Exception, e
+ file_lock.unlock()
+ return ret_val
+
+
+ def real_call(self, auth, peer_id_or_peername):
+ # Pulls all tables from the peer via GetPeerData, then adds/updates/deletes
+ # the local mirror objects accordingly; returns a dict of per-phase timers.
+ # Per-object commit granularity is driven by the module-level commit_mode flag.
# Get peer
peers = Peers(self.api, [peer_id_or_peername])
if not peers:
# Get peer data
start = time.time()
+ message('RefreshPeer starting up (commit_mode=%r)'%commit_mode)
message('Issuing GetPeerData')
peer_tables = peer.GetPeerData()
+ # for smooth federation with 4.2 - ignore fields that are useless anyway, and rewrite boot_state
+ boot_state_rewrite={'dbg':'safeboot','diag':'safeboot','disable':'disabled',
+ 'inst':'reinstall','rins':'reinstall','new':'reinstall','rcnf':'reinstall'}
+ for node in peer_tables['Nodes']:
+ for key in ['nodenetwork_ids','dummybox_id']:
+ if key in node:
+ del node[key]
+ if node['boot_state'] in boot_state_rewrite: node['boot_state']=boot_state_rewrite[node['boot_state']]
+ for slice in peer_tables['Slices']:
+ for key in ['slice_attribute_ids']:
+ if key in slice:
+ del slice[key]
timers['transport'] = time.time() - start - peer_tables['db_time']
timers['peer_db'] = peer_tables['db_time']
message_verbose('GetPeerData returned -> db=%d transport=%d'%(timers['peer_db'],timers['transport']))
# Delete stale objects
for peer_object_id, object in objects.iteritems():
if peer_object_id not in peer_objects:
- object.delete(commit = False)
+ object.delete(commit = commit_mode)
message("%s %s %s deleted"%(peer['peername'],classname, object[object.primary_key]))
total = len(peer_objects)
count=1
+ # set this to something realistic to trace down a given object(s)
+ trace_type="Node"
+ trace_ids=[]
+ def trace (message):
+ if classname == trace_type and peer_object_id in trace_ids:
+ message_verbose('TRACE>>'+message)
+
# Add/update new/existing objects
for peer_object_id, peer_object in peer_objects.iteritems():
message_verbose ('DBG %s peer_object_id=%d (%d/%d)'%(classname,peer_object_id,count,total))
count += 1
+ # ids already handled earlier in this run are skipped to avoid double-adding
+ if peer_object_id in synced:
+ message("Warning: %s Skipping already added %s: %r"%(
+ peer['peername'], classname, peer_object))
+ continue
if classname == 'Node':
message_verbose ('DBG>> hostname=%s'%peer_object['hostname'])
elif classname == "Slice":
# Must use __eq__() instead of == since
# peer_object may be a raw dict instead of a Peer
# object.
+ trace ("in objects : comparing")
if not object.__eq__(peer_object):
# Only update intrinsic fields
+ trace ("updating")
object.update(object.db_fields(peer_object))
+ trace ("updated")
sync = True
dbg = "changed"
else:
+ trace ("intact")
sync = False
dbg = None
# Restore foreign identifier
peer_object[object.primary_key] = peer_object_id
else:
+ trace ("not in objects -- creating")
# Add new object
object = classobj(self.api, peer_object)
+ trace ("created")
# Replace foreign identifier with new local identifier
del object[object.primary_key]
+ trace ("forced clean id")
sync = True
dbg = "added"
if sync:
+ message_verbose("DBG>> syncing %s %d - commit_mode=%r"%(classname,peer_object_id,commit_mode))
try:
- object.sync(commit = False)
+ object.sync(commit = commit_mode)
except PLCInvalidArgument, err:
# Skip if validation fails
# XXX Log an event instead of printing to logfile
for peer_site_id, site in peer_sites.iteritems():
# Bind any newly cached sites to peer
if peer_site_id not in old_peer_sites:
- peer.add_site(site, peer_site_id, commit = False)
+ peer.add_site(site, peer_site_id, commit = commit_mode)
site['peer_id'] = peer_id
site['peer_site_id'] = peer_site_id
for peer_key_id, key in peer_keys.iteritems():
# Bind any newly cached keys to peer
if peer_key_id not in old_peer_keys:
- peer.add_key(key, peer_key_id, commit = False)
+ peer.add_key(key, peer_key_id, commit = commit_mode)
key['peer_id'] = peer_id
key['peer_key_id'] = peer_key_id
for peer_person_id, person in peer_persons.iteritems():
# Bind any newly cached users to peer
if peer_person_id not in old_peer_persons:
- peer.add_person(person, peer_person_id, commit = False)
+ peer.add_person(person, peer_person_id, commit = commit_mode)
person['peer_id'] = peer_id
person['peer_person_id'] = peer_person_id
person['key_ids'] = []
# Remove stale keys from user
for key_id in (set(old_person_key_ids) - set(person_key_ids)):
- person.remove_key(peer_keys[key_id], commit = False)
+ person.remove_key(peer_keys[key_id], commit = commit_mode)
message ("%s Key %d removed from person %s"%(peer['peername'], key_id, person['email']))
# Add new keys to user
for key_id in (set(person_key_ids) - set(old_person_key_ids)):
- person.add_key(peer_keys[key_id], commit = False)
+ person.add_key(peer_keys[key_id], commit = commit_mode)
message ("%s Key %d added into person %s"%(peer['peername'],key_id, person['email']))
timers['persons'] = time.time() - start
if peer_tables['Nodes']:
columns = peer_tables['Nodes'][0].keys()
else:
- columns = None
+ # smooth federation with a 4.2 peer - ignore these fields that are useless anyway
+ columns = Node.fields
+ if 'interface_ids' in columns: columns.remove('interface_ids')
+ if 'dummybox_id' in columns: columns.remove('dummybox_id')
# Keyed on foreign node_id
old_peer_nodes = Nodes(self.api, {'peer_id': peer_id}, columns).dict('peer_node_id')
for peer_node_id, node in peer_nodes.iteritems():
# Bind any newly cached foreign nodes to peer
if peer_node_id not in old_peer_nodes:
- peer.add_node(node, peer_node_id, commit = False)
+ peer.add_node(node, peer_node_id, commit = commit_mode)
node['peer_id'] = peer_id
node['peer_node_id'] = peer_node_id
for peer_slice_id, slice in peer_slices.iteritems():
# Bind any newly cached foreign slices to peer
if peer_slice_id not in old_peer_slices:
- peer.add_slice(slice, peer_slice_id, commit = False)
+ peer.add_slice(slice, peer_slice_id, commit = commit_mode)
slice['peer_id'] = peer_id
slice['peer_slice_id'] = peer_slice_id
slice['node_ids'] = []
# Nodes that are currently part of the slice
# NOTE(review): the added 'node_id in node_transcoder' guard prevents a
# KeyError when a slice references a node unknown to the transcoder
old_slice_node_ids = [ node_transcoder[node_id] for node_id in slice['node_ids'] \
- if node_transcoder[node_id] in peer_nodes]
+ if node_id in node_transcoder and node_transcoder[node_id] in peer_nodes]
# Nodes that should be part of the slice
slice_node_ids = [ node_id for node_id in peer_slice['node_ids'] if node_id in peer_nodes]
# Remove stale nodes from slice
for node_id in (set(old_slice_node_ids) - set(slice_node_ids)):
- slice.remove_node(peer_nodes[node_id], commit = False)
+ slice.remove_node(peer_nodes[node_id], commit = commit_mode)
message ("%s node %s removed from slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
# Add new nodes to slice
for node_id in (set(slice_node_ids) - set(old_slice_node_ids)):
- slice.add_node(peer_nodes[node_id], commit = False)
+ slice.add_node(peer_nodes[node_id], commit = commit_mode)
message ("%s node %s added into slice %s"%(peer['peername'], peer_nodes[node_id]['hostname'], slice['name']))
# N.B.: Local nodes that may have been added to the slice
# Remove stale users from slice
for person_id in (set(old_slice_person_ids) - set(slice_person_ids)):
- slice.remove_person(peer_persons[person_id], commit = False)
+ slice.remove_person(peer_persons[person_id], commit = commit_mode)
message ("%s user %s removed from slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
# Add new users to slice
for person_id in (set(slice_person_ids) - set(old_slice_person_ids)):
- slice.add_person(peer_persons[person_id], commit = False)
+ slice.add_person(peer_persons[person_id], commit = commit_mode)
message ("%s user %s added into slice %s"%(peer['peername'],peer_persons[person_id]['email'], slice['name']))
# N.B.: Local users that may have been added to the slice
# Update peer itself and commit
peer.sync(commit = True)
-
+
return timers