### of a CompleterTask subclass
class Completer:
def __init__ (self, tasks, verbose=True, message=None):
- self.tasks=tasks
- self.verbose=verbose
- self.message="({})".format(message) if message else ""
+ self.tasks = tasks
+ self.verbose = verbose
+ self.message = "({})".format(message) if message else ""
def run (self, timeout_timedelta, silent_timedelta, period):
begin = datetime.now()
timeout = begin+timeout_timedelta
graceout = datetime.now()+silent_timedelta
silent_seconds = silent_timedelta.total_seconds()
silent_minutes = silent_seconds/60
- period_seconds=int(period.total_seconds())
+ period_seconds = int(period.total_seconds())
if self.verbose:
if timeout_seconds >= 120:
- utils.header("Completer [%d tasks]: max timeout is %d minutes, "
- "silent for %d minutes (period is %s s)"%\
- (len(self.tasks), timeout_minutes,
- silent_minutes, period_seconds))
+ utils.header("Completer [{} tasks]: max timeout is {} minutes, "
+ "silent for {} minutes (period is {} s)"\
+ .format(len(self.tasks), timeout_minutes,
+ silent_minutes, period_seconds))
else:
- utils.header("Completer [%d tasks]: max timeout is %d seconds, "
- "silent for %d seconds (period is %s s)"%\
- (len(self.tasks), timeout_seconds,
- silent_seconds, period_seconds))
- tasks=self.tasks
+ utils.header("Completer [{} tasks]: max timeout is {} seconds, "
+ "silent for {} seconds (period is {} s)"\
+ .format(len(self.tasks), timeout_seconds,
+ silent_seconds, period_seconds))
+ tasks = self.tasks
while tasks:
- fine=[]
+ fine = []
for task in tasks:
- success=task.run (silent=datetime.now() <= graceout)
- if success: fine.append(task)
- for task in fine: tasks.remove(task)
+ success = task.run (silent=datetime.now() <= graceout)
+ if success:
+ fine.append(task)
+ for task in fine:
+ tasks.remove(task)
if not tasks:
if self.verbose:
- duration = datetime.now()-begin
+ duration = datetime.now() - begin
print "total completer {} {}s".format(self.message,
int(duration.total_seconds()))
return True
task.failure_epilogue()
return False
if self.verbose:
- print '%ds..'%period_seconds,
+ print '{}s..'.format(period_seconds),
time.sleep(period_seconds)
# in case we're empty
return True
class CompleterTask:
def run (self, silent):
- result=self.actual_run()
+ result = self.actual_run()
if silent:
print '+' if result else '.',
sys.stdout.flush()
else:
- print self.message(),"->","OK" if result else "KO"
+ print self.message(), "->", "OK" if result else "KO"
return result
- def message (self): return "you-need-to-redefine-message"
- def failure_epilogue (self): print "you-need-to-redefine-failure_epilogue"
+
+ def message (self):
+ return "you-need-to-redefine-message"
+
+ def failure_epilogue (self):
+ print "you-need-to-redefine-failure_epilogue"
# random result
class TaskTest (CompleterTask):
- counter=1
+ counter = 1
def __init__ (self,max):
import random
- self.counter=TaskTest.counter
- TaskTest.counter+=1
- self.delay=random.random()*max
- self.fire=datetime.now()+timedelta(seconds=self.delay)
+ self.counter = TaskTest.counter
+ TaskTest.counter += 1
+ self.delay = random.random()*max
+ self.fire = datetime.now() + timedelta(seconds=self.delay)
def actual_run(self):
- return datetime.now()>=self.fire
+ return datetime.now() >= self.fire
def message (self):
- return "Task %d - delay was %d s"%(self.counter,self.delay)
+ return "Task {} - delay was {}s".format(self.counter, self.delay)
- def failure_epilogue (self): print "BOTTOM LINE: FAILURE with task (%s)"%self.counter
+ def failure_epilogue (self):
+ print "BOTTOM LINE: FAILURE with task ({})".format(self.counter)
def main ():
import sys
- if len(sys.argv)!=6:
+ if len(sys.argv) != 6:
print "Usage: <command> number_tasks max_random timeout_s silent_s period_s"
sys.exit(1)
- [number,max,timeout,silent,period]= [ int(x) for x in sys.argv[1:]]
+ [number, max, timeout, silent, period] = [ int(x) for x in sys.argv[1:]]
tasks = [ TaskTest(max) for i in range(number)]
- success=Completer(tasks,verbose=True).run(timedelta(seconds=timeout),
- timedelta(seconds=silent),
- timedelta(seconds=period))
+ success = Completer(tasks,verbose=True).run(timedelta(seconds=timeout),
+ timedelta(seconds=silent),
+ timedelta(seconds=period))
print "OVERALL",success
if __name__ == '__main__':
# the experimental lxc-based build box
def build_lxc_boxes_spec (self):
-# velvet needs attention
-# return [ 'liquid', 'reed', 'velvet' ]
-# reed out of the pool, liquid only used for the last f18 build
+# liquid only used for the last f18 build
return [ 'buzzcocks', 'liquid' ]
# the lxc-capable box for PLCs
# vplc01 to 40
def vplc_ips (self):
- return [ ( 'vplc%02d'%i, # DNS name
+ return [ ( 'vplc{:02d}'.format(i), # DNS name
'unused') # MAC address
for i in range(1,41) ]
# the nodes pool has a MAC address as user-data (3rd elt in tuple)
def vnode_ips (self):
- return [ ( 'vnode%02d'%i, # DNS name
- '02:34:56:00:00:%02d'%i) # MAC address
+ return [ ( 'vnode{:02d}'.format(i), # DNS name
+ '02:34:56:00:00:{:02d}'.format(i)) # MAC address
for i in range(1,21) ]
# local network settings
# turns out the config has an ip but no name..
def __init__ (self, auth, hostname=None, ip=None, verbose=False):
- self.auth=auth
+ self.auth = auth
if not hostname and not ip:
- raise Exception,"PlcapiUrlScanner needs _some_ input"
+ raise Exception, "PlcapiUrlScanner needs _some_ input"
if hostname:
if not ip:
- try: ip=socket.gethostbyname(hostname)
+ try:
+ ip = socket.gethostbyname(hostname)
except:
- hostname="%s.pl.sophia.inria.fr"%hostname
- ip=socket.gethostbyname(hostname)
+ hostname = "{}.pl.sophia.inria.fr".format(hostname)
+ ip = socket.gethostbyname(hostname)
else:
- if not hostname: hostname=socket.gethostbyaddr(ip)[0]
- self.hostname=hostname
- self.ip=ip
- self.verbose=verbose
+            hostname = socket.gethostbyaddr(ip)[0]
+ self.hostname = hostname
+ self.ip = ip
+ self.verbose = verbose
- def try_url (self,url):
+ def try_url (self, url):
try:
xmlrpclib.ServerProxy (url, verbose=self.verbose, allow_none=True).GetNodes(self.auth)
- print 'YES',url
+ print 'YES', url
return True
except xmlrpclib.ProtocolError as e:
- print '... (http error %s)'%e.errcode,url
+ print '... (http error {})'.format(e.errcode), url
return False
except Exception as e:
- print '---',type(e).__name__,url,e
- if self.verbose: traceback.print_exc()
+ print '---', type(e).__name__, url, e
+ if self.verbose:
+ traceback.print_exc()
return False
def try_url_required (self, url, required):
- result=self.try_url(url)
- if required and not result: return False
- else: return True
+ result = self.try_url(url)
+ if required and not result:
+ return False
+ else:
+ return True
def scan(self):
- overall=True
+ overall = True
for protocol in ['http','https']:
for dest in [ self.hostname, self.ip ]:
- for port in [ '',':80',':443']:
- for path in ['PLCAPI','PLCAPI/']:
- if protocol=='http' and port==':443': continue
- if protocol=='https' and port==':80': continue
+ for port in [ '', ':80', ':443']:
+ for path in ['PLCAPI', 'PLCAPI/']:
+ if protocol=='http' and port==':443':
+ continue
+ if protocol=='https' and port==':80':
+ continue
required = (protocol=='https') and (path=='PLCAPI/')
- url="%s://%s%s/%s"%(protocol,dest,port,path)
- if not self.try_url_required (url,required): overall=False
+                        url = "{}://{}{}/{}".format(protocol, dest, port, path)
+                        if not self.try_url_required (url, required):
+                            overall = False
return overall
from optparse import OptionParser
auth={'AuthMethod':'password','Username':'root@test.onelab.eu','AuthString':'test++'}
def main ():
- usage="%prog hostname"
- parser=OptionParser()
- parser.add_option("-v","--verbose",dest='verbose',action='store_true',default=False)
- (options,args)=parser.parse_args()
- if len(args)!=1:
+ usage = "%prog hostname"
+ parser = OptionParser()
+ parser.add_option("-v", "--verbose", dest='verbose', action='store_true', default=False)
+ (options,args) = parser.parse_args()
+ if len(args) != 1:
parser.print_help()
sys.exit(1)
- hostname=args[0]
- success=PlcapiUrlScanner (auth=auth, hostname=hostname,verbose=options.verbose).scan()
+ hostname = args[0]
+ success = PlcapiUrlScanner (auth=auth, hostname=hostname,verbose=options.verbose).scan()
sys.exit(0 if success else -1)
if __name__ == '__main__':
# used to map on several slices...
overall=True
slice_method = TestSliceSfa.__dict__[method.__name__]
- slice_spec=self.auth_sfa_spec['slice_spec']
- test_slice_sfa = TestSliceSfa(self,slice_spec)
+ slice_spec = self.auth_sfa_spec['slice_spec']
+ test_slice_sfa = TestSliceSfa(self, slice_spec)
if not slice_method(test_slice_sfa, *args, **kwds): overall=False
return overall
# restore the doc text
if not user_method(test_user_sfa, *args, **kwds): overall=False
return overall
# restore the doc text
- actual.__doc__=TestUserSfa.__dict__[method.__name__].__doc__
+ actual.__doc__ = TestUserSfa.__dict__[method.__name__].__doc__
return actual
class TestAuthSfa:
- def __init__ (self,test_plc,auth_sfa_spec):
- self.test_plc=test_plc
- self.auth_sfa_spec=auth_sfa_spec
- self.test_ssh=TestSsh(self.test_plc.test_ssh)
+ def __init__ (self, test_plc, auth_sfa_spec):
+ self.test_plc = test_plc
+ self.auth_sfa_spec = auth_sfa_spec
+ self.test_ssh = TestSsh(self.test_plc.test_ssh)
# # shortcuts
- self.login_base=self.auth_sfa_spec['login_base']
-# self.piuser=self.auth_sfa_spec['piuser']
-# self.regularuser=self.auth_sfa_spec['regularuser']
+ self.login_base = self.auth_sfa_spec['login_base']
+# self.piuser = self.auth_sfa_spec['piuser']
+# self.regularuser = self.auth_sfa_spec['regularuser']
def rspec_style (self): return self.auth_sfa_spec['rspec_style']
def sfi_path (self):
- return "/root/sfi/%s"%(self.rspec_style())
+ return "/root/sfi/{}".format(self.rspec_style())
# the hrn for the root authority
def root_hrn (self):
# the hrn for the auth/site
def auth_hrn (self):
- return "%s.%s"%(self.root_hrn(),self.login_base)
+ return "{}.{}".format(self.root_hrn(), self.login_base)
# something in this site (users typically); for use by classes for subobjects
def obj_hrn (self, name):
- return "%s.%s"%(self.auth_hrn(),name)
+ return "{}.{}".format(self.auth_hrn(), name)
def regular_user_hrn(self):
return self.obj_hrn(self.auth_sfa_spec['user_spec']['name'])
fileconf=open(file_name,'w')
fileconf.write (contents)
fileconf.close()
- utils.header ("(Over)wrote %s"%file_name)
+ utils.header ("(Over)wrote {}".format(file_name))
#
file_name=dir_name + os.sep + 'sfi_config'
fileconf=open(file_name,'w')
SFI_AUTH=self.auth_hrn()
- fileconf.write ("SFI_AUTH='%s'"%SFI_AUTH)
+ fileconf.write ("SFI_AUTH='{}'".format(SFI_AUTH))
fileconf.write('\n')
# default is to run as a PI
SFI_USER=self.obj_hrn(self.auth_sfa_spec['pi_spec']['name'])
- fileconf.write ("SFI_USER='%s'"%SFI_USER)
+ fileconf.write("SFI_USER='{}'".format(SFI_USER))
fileconf.write('\n')
- SFI_REGISTRY='http://%s:%s/'%(sfa_spec['settings']['SFA_REGISTRY_HOST'],12345)
- fileconf.write ("SFI_REGISTRY='%s'"%SFI_REGISTRY)
+ SFI_REGISTRY='http://{}:{}/'.format(sfa_spec['settings']['SFA_REGISTRY_HOST'], 12345)
+ fileconf.write("SFI_REGISTRY='{}'".format(SFI_REGISTRY))
fileconf.write('\n')
- SFI_SM='http://%s:%s/'%(sfa_spec['settings']['SFA_SM_HOST'],sfa_spec['sfi-connects-to-port'])
- fileconf.write ("SFI_SM='%s'"%SFI_SM)
+ SFI_SM='http://{}:{}/'.format(sfa_spec['settings']['SFA_SM_HOST'], sfa_spec['sfi-connects-to-port'])
+ fileconf.write("SFI_SM='{}'".format(SFI_SM))
fileconf.write('\n')
fileconf.close()
- utils.header ("(Over)wrote %s"%file_name)
+ utils.header ("(Over)wrote {}".format(file_name))
# using sfaadmin to bootstrap
def sfa_register_site (self, options):
"bootstrap a site using sfaadmin"
- command="sfaadmin reg register -t authority -x %s"%self.auth_hrn()
+ command="sfaadmin reg register -t authority -x {}".format(self.auth_hrn())
return self.test_plc.run_in_guest(command)==0
def sfa_register_pi (self, options):
pi_mail=pi_spec['email']
# as installed by sfi_config
pi_key=os.path.join(self.sfi_path(),self.obj_hrn(pi_spec['name']+'.pub'))
- command="sfaadmin reg register -t user -x %s --email %s --key %s"%(pi_hrn,pi_mail,pi_key)
+ command="sfaadmin reg register -t user -x {} --email {} --key {}".format(pi_hrn, pi_mail, pi_key)
if self.test_plc.run_in_guest(command)!=0: return False
- command="sfaadmin reg update -t authority -x %s --pi %s"%(self.auth_hrn(),pi_hrn)
- return self.test_plc.run_in_guest(command)==0
+ command="sfaadmin reg update -t authority -x {} --pi {}".format(self.auth_hrn(), pi_hrn)
+ return self.test_plc.run_in_guest(command) == 0
# run as pi
def sfi_pi (self, command):
pi_name=self.auth_sfa_spec['pi_spec']['name']
- return "sfi -d %s -u %s %s"%(self.sfi_path(),self.obj_hrn(pi_name), command,)
+ return "sfi -d {} -u {} {}".format(self.sfi_path(), self.obj_hrn(pi_name), command)
# the sfi command line option to run as a regular user
def sfi_user (self, command):
user_name=self.auth_sfa_spec['user_spec']['name']
- return "sfi -d %s -u %s %s"%(self.sfi_path(),self.obj_hrn(user_name), command,)
+ return "sfi -d {} -u {} {}".format(self.sfi_path(), self.obj_hrn(user_name), command)
# user management
@user_sfa_mapper
def sfa_remove_user_from_slice (self, options):
"remove regular user from slice"
- command="update -t slice -x %s -r none"%(self.slice_hrn())
+ command="update -t slice -x {} -r none".format(self.slice_hrn())
# xxx should check result other than visually
return self.test_plc.run_in_guest(self.sfi_pi(command))==0
def sfa_insert_user_in_slice (self, options):
"defines regular user as unique user in slice"
- command="update -t slice -x %s -r %s"%(self.slice_hrn(),self.regular_user_hrn())
+ command="update -t slice -x {} -r {}".format(self.slice_hrn(), self.regular_user_hrn())
# xxx should check result other than visually
return self.test_plc.run_in_guest(self.sfi_pi(command))==0
def sfi_list (self, options):
"run (as regular user) sfi list (on Registry)"
return \
- self.test_plc.run_in_guest(self.sfi_user("list -r %s"%self.root_hrn()))==0 and \
- self.test_plc.run_in_guest(self.sfi_user("list %s"%(self.auth_hrn())))==0
+ self.test_plc.run_in_guest(self.sfi_user("list -r {}".format(self.root_hrn()))) == 0 and \
+ self.test_plc.run_in_guest(self.sfi_user("list {}".format(self.auth_hrn()))) == 0
def sfi_show_site (self, options):
"run (as regular user) sfi show (on Registry)"
return \
- self.test_plc.run_in_guest(self.sfi_user("show %s"%(self.auth_hrn())))==0
+ self.test_plc.run_in_guest(self.sfi_user("show {}".format(self.auth_hrn()))) == 0
def sfi_show_slice (self, options):
"run (as PI) sfi show -n <slice> (on Registry)"
return \
- self.test_plc.run_in_guest(self.sfi_pi("show -n %s"%self.slice_hrn()))==0
+ self.test_plc.run_in_guest(self.sfi_pi("show -n {}".format(self.slice_hrn()))) == 0
# checks if self.regular_user is found in registry's reg-researchers
def sfi_show_slice_researchers (self, options):
"run (as PI) sfi show <slice> -k researcher -k reg-researchers (on Registry)"
return \
- self.test_plc.run_in_guest(self.sfi_pi("show %s -k researcher -k reg-researchers"%self.slice_hrn()))==0
+ self.test_plc.run_in_guest(self.sfi_pi("show {} -k researcher -k reg-researchers".format(self.slice_hrn()))) == 0
# those are step names exposed as methods of TestPlc, hence the _sfa
return self.test_ssh.is_local()
def run_in_buildname (self,command,background=False, dry_run=False):
- message="On %s: running %s"%(self.hostname(),command)
+ message="On {}: running {}".format(self.hostname(), command)
if background: message += " &"
utils.header(message)
return self.test_ssh.run_in_buildname (command,background, dry_run)
# we need at least one nodename, as template-qemu is not synced on remote testboxes
def qemu_kill_all(self,nodedir):
- self.run_in_buildname("%s/qemu-kill-node"%nodedir)
+ self.run_in_buildname("{}/qemu-kill-node".format(nodedir))
return True
def qemu_list_all(self):
class TestKey:
- def __init__ (self,test_plc,key_spec):
- self.test_plc=test_plc
- self.key_spec=key_spec
- self.test_ssh=TestSsh(self.test_plc.test_ssh)
+ def __init__ (self, test_plc, key_spec):
+ self.test_plc = test_plc
+ self.key_spec = key_spec
+ self.test_ssh = TestSsh(self.test_plc.test_ssh)
def name(self):
return self.key_spec['key_name']
def publicpath(self):
- return "keys/%s.pub"%(self.name())
+ return "keys/{}.pub".format(self.name())
def privatepath(self):
- return "keys/%s.rsa"%(self.name())
+ return "keys/{}.rsa".format(self.name())
-#Not tested yet, don't know if needed
-# def store_remote_key(self,hostname):
-# pub=self.publicpath()
-# priv=self.privatepath()
-# utils.header("Storing key %s in %s into %s "%(self.name(),pub,hostname))
-# dir=os.path.dirname(pub)
-# self.test_ssh.run("mkdir %s"%dir)
-# self.test_ssh.run("cat %s >> %s"%(self.key_spec['key_fields']['key'],pub))
-# self.test_ssh.run("cat %s >> %s"%(self.key_spec['private'],priv))
-# self.test_ssh.run("chmod %s 0400"%priv)
-# self.test_ssh.run("chmod %s 0444"%pub)
-
def store_key(self):
- pub=self.publicpath()
- priv=self.privatepath()
- utils.header("Storing key %s in %s"%(self.name(),pub))
- dir=os.path.dirname(pub)
+ pub = self.publicpath()
+ priv = self.privatepath()
+ utils.header("Storing key {} in {}".format(self.name(), pub))
+ dir = os.path.dirname(pub)
if not os.path.isdir(dir):
os.mkdir(dir)
- f=open(pub,"w")
- f.write(self.key_spec['key_fields']['key'])
- f.close()
- f=open(priv,"w")
- f.write(self.key_spec['private'])
- f.close()
+ with open(pub,"w") as f:
+ f.write(self.key_spec['key_fields']['key'])
+ with open(priv,"w") as f:
+ f.write(self.key_spec['private'])
os.chmod(priv,0400)
os.chmod(pub,0444)
try:
self.substeps = sequences[self.internal()]
except Exception,e:
- print "macro step %s not found in macros.py (%s) - exiting" % (self.display(),e)
+ print "macro step {} not found in macros.py ({}) - exiting".format(self.display(),e)
raise
def print_doc (self, level=0):
print self.steps_message
else:
# steps mentioned on the command line
- if self.options.args:
- scopes = [("Argument steps",self.options.args)]
+ if self.options.steps:
+ scopes = [("Argument steps",self.options.steps)]
else:
scopes = [("Default steps", TestPlc.default_steps)]
if self.options.all_steps:
arch-rpms-url defaults to the last value used, as stored in arg-arch-rpms-url,
no default
config defaults to the last value used, as stored in arg-config,
- or %r
-ips_vnode, ips_vplc and ips_qemu defaults to the last value used, as stored in arg-ips-{bplc,vplc,bnode,vnode},
+ or {!r}
+ips_vnode, ips_vplc and ips_qemu defaults to the last value used,
+ as stored in arg-ips-{{bplc,vplc,bnode,vnode}},
default is to use IP scanning
steps refer to a method in TestPlc or to a step_* module
run with -l to see a list of available steps
===
-"""%(TestMain.default_config)
+""".format(TestMain.default_config)
parser = ArgumentParser(usage = usage)
parser.add_argument("-u", "--url", action="store", dest="arch_rpms_url",
parsed = [x.strip() for x in parsed]
else: # strings and booleans
if len(parsed) != 1:
- print "%s - error when parsing %s" % (sys.argv[1],path)
+ print "{} - error when parsing {}".format(sys.argv[1], path)
sys.exit(1)
parsed = parsed[0].strip()
if is_bool:
setattr(self.options, recname, default)
else:
print "Cannot determine", recname
- print "Run %s --help for help" % sys.argv[0]
+ print "Run {} --help for help".format(sys.argv[0])
sys.exit(1)
# save for next run
else: # strings and booleans - just call str()
fsave.write(str(getattr(self.options, recname)) + "\n")
fsave.close()
-# utils.header('Saved %s into %s'%(recname,filename))
+# utils.header('Saved {} into {}'.format(recname, filename))
# lists need be reversed
# I suspect this is useful for the various pools but for config, it's painful
getattr(self.options, recname).reverse()
if self.options.verbose:
- utils.header('* Using %s = %s' % (recname, getattr(self.options, recname)))
+ utils.header('* Using {} = {}'.format(recname, getattr(self.options, recname)))
# hack : if sfa is not among the published rpms, skip these tests
TestPlc.check_whether_build_has_sfa(self.options.arch_rpms_url)
all_plc_specs = m.config(all_plc_specs, self.options)
except :
traceback.print_exc()
- print 'Cannot load config %s -- ignored' % modulename
+ print 'Cannot load config {} -- ignored'.format(modulename)
raise
# provision on local substrate
# remember substrate IP address(es) for next run
ips_bplc_file = open('arg-ips-bplc', 'w')
for plc_spec in all_plc_specs:
- ips_bplc_file.write("%s\n" % plc_spec['host_box'])
+ ips_bplc_file.write("{}\n".format(plc_spec['host_box']))
ips_bplc_file.close()
ips_vplc_file = open('arg-ips-vplc', 'w')
for plc_spec in all_plc_specs:
- ips_vplc_file.write("%s\n" % plc_spec['settings']['PLC_API_HOST'])
+ ips_vplc_file.write("{}\n".format(plc_spec['settings']['PLC_API_HOST']))
ips_vplc_file.close()
# ditto for nodes
ips_bnode_file = open('arg-ips-bnode', 'w')
for plc_spec in all_plc_specs:
for site_spec in plc_spec['sites']:
for node_spec in site_spec['nodes']:
- ips_bnode_file.write("%s\n" % node_spec['host_box'])
+ ips_bnode_file.write("{}\n".format(node_spec['host_box']))
ips_bnode_file.close()
ips_vnode_file = open('arg-ips-vnode','w')
for plc_spec in all_plc_specs:
for node_spec in site_spec['nodes']:
# back to normal (unqualified) form
stripped = node_spec['node_fields']['hostname'].split('.')[0]
- ips_vnode_file.write("%s\n" % stripped)
+ ips_vnode_file.write("{}\n".format(stripped))
ips_vnode_file.close()
# build a TestPlc object from the result, passing options
cross = True
all_step_infos.append ( (substep, method, force, cross, qualifier, ) )
except :
- utils.header("********** FAILED step %s (NOT FOUND) -- won't be run" % step)
+ utils.header("********** FAILED step {} (NOT FOUND) -- won't be run".format(step))
traceback.print_exc()
overall_result = 'FAILURE'
# do all steps on all plcs
TIME_FORMAT = "%H-%M-%S"
- TRACE_FORMAT = "TRACE: %(plc_counter)d %(begin)s->%(seconds)ss=%(duration)s " + \
- "status=%(status)s step=%(stepname)s plc=%(plcname)s force=%(force)s\n"
+ TRACE_FORMAT = "TRACE: {plc_counter:d} {begin}->{seconds}s={duration}s " + \
+ "status={status} step={stepname} plc={plcname} force={force}\n"
for stepname, method, force, cross, qualifier in all_step_infos:
plc_counter = 0
for spec, plc_obj in all_plcs:
if self.options.interactive:
prompting = True
while prompting:
- msg="%d Run step %s on %s [r](un)/d(ry_run)/p(roceed)/s(kip)/q(uit) ? " % \
- (plc_counter,stepname,plcname)
+ msg="{:d} Run step {} on {} [r](un)/d(ry_run)/p(roceed)/s(kip)/q(uit) ? "\
+ .format(plc_counter, stepname, plcname)
answer = raw_input(msg).strip().lower() or "r"
answer = answer[0]
if answer in ['s','n']: # skip/no/next
- print '%s on %s skipped' % (stepname, plcname)
+ print '{} on {} skipped'.format(stepname, plcname)
prompting = False
skip_step = True
elif answer in ['q','b']: # quit/bye
try:
force_msg = ""
if force and spec['failed_step']:
- force_msg=" (forced after %s has failed)" % spec['failed_step']
- utils.header("********** %d RUNNING step %s%s on plc %s" % \
- (plc_counter, stepname, force_msg, plcname))
+ force_msg=" (forced after {} has failed)".format(spec['failed_step'])
+ utils.header("********** {:d} RUNNING step {}{} on plc {}"\
+ .format(plc_counter, stepname, force_msg, plcname))
if not cross:
step_result = method(plc_obj)
else:
# do not overwrite if FAILURE
if overall_result == 'SUCCESS':
overall_result = 'IGNORED'
- utils.header('********** %d IGNORED (%s) step %s on %s' % \
- (plc_counter, msg, stepname, plcname))
- status="%s[I]" % msg
+ utils.header('********** {} IGNORED ({}) step {} on {}'\
+ .format(plc_counter, msg, stepname, plcname))
+                            status = "{}[I]".format(msg)
elif step_result:
- utils.header('********** %d SUCCESSFUL step %s on %s' % \
- (plc_counter, stepname, plcname))
+ utils.header('********** {:d} SUCCESSFUL step {} on {}'\
+ .format(plc_counter, stepname, plcname))
status = "OK"
else:
overall_result = 'FAILURE'
spec['failed_step'] = stepname
- utils.header('********** %d FAILED step %s on %s (discarded from further steps)' % \
- (plc_counter, stepname, plcname))
+ utils.header('********** {:d} FAILED step {} on {} (discarded from further steps)'\
+ .format(plc_counter, stepname, plcname))
status = "KO"
except:
overall_result = 'FAILURE'
spec['failed_step'] = stepname
traceback.print_exc()
- utils.header ('********** %d FAILED (exception) step %s on %s (discarded from further steps)' % \
- (plc_counter, stepname, plcname))
+ utils.header ('********** {} FAILED (exception) step {} on {} (discarded from further steps)'\
+ .format(plc_counter, stepname, plcname))
status = "KO"
# do not run, just display it's skipped
else:
- why = "has failed %s" % spec['failed_step']
- utils.header("********** %d SKIPPED Step %s on %s (%s)" % \
- (plc_counter, stepname, plcname, why))
+ why = "has failed {}".format(spec['failed_step'])
+ utils.header("********** {} SKIPPED Step {} on {} ({})"\
+ .format(plc_counter, stepname, plcname, why))
status = "UNDEF"
if not self.options.dry_run:
delay = datetime.now()-beg_time
seconds = int(delay.total_seconds())
duration = str(delay)
# always do this on stdout
- print TRACE_FORMAT % locals()
+ print TRACE_FORMAT.format(**locals())
# duplicate on trace_file if provided
if self.options.trace_file:
- trace.write(TRACE_FORMAT % locals())
+ trace.write(TRACE_FORMAT.format(**locals()))
trace.flush()
if self.options.trace_file and not self.options.dry_run:
class TestMapper:
- def __init__ (self,plcs,options):
- self.plcs=plcs
- self.options=options
+ def __init__(self,plcs,options):
+ self.plcs = plcs
+ self.options = options
@staticmethod
- def plc_name (plc):
+ def plc_name(plc):
return plc['name']
@staticmethod
- def node_name (node):
+ def node_name(node):
return node['name']
- def node_names (self):
- result=[]
+ def node_names(self):
+ result = []
for plc in self.plcs:
for site in plc['sites']:
for node in site['nodes']:
result.append(node['name'])
return result
- def apply_first_map (self, type, name, obj, maplist):
+ def apply_first_map(self, type, name, obj, maplist):
for (map_pattern,rename_dict) in maplist:
if utils.match (name,map_pattern):
if self.options.verbose:
- utils.header("TestMapper/%s : applying rules '%s' on %s"%(type,map_pattern,name))
+ utils.header("TestMapper/{} : applying rules '{}' on {}"\
+ .format(type, map_pattern, name))
for (k,v) in rename_dict.iteritems():
# apply : separator
- path=k.split(':')
+ path = k.split(':')
# step down but last step in path
- o=obj
+ o = obj
for step in path[:-1]:
if not o.has_key(step):
- o[step]={}
+ o[step] = {}
if self.options.verbose:
- utils.header ("WARNING : created step %s in path %s on %s %s"%(
- step,path,type,name))
- o=o[step]
+ utils.header ("WARNING : created step {} in path {} on {} {}"\
+ .format(step,path,type,name))
+ o = o[step]
# last step is the one for side-effect
- step=path[-1]
+ step = path[-1]
if self.options.verbose:
if not o.has_key(step):
- utils.header ("WARNING : inserting key %s for path %s on %s %s"%(
- step,path,type,name))
+ utils.header ("WARNING : inserting key {} for path {} on {} {}"\
+ .format(step, path, type, name))
# apply formatting if '%s' found in the value
if v is None:
if self.options.verbose: print "TestMapper WARNING - None value - ignored, key=",k
continue
- if v.find('%s')>=0:
- v=v%obj[k]
+ if v.find('%s') >= 0:
+ v = v % obj[k]
if self.options.verbose:
- print("TestMapper, rewriting %s: %s into %s"%(name,k,v))
- o[step]=v
+ print("TestMapper, rewriting {}: {} into {}"\
+ .format(name, k, v))
+ o[step] = v
# only apply first rule
return
- def map (self,mapper):
+ def map(self, mapper):
plc_maps = mapper.get('plc',[])
node_maps = mapper.get('node',[])
for plc in self.plcs:
- name=TestMapper.plc_name(plc)
- self.apply_first_map ('plc',name,plc,plc_maps)
+ name = TestMapper.plc_name(plc)
+ self.apply_first_map('plc', name, plc, plc_maps)
for site in plc['sites']:
for node in site['nodes']:
nodename = TestMapper.node_name(node)
- self.apply_first_map('node',nodename,node,node_maps)
+ self.apply_first_map('node', nodename, node, node_maps)
return self.plcs
from Completer import CompleterTask
class CompleterTaskNodeSsh (CompleterTask):
- def __init__ (self, hostname, qemuname, local_key, command=None, boot_state="boot", expected=True, dry_run=False):
- self.hostname=hostname
- self.qemuname=qemuname
- self.boot_state=boot_state
- self.local_key=local_key
- self.command=command if command is not None else "hostname;uname -a"
- self.expected=expected
+ def __init__ (self, hostname, qemuname, local_key, command=None,
+ boot_state="boot", expected=True, dry_run=False):
+ self.hostname = hostname
+ self.qemuname = qemuname
+ self.boot_state = boot_state
+ self.local_key = local_key
+ self.command = command if command is not None else "hostname;uname -a"
+ self.expected = expected
self.dry_run = dry_run
- self.test_ssh = TestSsh (self.hostname,key=self.local_key)
+ self.test_ssh = TestSsh (self.hostname, key=self.local_key)
def run (self, silent):
command = self.test_ssh.actual_command(self.command)
retcod = utils.system (command, silent=silent, dry_run=self.dry_run)
- if self.expected: return retcod == 0
- else: return retcod != 0
+ if self.expected:
+ return retcod == 0
+ else:
+ return retcod != 0
def failure_epilogue (self):
- print "Cannot reach %s in %s mode"%(self.hostname, self.boot_state)
+ print "Cannot reach {} in {} mode".format(self.hostname, self.boot_state)
class TestNode:
- def __init__ (self,test_plc,test_site,node_spec):
- self.test_plc=test_plc
- self.test_site=test_site
- self.node_spec=node_spec
+ def __init__ (self, test_plc, test_site, node_spec):
+ self.test_plc = test_plc
+ self.test_site = test_site
+ self.node_spec = node_spec
def name(self):
return self.node_spec['node_fields']['hostname']
def dry_run (self):
return self.test_plc.options.dry_run
+
@staticmethod
def is_qemu_model (model):
return model.find("qemu") >= 0
def nodedir (self):
if self.is_qemu():
- return "qemu-%s"%self.name()
+ return "qemu-{}".format(self.name())
else:
- return "real-%s"%self.name()
+ return "real-{}".format(self.name())
# this returns a hostname
def host_box (self):
user_spec = self.test_site.locate_user(ownername)
test_user = TestUser(self.test_plc,self.test_site,user_spec)
userauth = test_user.auth()
- utils.header("node %s created by user %s"%(self.name(),test_user.name()))
- rootauth=self.test_plc.auth_root()
- server = self.test_plc.apiserver
- node_id=server.AddNode(userauth,
- self.test_site.site_spec['site_fields']['login_base'],
- self.node_spec['node_fields'])
+ utils.header("node {} created by user {}".format(self.name(), test_user.name()))
+ rootauth = self.test_plc.auth_root()
+ server = self.test_plc.apiserver
+ node_id = server.AddNode(userauth,
+ self.test_site.site_spec['site_fields']['login_base'],
+ self.node_spec['node_fields'])
server.SetNodePlainBootstrapfs(userauth,
self.node_spec['node_fields']['hostname'],
'YES')
server.UpdateNode (userauth, self.name(), self.node_spec['node_fields_nint'])
interface_id = server.AddInterface (userauth, self.name(),self.node_spec['interface_fields_nint'])
server.AddIpAddress (userauth, interface_id, self.node_spec['ipaddress_fields'])
- route_fields=self.node_spec['route_fields']
- route_fields['interface_id']=interface_id
+ route_fields = self.node_spec['route_fields']
+ route_fields['interface_id'] = interface_id
server.AddRoute (userauth, node_id, self.node_spec['route_fields'])
pass
# populate network interfaces - others
for (attribute,value) in interface['settings'].iteritems():
# locate node network
interface = server.GetInterfaces(userauth,{'ip':interface['interface_fields']['ip']})[0]
- interface_id=interface['interface_id']
+ interface_id = interface['interface_id']
# locate or create node network attribute type
try:
interface_tagtype = server.GetTagTypes(userauth,{'name':attribute})[0]
test_user = TestUser(self.test_plc,self.test_site,user_spec)
auth = test_user.auth()
except:
- auth=self.test_plc.auth_root()
+ auth = self.test_plc.auth_root()
self.test_plc.apiserver.DeleteNode(auth,self.name())
# Do most of the stuff locally - will be pushed on host_box - *not* the plc - later if needed
def qemu_local_init(self):
"all nodes : init a clean local directory for holding node-dep stuff like iso image..."
- utils.system("rm -rf %s"%self.nodedir())
- utils.system("mkdir %s"%self.nodedir())
+ utils.system("rm -rf {}".format(self.nodedir()))
+ utils.system("mkdir {}".format(self.nodedir()))
if not self.is_qemu():
return True
- return utils.system("rsync -v -a --exclude .svn template-qemu/ %s/"%self.nodedir())==0
+ return utils.system("rsync -v -a --exclude .svn template-qemu/ {}/"\
+ .format(self.nodedir())) == 0
def bootcd(self):
"all nodes: invoke GetBootMedium and store result locally"
- utils.header("Calling GetBootMedium for %s"%self.name())
+ utils.header("Calling GetBootMedium for {}".format(self.name()))
options = []
if self.is_qemu():
options.append('serial')
options.append('no-hangcheck')
options.append('systemd-debug')
- encoded=self.test_plc.apiserver.GetBootMedium(self.test_plc.auth_root(),
- self.name(), 'node-iso', '', options)
+ encoded = self.test_plc.apiserver.GetBootMedium(self.test_plc.auth_root(),
+ self.name(), 'node-iso', '', options)
if (encoded == ''):
raise Exception, 'GetBootmedium failed'
- filename="%s/%s.iso"%(self.nodedir(),self.name())
- utils.header('Storing boot medium into %s'%filename)
+ filename = "{}/{}.iso".format(self.nodedir(), self.name())
+ utils.header('Storing boot medium into {}'.format(filename))
if self.dry_run():
print "Dry_run: skipped writing of iso image"
return True
if self.dry_run():
print "Dry_run: skipped getting current node state"
return True
- state=self.test_plc.apiserver.GetNodes(self.test_plc.auth_root(), self.name(), ['boot_state'])[0]['boot_state']
+ state = self.test_plc.apiserver.GetNodes(self.test_plc.auth_root(), self.name(), ['boot_state'])[0]['boot_state']
print self.name(),':',state
return True
"all nodes: compute qemu config qemu.conf and store it locally"
if not self.is_qemu():
return
- mac=self.node_spec['interface_fields']['mac']
- hostname=self.node_spec['node_fields']['hostname']
- ip=self.node_spec['interface_fields']['ip']
- auth=self.test_plc.auth_root()
- target_arch=self.test_plc.apiserver.GetPlcRelease(auth)['build']['target-arch']
- conf_filename="%s/qemu.conf"%(self.nodedir())
+ mac = self.node_spec['interface_fields']['mac']
+ hostname = self.node_spec['node_fields']['hostname']
+ ip = self.node_spec['interface_fields']['ip']
+ auth = self.test_plc.auth_root()
+ target_arch = self.test_plc.apiserver.GetPlcRelease(auth)['build']['target-arch']
+ conf_filename = "{}/qemu.conf".format(self.nodedir())
if self.dry_run():
print "dry_run: skipped actual storage of qemu.conf"
return True
- utils.header('Storing qemu config for %s in %s'%(self.name(),conf_filename))
- file=open(conf_filename,'w')
- file.write('MACADDR=%s\n'%mac)
- file.write('NODE_ISO=%s.iso\n'%self.name())
- file.write('HOSTNAME=%s\n'%hostname)
- file.write('IP=%s\n'%ip)
- file.write('TARGET_ARCH=%s\n'%target_arch)
- file.close()
+ utils.header('Storing qemu config for {} in {}'.format(self.name(), conf_filename))
+ with open(conf_filename,'w') as f:
+ f.write('MACADDR={}\n'.format(mac))
+ f.write('NODE_ISO={}.iso\n'.format(self.name()))
+ f.write('HOSTNAME={}\n'.format(hostname))
+ f.write('IP={}\n'.format(ip))
+ f.write('TARGET_ARCH={}\n'.format(target_arch))
return True
def qemu_clean (self):
- utils.header("Cleaning up qemu for host %s on box %s"%(self.name(),self.test_box().hostname()))
- dry_run=self.dry_run()
+ utils.header("Cleaning up qemu for host {} on box {}"\
+ .format(self.name(),self.test_box().hostname()))
+ dry_run = self.dry_run()
self.test_box().rmdir(self.nodedir(), dry_run=dry_run)
return True
# if relevant, push the qemu area onto the host box
if self.test_box().is_local():
return True
- dry_run=self.dry_run()
- utils.header ("Cleaning any former sequel of %s on %s"%(self.name(),self.host_box()))
- utils.header ("Transferring configuration files for node %s onto %s"%(self.name(),self.host_box()))
- return self.test_box().copy(self.nodedir(),recursive=True,dry_run=dry_run)==0
+ dry_run = self.dry_run()
+ utils.header ("Cleaning any former sequel of {} on {}"\
+ .format(self.name(), self.host_box()))
+ utils.header ("Transferring configuration files for node {} onto {}"\
+ .format(self.name(), self.host_box()))
+ return self.test_box().copy(self.nodedir(), recursive=True, dry_run=dry_run) == 0
def qemu_start (self):
"all nodes: start the qemu instance (also runs qemu-bridge-init start)"
- model=self.node_spec['node_fields']['model']
+ model = self.node_spec['node_fields']['model']
#starting the Qemu nodes before
if self.is_qemu():
self.start_qemu()
else:
- utils.header("TestNode.qemu_start : %s model %s taken as real node"%(self.name(),model))
+ utils.header("TestNode.qemu_start : {} model {} taken as real node"\
+ .format(self.name(), model))
return True
def qemu_timestamp (self):
"all nodes: start the qemu instance (also runs qemu-bridge-init start)"
test_box = self.test_box()
- test_box.run_in_buildname("mkdir -p %s"%self.nodedir(), dry_run=self.dry_run())
- now=int(time.time())
- return test_box.run_in_buildname("echo %d > %s/timestamp"%(now,self.nodedir()), dry_run=self.dry_run())==0
+ test_box.run_in_buildname("mkdir -p {}".format(self.nodedir()), dry_run=self.dry_run())
+ now = int(time.time())
+ return test_box.run_in_buildname("echo {:d} > {}/timestamp"\
+ .format(now, self.nodedir()), dry_run=self.dry_run()) == 0
def start_qemu (self):
test_box = self.test_box()
- utils.header("Starting qemu node %s on %s"%(self.name(),test_box.hostname()))
+ utils.header("Starting qemu node {} on {}".format(self.name(), test_box.hostname()))
- test_box.run_in_buildname("%s/qemu-bridge-init start >> %s/log.txt"%(self.nodedir(),self.nodedir()),
+ test_box.run_in_buildname("{}/qemu-bridge-init start >> {}/log.txt"\
+ .format(self.nodedir(), self.nodedir()),
dry_run=self.dry_run())
# kick it off in background, as it would otherwise hang
- test_box.run_in_buildname("%s/qemu-start-node 2>&1 >> %s/log.txt"%(self.nodedir(),self.nodedir()))
+ test_box.run_in_buildname("{}/qemu-start-node 2>&1 >> {}/log.txt"\
+ .format(self.nodedir(), self.nodedir()))
def list_qemu (self):
- utils.header("Listing qemu for host %s on box %s"%(self.name(),self.test_box().hostname()))
- command="%s/qemu-kill-node -l %s"%(self.nodedir(),self.name())
+ utils.header("Listing qemu for host {} on box {}"\
+ .format(self.name(), self.test_box().hostname()))
+ command = "{}/qemu-kill-node -l {}".format(self.nodedir(), self.name())
self.test_box().run_in_buildname(command, dry_run=self.dry_run())
return True
#Prepare the log file before killing the nodes
test_box = self.test_box()
# kill the right processes
- utils.header("Stopping qemu for node %s on box %s"%(self.name(),self.test_box().hostname()))
- command="%s/qemu-kill-node %s"%(self.nodedir(),self.name())
+ utils.header("Stopping qemu for node {} on box {}"\
+ .format(self.name(), self.test_box().hostname()))
+ command = "{}/qemu-kill-node {}".format(self.nodedir(), self.name())
self.test_box().run_in_buildname(command, dry_run=self.dry_run())
return True
def gather_qemu_logs (self):
if not self.is_qemu():
return True
- remote_log="%s/log.txt"%self.nodedir()
- local_log="logs/node.qemu.%s.txt"%self.name()
+ remote_log = "{}/log.txt".format(self.nodedir())
+ local_log = "logs/node.qemu.{}.txt".format(self.name())
self.test_box().test_ssh.fetch(remote_log,local_log,dry_run=self.dry_run())
def keys_clear_known_hosts (self):
def create_test_ssh(self):
# get the plc's keys for entering the node
- vservername=self.test_plc.vservername
+ vservername = self.test_plc.vservername
### # assuming we've run testplc.fetch_keys()
-### key = "keys/%(vservername)s.rsa"%locals()
+### key = "keys/{vservername}.rsa".format(**locals())
# fetch_keys doesn't grab the root key anymore
key = "keys/key_admin.rsa"
return TestSsh(self.name(), buildname=self.buildname(), key=key)
def check_hooks (self):
extensions = [ 'py','pl','sh' ]
- path='hooks/node'
- scripts=utils.locate_hooks_scripts ('node '+self.name(), path,extensions)
+ path = 'hooks/node'
+ scripts = utils.locate_hooks_scripts ('node '+self.name(), path,extensions)
overall = True
for script in scripts:
if not self.check_hooks_script (script):
def check_hooks_script (self,local_script):
# push the script on the node's root context
- script_name=os.path.basename(local_script)
- utils.header ("NODE hook %s (%s)"%(script_name,self.name()))
- test_ssh=self.create_test_ssh()
+ script_name = os.path.basename(local_script)
+ utils.header ("NODE hook {} ({})".format(script_name, self.name()))
+ test_ssh = self.create_test_ssh()
test_ssh.copy_home(local_script)
if test_ssh.run("./"+script_name) != 0:
- utils.header ("WARNING: node hooks check script %s FAILED (ignored)"%script_name)
+ utils.header ("WARNING: node hooks check script {} FAILED (ignored)"\
+ .format(script_name))
#return False
return True
else:
- utils.header ("SUCCESS: node hook %s OK"%script_name)
+ utils.header ("SUCCESS: node hook {} OK".format(script_name))
return True
def has_libvirt (self):
- test_ssh=self.create_test_ssh()
- return test_ssh.run ("rpm -q --quiet libvirt-client")==0
+ test_ssh = self.create_test_ssh()
+ return test_ssh.run ("rpm -q --quiet libvirt-client") == 0
- def _check_system_slice (self, slicename,dry_run=False):
- sitename=self.test_plc.plc_spec['settings']['PLC_SLICE_PREFIX']
- vservername="%s_%s"%(sitename,slicename)
- test_ssh=self.create_test_ssh()
+ def _check_system_slice (self, slicename, dry_run=False):
+ sitename = self.test_plc.plc_spec['settings']['PLC_SLICE_PREFIX']
+ vservername = "{}_{}".format(sitename, slicename)
+ test_ssh = self.create_test_ssh()
if self.has_libvirt():
- utils.header("Checking system slice %s using virsh"%slicename)
- return test_ssh.run("virsh --connect lxc:// list | grep -q ' %s '"%vservername,
- dry_run=dry_run)==0
+ utils.header("Checking system slice {} using virsh".format(slicename))
+ return test_ssh.run("virsh --connect lxc:// list | grep -q ' {} '".format(vservername),
+ dry_run=dry_run) == 0
else:
- (retcod,output)=utils.output_of(test_ssh.actual_command("cat /vservers/%s/etc/slicefamily 2> /dev/null")%vservername)
+ (retcod,output) = \
+ utils.output_of(test_ssh.actual_command("cat /vservers/{}/etc/slicefamily 2> /dev/null")\
+ .format(vservername))
# get last line only as ssh pollutes the output
- slicefamily=output.split("\n")[-1]
- utils.header("Found slicefamily '%s'for slice %s"%(slicefamily,slicename))
+ slicefamily = output.split("\n")[-1]
+ utils.header("Found slicefamily '{}' for slice {}".format(slicefamily, slicename))
if retcod != 0:
return False
- utils.header("Checking system slice %s using vserver-stat"%slicename)
- return test_ssh.run("vserver-stat | grep %s"%vservername,dry_run=dry_run)==0
+ utils.header("Checking system slice {} using vserver-stat".format(slicename))
+ return test_ssh.run("vserver-stat | grep {}".format(vservername), dry_run=dry_run) == 0
# step methods must take (self) and return a boolean (options is a member of the class)
def standby(minutes, dry_run):
- utils.header('Entering StandBy for %d mn'%minutes)
+ utils.header('Entering StandBy for {:d} mn'.format(minutes))
if dry_run:
print 'dry_run'
else:
ref_name = method.__name__.replace('_ignore', '').replace('force_', '')
ref_method = TestPlc.__dict__[ref_name]
result = ref_method(self)
- print "Actual (but ignored) result for %(ref_name)s is %(result)s" % locals()
+ print "Actual (but ignored) result for {ref_name} is {result}".format(**locals())
return Ignored(result)
name = method.__name__.replace('_ignore', '').replace('force_', '')
ignoring.__name__ = name
def _has_sfa_cached(rpms_url):
if os.path.isfile(has_sfa_cache_filename):
cached = file(has_sfa_cache_filename).read() == "yes"
- utils.header("build provides SFA (cached):%s" % cached)
+ utils.header("build provides SFA (cached):{}".format(cached))
return cached
# warning, we're now building 'sface' so let's be a bit more picky
# full builds are expected to return with 0 here
utils.header("Checking if build provides SFA package...")
- retcod = os.system("curl --silent %s/ | grep -q sfa-"%rpms_url) == 0
+ retcod = os.system("curl --silent {}/ | grep -q sfa-".format(rpms_url)) == 0
encoded = 'yes' if retcod else 'no'
with open(has_sfa_cache_filename,'w')as out:
out.write(encoded)
self.test_ssh = TestSsh(self.plc_spec['host_box'], self.options.buildname)
self.vserverip = plc_spec['vserverip']
self.vservername = plc_spec['vservername']
- self.url = "https://%s:443/PLCAPI/" % plc_spec['vserverip']
+ self.url = "https://{}:443/PLCAPI/".format(plc_spec['vserverip'])
self.apiserver = TestApiserver(self.url, options.dry_run)
(self.ssh_node_boot_timeout, self.ssh_node_boot_silent) = plc_spec['ssh_node_boot_timers']
(self.ssh_node_debug_timeout, self.ssh_node_debug_silent) = plc_spec['ssh_node_debug_timers']
def name(self):
name = self.plc_spec['name']
- return "%s.%s" % (name,self.vservername)
+ return "{}.{}".format(name,self.vservername)
def hostname(self):
return self.plc_spec['host_box']
def host_to_guest(self, command):
vservername = self.vservername
personality = self.options.personality
- raw = "%(personality)s virsh -c lxc:/// lxc-enter-namespace %(vservername)s" % locals()
+ raw = "{personality} virsh -c lxc:/// lxc-enter-namespace {vservername}".format(**locals())
# f14 still needs some extra help
if self.options.fcdistro == 'f14':
- raw +=" -- /usr/bin/env PATH=/bin:/sbin:/usr/bin:/usr/sbin %(command)s" % locals()
+ raw +=" -- /usr/bin/env PATH=/bin:/sbin:/usr/bin:/usr/sbin {command}".format(**locals())
else:
- raw +=" -- /usr/bin/env %(command)s" % locals()
+ raw +=" -- /usr/bin/env {command}".format(**locals())
return raw
# this /vservers thing is legacy...
def vm_root_in_host(self):
- return "/vservers/%s/" % (self.vservername)
+ return "/vservers/{}/".format(self.vservername)
def vm_timestamp_path(self):
- return "/vservers/%s/%s.timestamp" % (self.vservername,self.vservername)
+ return "/vservers/{}/{}.timestamp".format(self.vservername, self.vservername)
#start/stop the vserver
def start_guest_in_host(self):
- return "virsh -c lxc:/// start %s" % (self.vservername)
+ return "virsh -c lxc:/// start {}".format(self.vservername)
def stop_guest_in_host(self):
- return "virsh -c lxc:/// destroy %s" % (self.vservername)
+ return "virsh -c lxc:/// destroy {}".format(self.vservername)
# xxx quick n dirty
def run_in_guest_piped(self,local,remote):
def yum_check_installed(self, rpms):
if isinstance(rpms, list):
rpms=" ".join(rpms)
- return self.run_in_guest("rpm -q %s"%rpms) == 0
+ return self.run_in_guest("rpm -q {}".format(rpms)) == 0
# does a yum install in the vs, ignore yum retcod, check with rpm
def yum_install(self, rpms):
if isinstance(rpms, list):
rpms=" ".join(rpms)
- self.run_in_guest("yum -y install %s" % rpms)
+ self.run_in_guest("yum -y install {}".format(rpms))
# yum-complete-transaction comes with yum-utils, that is in vtest.pkgs
self.run_in_guest("yum-complete-transaction -y")
return self.yum_check_installed(rpms)
return site
if site['site_fields']['login_base'] == sitename:
return site
- raise Exception,"Cannot locate site %s" % sitename
+ raise Exception,"Cannot locate site {}".format(sitename)
def locate_node(self, nodename):
for site in self.plc_spec['sites']:
for node in site['nodes']:
if node['name'] == nodename:
return site, node
- raise Exception, "Cannot locate node %s" % nodename
+ raise Exception, "Cannot locate node {}".format(nodename)
def locate_hostname(self, hostname):
for site in self.plc_spec['sites']:
for node in site['nodes']:
if node['node_fields']['hostname'] == hostname:
return(site, node)
- raise Exception,"Cannot locate hostname %s" % hostname
+ raise Exception,"Cannot locate hostname {}".format(hostname)
def locate_key(self, key_name):
for key in self.plc_spec['keys']:
if key['key_name'] == key_name:
return key
- raise Exception,"Cannot locate key %s" % key_name
+ raise Exception,"Cannot locate key {}".format(key_name)
def locate_private_key_from_key_names(self, key_names):
# locate the first avail. key
for slice in self.plc_spec['slices']:
if slice['slice_fields']['name'] == slicename:
return slice
- raise Exception,"Cannot locate slice %s" % slicename
+ raise Exception,"Cannot locate slice {}".format(slicename)
def all_sliver_objs(self):
result = []
"print cut'n paste-able stuff to export env variables to your shell"
# guess local domain from hostname
if TestPlc.exported_id > 1:
- print "export GUESTHOSTNAME%d=%s" % (TestPlc.exported_id, self.plc_spec['vservername'])
+ print "export GUESTHOSTNAME{:d}={}".format(TestPlc.exported_id, self.plc_spec['vservername'])
return True
TestPlc.exported_id += 1
domain = socket.gethostname().split('.',1)[1]
- fqdn = "%s.%s" % (self.plc_spec['host_box'], domain)
- print "export BUILD=%s" % self.options.buildname
- print "export PLCHOSTLXC=%s" % fqdn
- print "export GUESTNAME=%s" % self.plc_spec['vservername']
+ fqdn = "{}.{}".format(self.plc_spec['host_box'], domain)
+ print "export BUILD={}".format(self.options.buildname)
+ print "export PLCHOSTLXC={}".format(fqdn)
+ print "export GUESTNAME={}".format(self.plc_spec['vservername'])
vplcname = self.plc_spec['vservername'].split('-')[-1]
- print "export GUESTHOSTNAME=%s.%s"%(vplcname, domain)
+ print "export GUESTHOSTNAME={}.{}".format(vplcname, domain)
# find hostname of first node
hostname, qemubox = self.all_node_infos()[0]
- print "export KVMHOST=%s.%s" % (qemubox, domain)
- print "export NODE=%s" % (hostname)
+ print "export KVMHOST={}.{}".format(qemubox, domain)
+ print "export NODE={}".format(hostname)
return True
# entry point
print '+ ',k,v
def display_node_spec(self, node):
- print "+ node=%s host_box=%s" % (node['name'],node['host_box']),
+ print "+ node={} host_box={}".format(node['name'], node['host_box']),
print "hostname=", node['node_fields']['hostname'],
print "ip=", node['interface_fields']['ip']
if self.options.verbose:
def display_mapping_plc(plc_spec):
print '+ MyPLC',plc_spec['name']
# WARNING this would not be right for lxc-based PLC's - should be harmless though
- print '+\tvserver address = root@%s:/vservers/%s' % (plc_spec['host_box'], plc_spec['vservername'])
- print '+\tIP = %s/%s' % (plc_spec['settings']['PLC_API_HOST'], plc_spec['vserverip'])
+ print '+\tvserver address = root@{}:/vservers/{}'.format(plc_spec['host_box'], plc_spec['vservername'])
+ print '+\tIP = {}/{}'.format(plc_spec['settings']['PLC_API_HOST'], plc_spec['vserverip'])
for site_spec in plc_spec['sites']:
for node_spec in site_spec['nodes']:
TestPlc.display_mapping_node(node_spec)
@staticmethod
def display_mapping_node(node_spec):
- print '+ NODE %s' % (node_spec['name'])
- print '+\tqemu box %s' % node_spec['host_box']
- print '+\thostname=%s' % node_spec['node_fields']['hostname']
+ print '+ NODE {}'.format(node_spec['name'])
+ print '+\tqemu box {}'.format(node_spec['host_box'])
+ print '+\thostname={}'.format(node_spec['node_fields']['hostname'])
# write a timestamp in /vservers/<>.timestamp
# cannot be inside the vserver, that causes vserver .. build to cough
# a first approx. is to store the timestamp close to the VM root like vs does
stamp_path = self.vm_timestamp_path()
stamp_dir = os.path.dirname(stamp_path)
- utils.system(self.test_ssh.actual_command("mkdir -p %s" % stamp_dir))
- return utils.system(self.test_ssh.actual_command("echo %d > %s" % (now, stamp_path))) == 0
+ utils.system(self.test_ssh.actual_command("mkdir -p {}".format(stamp_dir)))
+ return utils.system(self.test_ssh.actual_command("echo {:d} > {}".format(now, stamp_path))) == 0
# this is called inconditionnally at the beginning of the test sequence
# just in case this is a rerun, so if the vm is not running it's fine
def plcvm_delete(self):
"vserver delete the test myplc"
stamp_path = self.vm_timestamp_path()
- self.run_in_host("rm -f %s" % stamp_path)
- self.run_in_host("virsh -c lxc:// destroy %s" % self.vservername)
- self.run_in_host("virsh -c lxc:// undefine %s" % self.vservername)
- self.run_in_host("rm -fr /vservers/%s" % self.vservername)
+ self.run_in_host("rm -f {}".format(stamp_path))
+ self.run_in_host("virsh -c lxc:// destroy {}".format(self.vservername))
+ self.run_in_host("virsh -c lxc:// undefine {}".format(self.vservername))
+ self.run_in_host("rm -fr /vservers/{}".format(self.vservername))
return True
### install
script = "lbuild-initvm.sh"
script_options = ""
# pass the vbuild-nightly options to [lv]test-initvm
- script_options += " -p %s" % self.options.personality
- script_options += " -d %s" % self.options.pldistro
- script_options += " -f %s" % self.options.fcdistro
- script_options += " -r %s" % repo_url
+ script_options += " -p {}".format(self.options.personality)
+ script_options += " -d {}".format(self.options.pldistro)
+ script_options += " -f {}".format(self.options.fcdistro)
+ script_options += " -r {}".format(repo_url)
vserver_name = self.vservername
try:
vserver_hostname = socket.gethostbyaddr(self.vserverip)[0]
- script_options += " -n %s" % vserver_hostname
+ script_options += " -n {}".format(vserver_hostname)
except:
- print "Cannot reverse lookup %s" % self.vserverip
+ print "Cannot reverse lookup {}".format(self.vserverip)
print "This is considered fatal, as this might pollute the test results"
return False
- create_vserver="%(build_dir)s/%(script)s %(script_options)s %(vserver_name)s" % locals()
+ create_vserver="{build_dir}/{script} {script_options} {vserver_name}".format(**locals())
return self.run_in_host(create_vserver) == 0
### install_rpm
elif self.options.personality == "linux64":
arch = "x86_64"
else:
+ raise Exception, "Unsupported personality {!r}".format(self.options.personality)
- nodefamily = "%s-%s-%s" % (self.options.pldistro, self.options.fcdistro, arch)
+ raise Exception, "Unsupported personality {}".format(self.options.personality)
+ nodefamily = "{}-{}-{}".format(self.options.pldistro, self.options.fcdistro, arch)
pkgs_list=[]
- pkgs_list.append("slicerepo-%s" % nodefamily)
+ pkgs_list.append("slicerepo-{}".format(nodefamily))
pkgs_list.append("myplc")
- pkgs_list.append("noderepo-%s" % nodefamily)
- pkgs_list.append("nodeimage-%s-plain" % nodefamily)
+ pkgs_list.append("noderepo-{}".format(nodefamily))
+ pkgs_list.append("nodeimage-{}-plain".format(nodefamily))
pkgs_string=" ".join(pkgs_list)
return self.yum_install(pkgs_list)
###
def plc_configure(self):
"run plc-config-tty"
- tmpname = '%s.plc-config-tty' % self.name()
+ tmpname = '{}.plc-config-tty'.format(self.name())
with open(tmpname,'w') as fileconf:
for (var,value) in self.plc_spec['settings'].iteritems():
- fileconf.write('e %s\n%s\n'%(var,value))
+ fileconf.write('e {}\n{}\n'.format(var, value))
fileconf.write('w\n')
fileconf.write('q\n')
- utils.system('cat %s' % tmpname)
- self.run_in_guest_piped('cat %s' % tmpname, 'plc-config-tty')
- utils.system('rm %s' % tmpname)
+ utils.system('cat {}'.format(tmpname))
+ self.run_in_guest_piped('cat {}'.format(tmpname), 'plc-config-tty')
+ utils.system('rm {}'.format(tmpname))
return True
# f14 is a bit odd in this respect, although this worked fine in guests up to f18
def start_stop_service(self, service, start_or_stop):
"utility to start/stop a service with the special trick for f14"
if self.options.fcdistro != 'f14':
- return self.run_in_guest("service %s %s" % (service, start_or_stop)) == 0
+ return self.run_in_guest("service {} {}".format(service, start_or_stop)) == 0
else:
# patch /sbin/service so it does not reset environment
self.run_in_guest('sed -i -e \\"s,env -i,env,\\" /sbin/service')
# this is because our own scripts in turn call service
- return self.run_in_guest("SYSTEMCTL_SKIP_REDIRECT=true service %s %s" % \
- (service, start_or_stop)) == 0
+ return self.run_in_guest("SYSTEMCTL_SKIP_REDIRECT=true service {} {}"\
+ .format(service, start_or_stop)) == 0
def plc_start(self):
"service plc start"
overall = True
prefix = 'debug_ssh_key'
for ext in ['pub', 'rsa'] :
- src = "%(vm_root)s/etc/planetlab/%(prefix)s.%(ext)s" % locals()
- dst = "keys/%(vservername)s-debug.%(ext)s" % locals()
+ src = "{vm_root}/etc/planetlab/{prefix}.{ext}".format(**locals())
+ dst = "keys/{vservername}-debug.{ext}".format(**locals())
if self.test_ssh.fetch(src, dst) != 0:
overall=False
return overall
for site_spec in self.plc_spec['sites']:
test_site = TestSite(self,site_spec)
if (action != "add"):
- utils.header("Deleting site %s in %s" % (test_site.name(), self.name()))
+ utils.header("Deleting site {} in {}".format(test_site.name(), self.name()))
test_site.delete_site()
# deleted with the site
#test_site.delete_users()
continue
else:
- utils.header("Creating site %s & users in %s" % (test_site.name(), self.name()))
+ utils.header("Creating site {} & users in {}".format(test_site.name(), self.name()))
test_site.create_site()
test_site.create_users()
return True
for site_spec in self.plc_spec['sites']:
test_site = TestSite(self, site_spec)
if action != "add":
- utils.header("Deleting nodes in site %s" % test_site.name())
+ utils.header("Deleting nodes in site {}".format(test_site.name()))
for node_spec in site_spec['nodes']:
test_node = TestNode(self, test_site, node_spec)
- utils.header("Deleting %s" % test_node.name())
+ utils.header("Deleting {}".format(test_node.name()))
test_node.delete_node()
else:
- utils.header("Creating nodes for site %s in %s" % (test_site.name(), self.name()))
+ utils.header("Creating nodes for site {} in {}".format(test_site.name(), self.name()))
for node_spec in site_spec['nodes']:
- utils.pprint('Creating node %s' % node_spec, node_spec)
+ utils.pprint('Creating node {}'.format(node_spec), node_spec)
test_node = TestNode(self, test_site, node_spec)
test_node.create_node()
return True
lease_addition = self.apiserver.AddLeases(self.auth_root(), nodes, lease_spec['slice'],
lease_spec['t_from'],lease_spec['t_until'])
if lease_addition['errors']:
- utils.header("Cannot create leases, %s"%lease_addition['errors'])
+ utils.header("Cannot create leases, {}".format(lease_addition['errors']))
ok = False
else:
- utils.header('Leases on nodes %r for %s from %d (%s) until %d (%s)' % \
- (nodes, lease_spec['slice'],
- lease_spec['t_from'], TestPlc.timestamp_printable(lease_spec['t_from']),
- lease_spec['t_until'], TestPlc.timestamp_printable(lease_spec['t_until'])))
+ utils.header('Leases on nodes {} for {} from {:d} ({}) until {:d} ({})'\
+ .format(nodes, lease_spec['slice'],
+ lease_spec['t_from'], TestPlc.timestamp_printable(lease_spec['t_from']),
+ lease_spec['t_until'], TestPlc.timestamp_printable(lease_spec['t_until'])))
return ok
def delete_leases(self):
"remove all leases in the myplc side"
lease_ids = [ l['lease_id'] for l in self.apiserver.GetLeases(self.auth_root())]
- utils.header("Cleaning leases %r" % lease_ids)
+ utils.header("Cleaning leases {}".format(lease_ids))
self.apiserver.DeleteLeases(self.auth_root(), lease_ids)
return True
for l in leases:
current = l['t_until'] >= now
if self.options.verbose or current:
- utils.header("%s %s from %s until %s" % \
- (l['hostname'], l['name'],
- TestPlc.timestamp_printable(l['t_from']),
- TestPlc.timestamp_printable(l['t_until'])))
+ utils.header("{} {} from {} until {}"\
+ .format(l['hostname'], l['name'],
+ TestPlc.timestamp_printable(l['t_from']),
+ TestPlc.timestamp_printable(l['t_until'])))
return True
# create nodegroups if needed, and populate
else:
tag_type_id = self.apiserver.AddTagType(auth,
{'tagname' : nodegroupname,
- 'description' : 'for nodegroup %s' % nodegroupname,
+ 'description' : 'for nodegroup {}'.format(nodegroupname),
'category' : 'test'})
print 'located tag (type)', nodegroupname, 'as', tag_type_id
# create nodegroup
except:
return False
def message(self):
- return "CompleterTaskBootState with node %s" % self.hostname
+ return "CompleterTaskBootState with node {}".format(self.hostname)
def failure_epilogue(self):
- print "node %s in state %s - expected %s" %\
- (self.hostname, self.last_boot_state, target_boot_state)
+ print "node {} in state {} - expected {}"\
+ .format(self.hostname, self.last_boot_state, target_boot_state)
timeout = timedelta(minutes=timeout_minutes)
graceout = timedelta(minutes=silent_minutes)
period = timedelta(seconds=period_seconds)
# the nodes that haven't checked yet - start with a full list and shrink over time
- utils.header("checking nodes boot state (expected %s)" % target_boot_state)
+ utils.header("checking nodes boot state (expected {})".format(target_boot_state))
tasks = [ CompleterTaskBootState(self,hostname) \
for (hostname,_) in self.all_node_infos() ]
message = 'check_boot_state={}'.format(target_boot_state)
def __init__(self, hostname):
self.hostname = hostname
def run(self, silent):
- command="ping -c 1 -w 1 %s >& /dev/null" % self.hostname
+ command = "ping -c 1 -w 1 {} >& /dev/null".format(self.hostname)
return utils.system(command, silent=silent) == 0
def failure_epilogue(self):
- print "Cannot ping node with name %s" % self.hostname
+ print "Cannot ping node with name {}".format(self.hostname)
timeout = timedelta(seconds = timeout_seconds)
graceout = timeout
period = timedelta(seconds = period_seconds)
if debug:
message = "debug"
completer_message = 'ssh_node_debug'
- local_key = "keys/%(vservername)s-debug.rsa" % locals()
+ local_key = "keys/{vservername}-debug.rsa".format(**locals())
else:
message = "boot"
completer_message = 'ssh_node_boot'
local_key = "keys/key_admin.rsa"
- utils.header("checking ssh access to nodes (expected in %s mode)" % message)
+ utils.header("checking ssh access to nodes (expected in {} mode)".format(message))
node_infos = self.all_node_infos()
tasks = [ CompleterTaskNodeSsh(nodename, qemuname, local_key,
boot_state=message, dry_run=self.options.dry_run) \
def actual_run(self):
return self.test_sliver.check_initscript_stamp(self.stamp)
def message(self):
- return "initscript checker for %s" % self.test_sliver.name()
+ return "initscript checker for {}".format(self.test_sliver.name())
def failure_epilogue(self):
- print "initscript stamp %s not found in sliver %s"%\
- (self.stamp, self.test_sliver.name())
+ print "initscript stamp {} not found in sliver {}"\
+ .format(self.stamp, self.test_sliver.name())
tasks = []
for slice_spec in self.plc_spec['slices']:
def initscripts(self):
"create initscripts with PLCAPI"
for initscript in self.plc_spec['initscripts']:
- utils.pprint('Adding Initscript in plc %s' % self.plc_spec['name'], initscript)
+ utils.pprint('Adding Initscript in plc {}'.format(self.plc_spec['name']), initscript)
self.apiserver.AddInitScript(self.auth_root(), initscript['initscript_fields'])
return True
"delete initscripts with PLCAPI"
for initscript in self.plc_spec['initscripts']:
initscript_name = initscript['initscript_fields']['name']
- print('Attempting to delete %s in plc %s' % (initscript_name, self.plc_spec['name']))
+ print('Attempting to delete {} in plc {}'.format(initscript_name, self.plc_spec['name']))
try:
self.apiserver.DeleteInitScript(self.auth_root(), initscript_name)
print initscript_name, 'deleted'
def _speed_up_slices(self, p, r):
# create the template on the server-side
- template = "%s.nodemanager" % self.name()
+ template = "{}.nodemanager".format(self.name())
with open(template,"w") as template_file:
- template_file.write('OPTIONS="-p %s -r %s -d"\n'%(p,r))
+ template_file.write('OPTIONS="-p {} -r {} -d"\n'.format(p, r))
in_vm = "/var/www/html/PlanetLabConf/nodemanager"
- remote = "%s/%s" % (self.vm_root_in_host(), in_vm)
+ remote = "{}/{}".format(self.vm_root_in_host(), in_vm)
self.test_ssh.copy_abs(template, remote)
# Add a conf file
if not self.apiserver.GetConfFiles(self.auth_root(),
def debug_nodemanager(self):
"sets verbose mode for nodemanager, and speeds up cycle even more (needs speed_up_slices first)"
- template = "%s.nodemanager" % self.name()
+ template = "{}.nodemanager".format(self.name())
with open(template,"w") as template_file:
template_file.write('OPTIONS="-p 10 -r 6 -v -d"\n')
in_vm = "/var/www/html/PlanetLabConf/nodemanager"
- remote = "%s/%s" % (self.vm_root_in_host(), in_vm)
+ remote = "{}/{}".format(self.vm_root_in_host(), in_vm)
self.test_ssh.copy_abs(template, remote)
return True
return plc.locate_sliver_obj(nodename, slicename)
except:
pass
- raise Exception, "Cannot locate sliver %s@%s among all PLCs" % (nodename, slicename)
+ raise Exception, "Cannot locate sliver {}@{} among all PLCs".format(nodename, slicename)
# implement this one as a cross step so that we can take advantage of different nodes
# in multi-plcs mode
def actual_run(self):
return self.test_sliver.check_tcp_ready(port = 9999)
def message(self):
- return "network ready checker for %s" % self.test_sliver.name()
+ return "network ready checker for {}".format(self.test_sliver.name())
def failure_epilogue(self):
- print "could not bind port from sliver %s" % self.test_sliver.name()
+ print "could not bind port from sliver {}".format(self.test_sliver.name())
sliver_specs = {}
tasks = []
# locate the TestSliver instances involved, and cache them in the spec instance
spec['s_sliver'] = self.locate_sliver_obj_cross(spec['server_node'], spec['server_slice'], other_plcs)
spec['c_sliver'] = self.locate_sliver_obj_cross(spec['client_node'], spec['client_slice'], other_plcs)
- message = "Will check TCP between s=%s and c=%s" % \
- (spec['s_sliver'].name(), spec['c_sliver'].name())
+ message = "Will check TCP between s={} and c={}"\
+ .format(spec['s_sliver'].name(), spec['c_sliver'].name())
if 'client_connect' in spec:
- message += " (using %s)" % spec['client_connect']
+ message += " (using {})".format(spec['client_connect'])
utils.header(message)
# we need to check network presence in both slivers, but also
# avoid to insert a sliver several times
def actual_run(self):
return self.test_node._check_system_slice(slicename, dry_run=self.dry_run)
def message(self):
- return "System slice %s @ %s" % (slicename, self.test_node.name())
+ return "System slice {} @ {}".format(slicename, self.test_node.name())
def failure_epilogue(self):
- print "COULD not find system slice %s @ %s"%(slicename, self.test_node.name())
+ print "COULD not find system slice {} @ {}".format(slicename, self.test_node.name())
timeout = timedelta(minutes=timeout_minutes)
silent = timedelta(0)
period = timedelta(seconds=period_seconds)
"runs PLCAPI stress test, that checks Add/Update/Delete on all types - preserves contents"
# install the stress-test in the plc image
location = "/usr/share/plc_api/plcsh_stress_test.py"
- remote = "%s/%s" % (self.vm_root_in_host(), location)
+ remote = "{}/{}".format(self.vm_root_in_host(), location)
self.test_ssh.copy_abs("plcsh_stress_test.py", remote)
command = location
command += " -- --check"
utils.header("********** Regular yum failed - special workaround in place, 2nd chance")
code, cached_rpm_path = \
utils.output_of(self.actual_command_in_guest('find /var/cache/yum -name sfa-client\*.rpm'))
- utils.header("rpm_path=<<%s>>" % rpm_path)
+ utils.header("rpm_path=<<{}>>".format(cached_rpm_path))
# just for checking
- self.run_in_guest("rpm -i %s" % cached_rpm_path)
+ self.run_in_guest("rpm -i {}".format(cached_rpm_path))
return self.yum_check_installed("sfa-client")
def sfa_dbclean(self):
"thoroughly wipes off the SFA database"
return self.run_in_guest("sfaadmin reg nuke") == 0 or \
self.run_in_guest("sfa-nuke.py") == 0 or \
- self.run_in_guest("sfa-nuke-plc.py") == 0
+ self.run_in_guest("sfa-nuke-plc.py") == 0 or \
+ self.run_in_guest("sfaadmin registry nuke") == 0
def sfa_fsclean(self):
"cleanup /etc/sfa/trusted_roots and /var/lib/sfa"
try:
self.apiserver.DeleteSite(self.auth_root(),login_base)
except:
- print "Site %s already absent from PLC db"%login_base
+ print "Site {} already absent from PLC db".format(login_base)
for spec_name in ['pi_spec','user_spec']:
user_spec = auth_sfa_spec[spec_name]
self.apiserver.DeletePerson(self.auth_root(),username)
except:
# this in fact is expected as sites delete their members
- #print "User %s already absent from PLC db"%username
+ #print "User {} already absent from PLC db".format(username)
pass
print "REMEMBER TO RUN sfa_import AGAIN"
###
def confdir(self):
- dirname = "conf.%s" % self.plc_spec['name']
+ dirname = "conf.{}".format(self.plc_spec['name'])
if not os.path.isdir(dirname):
- utils.system("mkdir -p %s" % dirname)
+ utils.system("mkdir -p {}".format(dirname))
if not os.path.isdir(dirname):
- raise Exception,"Cannot create config dir for plc %s" % self.name()
+ raise Exception,"Cannot create config dir for plc {}".format(self.name())
return dirname
def conffile(self, filename):
- return "%s/%s" % (self.confdir(),filename)
+ return "{}/{}".format(self.confdir(), filename)
def confsubdir(self, dirname, clean, dry_run=False):
- subdirname = "%s/%s" % (self.confdir(),dirname)
+ subdirname = "{}/{}".format(self.confdir(), dirname)
if clean:
- utils.system("rm -rf %s" % subdirname)
+ utils.system("rm -rf {}".format(subdirname))
if not os.path.isdir(subdirname):
- utils.system("mkdir -p %s" % subdirname)
+ utils.system("mkdir -p {}".format(subdirname))
if not dry_run and not os.path.isdir(subdirname):
- raise "Cannot create config subdir %s for plc %s" % (dirname,self.name())
+ raise Exception, "Cannot create config subdir {} for plc {}".format(dirname, self.name())
return subdirname
def conffile_clean(self, filename):
filename=self.conffile(filename)
- return utils.system("rm -rf %s" % filename)==0
+ return utils.system("rm -rf {}".format(filename))==0
###
def sfa_configure(self):
tmpname = self.conffile("sfa-config-tty")
with open(tmpname,'w') as fileconf:
for (var,value) in self.plc_spec['sfa']['settings'].iteritems():
- fileconf.write('e %s\n%s\n'%(var,value))
+ fileconf.write('e {}\n{}\n'.format(var, value))
fileconf.write('w\n')
fileconf.write('R\n')
fileconf.write('q\n')
- utils.system('cat %s' % tmpname)
- self.run_in_guest_piped('cat %s' % tmpname, 'sfa-config-tty')
+ utils.system('cat {}'.format(tmpname))
+ self.run_in_guest_piped('cat {}'.format(tmpname), 'sfa-config-tty')
return True
def aggregate_xml_line(self):
port = self.plc_spec['sfa']['neighbours-port']
- return '<aggregate addr="%s" hrn="%s" port="%r"/>' % \
- (self.vserverip, self.plc_spec['sfa']['settings']['SFA_REGISTRY_ROOT_AUTH'], port)
+ return '<aggregate addr="{}" hrn="{}" port="{!r}"/>'\
+ .format(self.vserverip, self.plc_spec['sfa']['settings']['SFA_REGISTRY_ROOT_AUTH'], port)
def registry_xml_line(self):
- return '<registry addr="%s" hrn="%s" port="12345"/>' % \
- (self.vserverip, self.plc_spec['sfa']['settings']['SFA_REGISTRY_ROOT_AUTH'])
+ return '<registry addr="{}" hrn="{}" port="12345"/>'\
+ .format(self.vserverip, self.plc_spec['sfa']['settings']['SFA_REGISTRY_ROOT_AUTH'])
# a cross step that takes all other plcs in argument
return True
agg_fname = self.conffile("agg.xml")
with open(agg_fname,"w") as out:
- out.write("<aggregates>%s</aggregates>\n" % \
- " ".join([ plc.aggregate_xml_line() for plc in other_plcs ]))
- utils.header("(Over)wrote %s" % agg_fname)
+ out.write("<aggregates>{}</aggregates>\n"\
+ .format(" ".join([ plc.aggregate_xml_line() for plc in other_plcs ])))
+ utils.header("(Over)wrote {}".format(agg_fname))
reg_fname=self.conffile("reg.xml")
with open(reg_fname,"w") as out:
- out.write("<registries>%s</registries>\n" % \
- " ".join([ plc.registry_xml_line() for plc in other_plcs ]))
- utils.header("(Over)wrote %s" % reg_fname)
+ out.write("<registries>{}</registries>\n"\
+ .format(" ".join([ plc.registry_xml_line() for plc in other_plcs ])))
+ utils.header("(Over)wrote {}".format(reg_fname))
return self.test_ssh.copy_abs(agg_fname,
- '/%s/etc/sfa/aggregates.xml' % self.vm_root_in_host()) == 0 \
+ '/{}/etc/sfa/aggregates.xml'.format(self.vm_root_in_host())) == 0 \
and self.test_ssh.copy_abs(reg_fname,
- '/%s/etc/sfa/registries.xml' % self.vm_root_in_host()) == 0
+ '/{}/etc/sfa/registries.xml'.format(self.vm_root_in_host())) == 0
def sfa_import(self):
"use sfaadmin to import from plc"
for slice_spec in self.plc_spec['sfa']['auth_sfa_specs']:
test_slice = TestAuthSfa(self, slice_spec)
dir_basename = os.path.basename(test_slice.sfi_path())
- dir_name = self.confsubdir("dot-sfi/%s" % dir_basename,
+ dir_name = self.confsubdir("dot-sfi/{}".format(dir_basename),
clean=True, dry_run=self.options.dry_run)
test_slice.sfi_configure(dir_name)
# push into the remote /root/sfi area
location = test_slice.sfi_path()
- remote = "%s/%s" % (self.vm_root_in_host(), location)
+ remote = "{}/{}".format(self.vm_root_in_host(), location)
self.test_ssh.mkdir(remote, abs=True)
# need to strip last level or remote otherwise we get an extra dir level
self.test_ssh.copy_abs(dir_name, os.path.dirname(remote), recursive=True)
for slice_spec in self.plc_spec['sfa']['auth_sfa_specs']:
test_slice = TestAuthSfa(self, slice_spec)
in_vm = test_slice.sfi_path()
- remote = "%s/%s" % (self.vm_root_in_host(), in_vm)
+ remote = "{}/{}".format(self.vm_root_in_host(), in_vm)
if self.test_ssh.copy_abs(filename, remote) !=0:
overall = False
return overall
"creates random entries in the PLCAPI"
# install the stress-test in the plc image
location = "/usr/share/plc_api/plcsh_stress_test.py"
- remote = "%s/%s" % (self.vm_root_in_host(), location)
+ remote = "{}/{}".format(self.vm_root_in_host(), location)
self.test_ssh.copy_abs("plcsh_stress_test.py", remote)
command = location
command += " -- --preserve --short-names"
def gather_slivers_var_logs(self):
for test_sliver in self.all_sliver_objs():
remote = test_sliver.tar_var_logs()
- utils.system("mkdir -p logs/sliver.var-log.%s" % test_sliver.name())
- command = remote + " | tar -C logs/sliver.var-log.%s -xf -" % test_sliver.name()
+ utils.system("mkdir -p logs/sliver.var-log.{}".format(test_sliver.name()))
+ command = remote + " | tar -C logs/sliver.var-log.{} -xf -".format(test_sliver.name())
utils.system(command)
return True
def gather_var_logs(self):
- utils.system("mkdir -p logs/myplc.var-log.%s" % self.name())
+ utils.system("mkdir -p logs/myplc.var-log.{}".format(self.name()))
to_plc = self.actual_command_in_guest("tar -C /var/log/ -cf - .")
- command = to_plc + "| tar -C logs/myplc.var-log.%s -xf -" % self.name()
+ command = to_plc + "| tar -C logs/myplc.var-log.{} -xf -".format(self.name())
utils.system(command)
- command = "chmod a+r,a+x logs/myplc.var-log.%s/httpd" % self.name()
+ command = "chmod a+r,a+x logs/myplc.var-log.{}/httpd".format(self.name())
utils.system(command)
def gather_pgsql_logs(self):
- utils.system("mkdir -p logs/myplc.pgsql-log.%s" % self.name())
+ utils.system("mkdir -p logs/myplc.pgsql-log.{}".format(self.name()))
to_plc = self.actual_command_in_guest("tar -C /var/lib/pgsql/data/pg_log/ -cf - .")
- command = to_plc + "| tar -C logs/myplc.pgsql-log.%s -xf -" % self.name()
+ command = to_plc + "| tar -C logs/myplc.pgsql-log.{} -xf -".format(self.name())
utils.system(command)
def gather_root_sfi(self):
- utils.system("mkdir -p logs/sfi.%s"%self.name())
+ utils.system("mkdir -p logs/sfi.{}".format(self.name()))
to_plc = self.actual_command_in_guest("tar -C /root/sfi/ -cf - .")
- command = to_plc + "| tar -C logs/sfi.%s -xf -"%self.name()
+ command = to_plc + "| tar -C logs/sfi.{} -xf -".format(self.name())
utils.system(command)
def gather_nodes_var_logs(self):
test_node = TestNode(self, test_site, node_spec)
test_ssh = TestSsh(test_node.name(), key="keys/key_admin.rsa")
command = test_ssh.actual_command("tar -C /var/log -cf - .")
- command = command + "| tar -C logs/node.var-log.%s -xf -" % test_node.name()
- utils.system("mkdir -p logs/node.var-log.%s" % test_node.name())
+ command = command + "| tar -C logs/node.var-log.{} -xf -".format(test_node.name())
+ utils.system("mkdir -p logs/node.var-log.{}".format(test_node.name()))
utils.system(command)
t = datetime.now()
d = t.date()
name = str(d)
- return "/root/%s-%s.sql" % (database, name)
+ return "/root/{}-{}.sql".format(database, name)
def plc_db_dump(self):
'dump the planetlab5 DB in /root in the PLC - filename has time'
dump=self.dbfile("planetab5")
self.run_in_guest('pg_dump -U pgsqluser planetlab5 -f '+ dump)
- utils.header('Dumped planetlab5 database in %s' % dump)
+ utils.header('Dumped planetlab5 database in {}'.format(dump))
return True
def plc_db_restore(self):
class TestSite:
- def __init__ (self,test_plc,site_spec):
- self.test_plc=test_plc
- self.site_spec=site_spec
+ def __init__ (self, test_plc, site_spec):
+ self.test_plc = test_plc
+ self.site_spec = site_spec
def name(self):
return self.site_spec['site_fields']['login_base']
print self.test_plc.auth_root()
self.test_plc.apiserver.AddSite(self.test_plc.auth_root(),
self.site_spec['site_fields'])
- self.test_plc.apiserver.AddSiteAddress(self.test_plc.auth_root(),self.name(),
+ self.test_plc.apiserver.AddSiteAddress(self.test_plc.auth_root(),
+ self.name(),
self.site_spec['address_fields'])
def create_users (self):
for user_spec in self.site_spec['users']:
- test_user=TestUser(self.test_plc,self,user_spec)
+ test_user = TestUser(self.test_plc, self, user_spec)
test_user.create_user()
test_user.add_keys()
def delete_site (self):
print self.test_plc.auth_root()
- self.test_plc.apiserver.DeleteSite(self.test_plc.auth_root(),self.name())
+ self.test_plc.apiserver.DeleteSite(self.test_plc.auth_root(), self.name())
return True
def delete_users(self):
for user_spec in self.site_spec['users']:
- test_user=TestUser(self.test_plc,self,user_spec)
+ test_user = TestUser(self.test_plc, self, user_spec)
test_user.delete_user()
- def locate_user (self,username):
+ def locate_user (self, username):
for user in self.site_spec['users']:
if user['name'] == username:
return user
if user['user_fields']['email'] == username:
return user
- raise Exception,"Cannot locate user %s"%username
+ raise Exception,"Cannot locate user {}".format(username)
- def locate_node (self,nodename):
+ def locate_node (self, nodename):
for node in self.site_spec['nodes']:
if node['name'] == nodename:
return node
- raise Exception,"Cannot locate node %s"%nodename
+ raise Exception,"Cannot locate node {}".format(nodename)
class CompleterTaskSliceSsh (CompleterTask):
def __init__ (self, test_plc, hostname, slicename, private_key,command, expected, dry_run):
- self.test_plc=test_plc
- self.hostname=hostname
- self.slicename=slicename
- self.private_key=private_key
- self.command=command
- self.dry_run=dry_run
- self.expected=expected
+ self.test_plc = test_plc
+ self.hostname = hostname
+ self.slicename = slicename
+ self.private_key = private_key
+ self.command = command
+ self.dry_run = dry_run
+ self.expected = expected
+
def run (self, silent):
(site_spec,node_spec) = self.test_plc.locate_hostname(self.hostname)
test_ssh = TestSsh (self.hostname,key=self.private_key,username=self.slicename)
if self.dry_run: return True
if self.expected: return retcod==0
else: return retcod!=0
+
def failure_epilogue (self):
if self.expected:
- print "Could not ssh into sliver %s@%s"%(self.slicename,self.hostname)
+ print "Could not ssh into sliver {}@{}".format(self.slicename, self.hostname)
else:
- print "Could still ssh into sliver%s@%s (that was expected to be down)"%(self.slicename,self.hostname)
+ print "Could still ssh into sliver {}@{} (that was expected to be down)"\
+ .format(self.slicename, self.hostname)
class TestSlice:
- def __init__ (self,test_plc,test_site,slice_spec):
- self.test_plc=test_plc
- self.test_site=test_site
- self.slice_spec=slice_spec
- self.test_ssh=TestSsh(self.test_plc.test_ssh)
+ def __init__ (self, test_plc, test_site, slice_spec):
+ self.test_plc = test_plc
+ self.test_site = test_site
+ self.slice_spec = slice_spec
+ self.test_ssh = TestSsh(self.test_plc.test_ssh)
def name(self):
return self.slice_spec['slice_fields']['name']
- def get_slice(self,slice_name):
+ def get_slice(self, slice_name):
for slice_spec in self.test_plc.plc_spec['slices']:
- if(slice_spec['slice_fields']['name']== slice_name):
+ if slice_spec['slice_fields']['name'] == slice_name:
return slice_spec
def owner_auth(self):
owner_spec = self.test_site.locate_user(self.slice_spec['owner'])
- return TestUser(self,self.test_site,owner_spec).auth()
+ return TestUser(self, self.test_site, owner_spec).auth()
def slice_name (self):
return self.slice_spec['slice_fields']['name']
auth = self.owner_auth()
slice_fields = self.slice_spec['slice_fields']
slice_name = slice_fields['name']
- utils.header("Creating slice %s"%slice_name)
- self.test_plc.apiserver.AddSlice(auth,slice_fields)
+ utils.header("Creating slice {}".format(slice_name))
+ self.test_plc.apiserver.AddSlice(auth, slice_fields)
for username in self.slice_spec['usernames']:
- user_spec=self.test_site.locate_user(username)
- test_user=TestUser(self,self.test_site,user_spec)
+ user_spec = self.test_site.locate_user(username)
+ test_user = TestUser(self,self.test_site,user_spec)
self.test_plc.apiserver.AddPersonToSlice(auth, test_user.name(), slice_name)
# add initscript code or name as appropriate
if self.slice_spec.has_key('initscriptcode'):
- iscode=self.slice_spec['initscriptcode']
- utils.header("Adding initscript code %s in %s"%(iscode,slice_name))
- self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name,'initscript_code',iscode)
+ iscode = self.slice_spec['initscriptcode']
+ utils.header("Adding initscript code {} in {}".format(iscode, slice_name))
+ self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name,
+ 'initscript_code', iscode)
elif self.slice_spec.has_key('initscriptname'):
- isname=self.slice_spec['initscriptname']
- utils.header("Adding initscript name %s in %s"%(isname,slice_name))
- self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name,'initscript',isname)
+ isname = self.slice_spec['initscriptname']
+ utils.header("Adding initscript name {} in {}".format(isname, slice_name))
+ self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name,
+ 'initscript', isname)
if 'omf-friendly' in self.slice_spec:
- utils.header("Making slice %s OMF-friendly"%slice_name)
- self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name,'vref','omf')
- self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name,'omf_control','yes')
+ utils.header("Making slice {} OMF-friendly".format(slice_name))
+ self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name, 'vref', 'omf')
+ self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name, 'omf_control', 'yes')
# setting vref directly like this was useful for multi-arch tests long ago - see wifilab
# however this should rather use other tags by now, so we drop this for now
# if self.slice_spec.has_key ('vref'):
-# vref_value=self.slice_spec['vref']
+# vref_value = self.slice_spec['vref']
# self.test_plc.apiserver.AddSliceTag(self.test_plc.auth_root(), slice_name,'vref',vref_value)
# epilogue
self.add_nodes()
auth = self.owner_auth()
slice_fields = self.slice_spec['slice_fields']
slice_name = slice_fields['name']
- vsys_tags = self.test_plc.apiserver.GetSliceTags (auth,{'tagname':'vsys','name':slice_name})
- values=[ st['value'] for st in vsys_tags ]
- expected=self.test_plc.plc_spec['expected_vsys_tags']
+ vsys_tags = self.test_plc.apiserver.GetSliceTags (auth, {'tagname' : 'vsys', 'name' : slice_name})
+ values = [st['value'] for st in vsys_tags]
+ expected = self.test_plc.plc_spec['expected_vsys_tags']
result = set(values) == set(expected)
if not result:
- print 'Check vsys defaults with slice %s'%slice_name
- print 'Expected %s'%expected
- print 'Got %s'%values
+ print 'Check vsys defaults with slice {}'.format(slice_name)
+ print 'Expected {}'.format(expected)
+ print 'Got {}'.format(values)
return result
# just add the nodes and handle tags
node_spec=self.test_site.locate_node(nodename)
test_node=TestNode(self.test_plc,self.test_site,node_spec)
hostnames += [test_node.name()]
- utils.header("Adding %r in %s"%(hostnames,slice_name))
+ utils.header("Adding {} in {}".format(hostnames, slice_name))
self.test_plc.apiserver.AddSliceToNodes(auth, slice_name, hostnames)
# trash the slice altogether
def delete_slice(self):
auth = self.owner_auth()
slice_name = self.slice_name()
- utils.header("Deleting slice %s"%slice_name)
+ utils.header("Deleting slice {}".format(slice_name))
self.test_plc.apiserver.DeleteSlice(auth,slice_name)
# keep the slice alive and just delete nodes
def delete_nodes (self):
auth = self.owner_auth()
slice_name = self.slice_name()
- print 'retrieving slice %s'%slice_name
+ print 'retrieving slice {}'.format(slice_name)
slice=self.test_plc.apiserver.GetSlices(auth,slice_name)[0]
node_ids=slice['node_ids']
- utils.header ("Deleting %d nodes from slice %s"%\
- (len(node_ids),slice_name))
+ utils.header ("Deleting {} nodes from slice {}"\
+ .format(len(node_ids), slice_name))
self.test_plc.apiserver.DeleteSliceFromNodes (auth,slice_name, node_ids)
def locate_private_key(self):
# graceout = timedelta(minutes=silent_minutes)
# period = timedelta(seconds=period_seconds)
if not command:
- command="echo hostname ; hostname; echo id; id; echo uname -a ; uname -a"
+ command = "echo hostname ; hostname; echo id; id; echo uname -a ; uname -a"
# locate a key
- private_key=self.locate_private_key()
+ private_key = self.locate_private_key()
if not private_key :
- utils.header("WARNING: Cannot find a valid key for slice %s"%self.name())
+ utils.header("WARNING: Cannot find a valid key for slice {}".format(self.name()))
return False
# convert nodenames to real hostnames
if expected: msg="ssh slice access enabled"
else: msg="ssh slice access disabled"
- utils.header("checking for %s -- slice %s"%(msg,self.name()))
+ utils.header("checking for {} -- slice {}".format(msg, self.name()))
tasks=[]
slicename=self.name()
# locate a key
private_key=self.locate_private_key()
if not private_key :
- utils.header("WARNING: Cannot find a valid key for slice %s"%self.name())
+ utils.header("WARNING: Cannot find a valid key for slice {}".format(self.name()))
return False
# convert nodenames to real hostnames
(site_spec,node_spec) = self.test_plc.locate_node(nodename)
hostname=node_spec['node_fields']['hostname']
- if expected: msg="%s to return TRUE from ssh"%command
- else: msg="%s to return FALSE from ssh"%command
+ if expected:
+ msg="{} to return TRUE from ssh".format(command)
+ else:
+ msg="{} to return FALSE from ssh".format(command)
- utils.header("checking %s -- slice %s on node %s"%(msg,self.name(),hostname))
+ utils.header("checking {} -- slice {} on node {}".format(msg, self.name(), hostname))
(site_spec,node_spec) = self.test_plc.locate_hostname(hostname)
test_ssh = TestSsh (hostname,key=private_key,username=self.name())
full_command = test_ssh.actual_command(command)
retcod = utils.system (full_command,silent=True)
- if getattr(options,'dry_run',None): return True
- if expected: success = retcod==0
- else: success = retcod!=0
- if not success: utils.header ("WRONG RESULT for %s"%msg)
+ if getattr(options,'dry_run',None):
+ return True
+ if expected:
+ success = retcod==0
+ else:
+ success = retcod!=0
+ if not success:
+ utils.header ("WRONG RESULT for {}".format(msg))
return success
# for TestPlc.slice_mapper__tasks
# use constant admin key
local_key = "keys/key_admin.rsa"
node_infos = self.test_plc.all_node_infos()
- rootfs="/vservers/%s"%self.name()
+ rootfs="/vservers/{}".format(self.name())
class CompleterTaskRootfs (CompleterTaskNodeSsh):
def __init__ (self, nodename, qemuname):
CompleterTaskNodeSsh.__init__(self,nodename, qemuname, local_key, expected=expected,
- command="ls -d %s"%rootfs)
+ command="ls -d {}".format(rootfs))
def failure_epilogue (self):
if expected:
- print "Could not stat %s - was expected to be present"%rootfs
+ print "Could not stat {} - was expected to be present".format(rootfs)
else:
- print "Sliver rootfs %s still present - this is unexpected"%rootfs
- utils.system(self.test_ssh.actual_command("ls -l %s; du -hs %s"%(rootfs,rootfs),dry_run=self.dry_run))
+ print "Sliver rootfs {} still present - this is unexpected".format(rootfs)
+ utils.system(self.test_ssh.actual_command("ls -l {rootfs}; du -hs {rootfs}".format(rootfs=rootfs),
+ dry_run=self.dry_run))
return [ CompleterTaskRootfs (nodename, qemuname) for (nodename,qemuname) in node_infos ]
class TestSliceSfa:
def __init__ (self, test_auth_sfa, slice_spec):
- self.test_auth_sfa=test_auth_sfa
- self.slice_spec=slice_spec
+ self.test_auth_sfa = test_auth_sfa
+ self.slice_spec = slice_spec
# shortcuts
- self.test_plc=self.test_auth_sfa.test_plc
+ self.test_plc = self.test_auth_sfa.test_plc
def hrn (self):
return self.test_auth_sfa.obj_hrn(self.slice_spec['name'])
def sfi_user(self,*args,**kwds): return self.test_auth_sfa.sfi_user(*args, **kwds)
def discover_option(self):
- if self.rspec_style()=='pg': return "-r GENI"
- else: return "-r sfa"
+ if self.rspec_style() == 'pg':
+ return "-r GENI"
+ else:
+ return "-r sfa"
# those are step names exposed as methods of TestPlc, hence the _sfa
# needs to be run as pi
def sfa_register_slice(self,options):
"run sfi register (on Registry)"
- sfi_command="register"
+ sfi_command = "register"
sfi_command += " --type slice"
- sfi_command += " --xrn %s"%self.hrn()
+ sfi_command += " --xrn {}".format(self.hrn())
for opt in self.slice_spec['register_options']:
- sfi_command += " %s"%(opt)
+ sfi_command += " {}".format(opt)
return self.test_plc.run_in_guest(self.sfi_pi(sfi_command))==0
def sfa_renew_slice(self, options):
# we expect this to fail on too long term attemps, but to succeed otherwise
overall=True
for ( renew_until, expected) in [ (too_late, False), (one_month, True) ] :
- sfi_command="renew"
- sfi_command += " %s"%self.hrn()
- sfi_command += " %s"%renew_until
+ sfi_command = "renew"
+ sfi_command += " {}".format(self.hrn())
+ sfi_command += " {}".format(renew_until)
succeeded = self.test_plc.run_in_guest(self.sfi_user(sfi_command))==0
- if succeeded!=expected:
- utils.header ("Expecting success=%s, got %s"%(expected,succeeded))
+ if succeeded != expected:
+ utils.header ("Expecting success={}, got {}".format(expected, succeeded))
# however it turns out sfi renew always returns fine....
#overall=False
# so for helping manual checks:
# xxx this should use sfa_get_expires below and actually check the expected result
- sfi_command="show -k hrn -k expires %s"%self.hrn()
+ sfi_command = "show -k hrn -k expires {}".format(self.hrn())
self.test_plc.run_in_guest(self.sfi_user(sfi_command))
return overall
def sfa_get_expires (self, options):
- filename="%s.json"%self.hrn()
+ filename = "{}.json".format(self.hrn())
# /root/sfi/pg/<>
- inplc_filename=os.path.join(self.sfi_path(),filename)
+ inplc_filename = os.path.join(self.sfi_path(),filename)
# /vservers/<>/root/sfi/... - cannot use os.path
- inbox_filename="%s%s"%(self.test_plc.vm_root_in_host(),inplc_filename)
- sfi_command =""
- sfi_command += "-R %s --rawformat json"%inplc_filename
+ inbox_filename = "{}{}".format(self.test_plc.vm_root_in_host(), inplc_filename)
+ sfi_command = ""
+ sfi_command += "-R {} --rawformat json".format(inplc_filename)
sfi_command += " status"
- sfi_command += " %s"%self.hrn()
+ sfi_command += " {}".format(self.hrn())
# cannot find it if sfi status returns an error
if self.test_plc.run_in_guest (self.sfi_user(sfi_command)) !=0: return
if self.test_plc.test_ssh.fetch(inbox_filename,filename)!=0: return
try:
- with file(filename) as f: status = json.loads(f.read())
- value=status['value']
- sliver=value['geni_slivers'][0]
- expires=sliver['geni_expires']
- print " * expiration for %s (first sliver) -> %s"%(self.hrn(),expires)
+ with file(filename) as f:
+ status = json.loads(f.read())
+ value = status['value']
+ sliver = value['geni_slivers'][0]
+ expires = sliver['geni_expires']
+ print " * expiration for {} (first sliver) -> {}".format(self.hrn(), expires)
return expires
except:
traceback.print_exc()
# helper - filename to store a given result
- def _resname (self,name,ext): return "%s.%s"%(name,ext)
+ def _resname (self,name,ext): return "{}.{}".format(name, ext)
def adfile (self): return self._resname("ad","rspec")
def reqfile (self): return self._resname("req","rspec")
def empty_reqfile (self): return "empty-rspec.xml"
def sfa_discover(self,options):
"discover resources into resouces_in.rspec"
return self.test_plc.run_in_guest(self.sfi_user(\
- "resources %s -o %s/%s"% (self.discover_option(),self.sfi_path(),self.adfile())))==0
+ "resources {} -o {}/{}"\
+ .format(self.discover_option(),self.sfi_path(),self.adfile()))) == 0
def sfa_rspec(self,options):
"invoke sfiListNodes and sfiAddSlivers to prepare a rspec"
- commands=[
- "sfiListNodes.py -i %s/%s -o %s/%s"%(self.sfi_path(),self.adfile(),self.sfi_path(),self.nodefile()),
- "sfiAddSliver.py -i %s/%s -n %s/%s -o %s/%s"%\
- (self.sfi_path(),self.adfile(),self.sfi_path(),self.nodefile(),self.sfi_path(),self.reqfile()),
+ commands = [
+ "sfiListNodes.py -i {}/{} -o {}/{}".format(self.sfi_path(), self.adfile(),
+ self.sfi_path(), self.nodefile()),
+ "sfiAddSliver.py -i {}/{} -n {}/{} -o {}/{}".format(self.sfi_path(), self.adfile(),
+ self.sfi_path(), self.nodefile(),
+ self.sfi_path(), self.reqfile()),
]
for command in commands:
- if self.test_plc.run_in_guest(command)!=0: return False
+ if self.test_plc.run_in_guest(command) != 0: return False
return True
def _sfa_allocate(self,file,options):
- command=self.sfi_user("allocate %s %s"%(self.hrn(),file))
- return self.test_plc.run_in_guest(command)==0
+ command = self.sfi_user("allocate {} {}".format(self.hrn(), file))
+ return self.test_plc.run_in_guest(command) == 0
def sfa_allocate(self,options):
"invoke run sfi allocate (on SM)"
def sfa_provision(self,options):
"invoke run sfi provision (on SM)"
- command=self.sfi_user("provision %s"%(self.hrn()))
- return self.test_plc.run_in_guest(command)==0
+ command = self.sfi_user("provision {}".format(self.hrn()))
+ return self.test_plc.run_in_guest(command) == 0
# just a synonym
sfa_provision_empty = sfa_provision
def plc_name (self):
- return "%s_%s"%(self.test_auth_sfa.login_base,self.slice_spec['name'])
+ return "{}_{}".format(self.test_auth_sfa.login_base, self.slice_spec['name'])
# all local nodes in slice ?
def sfa_check_slice_plc (self,options):
"check the slice has been created at the plc - all local nodes should be in slice"
- slice=self.test_plc.apiserver.GetSlices(self.test_plc.auth_root(), self.plc_name())[0]
- nodes=self.test_plc.apiserver.GetNodes(self.test_plc.auth_root(), {'peer_id':None})
- result=True
+ slice = self.test_plc.apiserver.GetSlices(self.test_plc.auth_root(), self.plc_name())[0]
+ nodes = self.test_plc.apiserver.GetNodes(self.test_plc.auth_root(), {'peer_id':None})
+ result = True
for node in nodes:
if node['node_id'] in slice['node_ids']:
- utils.header("local node %s found in slice %s"%(node['hostname'],slice['name']))
+ utils.header("local node {} found in slice {}".format(node['hostname'], slice['name']))
else:
- utils.header("ERROR - local node %s NOT FOUND in slice %s"%(node['hostname'],slice['name']))
- result=False
+ utils.header("ERROR - local node {} NOT FOUND in slice {}"\
+ .format(node['hostname'], slice['name']))
+ result = False
return result
# no node left in slice ?
def sfa_check_slice_plc_empty (self,options):
"check the slice have been emptied at the plcs - no node should be in slice"
- slices=self.test_plc.apiserver.GetSlices(self.test_plc.auth_root(),
+ slices = self.test_plc.apiserver.GetSlices(self.test_plc.auth_root(),
self.plc_name(),
['node_ids'])
return not slices[0]['node_ids']
# run as pi
def sfa_delete_slice(self,options):
"run sfi delete"
- self.test_plc.run_in_guest(self.sfi_pi("delete %s"%(self.hrn(),)))
- return self.test_plc.run_in_guest(self.sfi_pi("remove -t slice %s"%(self.hrn(),)))==0
+ self.test_plc.run_in_guest(self.sfi_pi("delete {}".format(self.hrn())))
+ return self.test_plc.run_in_guest(self.sfi_pi("remove -t slice {}".format(self.hrn()))) == 0
def locate_private_key(self):
return self.test_plc.locate_private_key_from_key_names ( [ self.slice_spec['key_name'] ] )
# check the resulting sliver
- def ssh_slice_sfa(self,options,timeout_minutes=40,silent_minutes=0,period_seconds=15):
+ def ssh_slice_sfa(self, options, timeout_minutes=40, silent_minutes=0, period_seconds=15):
"tries to ssh-enter the SFA slice"
timeout = timedelta(minutes=timeout_minutes)
graceout = timedelta(minutes=silent_minutes)
# locate a key
private_key=self.locate_private_key()
if not private_key :
- utils.header("WARNING: Cannot find a valid key for slice %s"%self.name())
+ utils.header("WARNING: Cannot find a valid key for slice {}".format(self.name()))
return False
command="echo hostname ; hostname; echo id; id; echo uname -a ; uname -a"
dry_run = getattr(options,'dry_run',False)
for nodename in self.slice_spec['nodenames']:
(site_spec,node_spec) = self.test_plc.locate_node(nodename)
- tasks.append( CompleterTaskSliceSsh(self.test_plc,node_spec['node_fields']['hostname'],
- slicename,private_key,command,expected=True,dry_run=dry_run))
- return Completer (tasks, message='ssh_slice_sfa').run (timeout, graceout, period)
+ tasks.append( CompleterTaskSliceSsh(self.test_plc, node_spec['node_fields']['hostname'],
+ slicename, private_key, command,
+ expected=True, dry_run=dry_run))
+ return Completer (tasks, message='ssh_slice_sfa').run(timeout, graceout, period)
class TestSliver:
- def __init__ (self,test_plc,test_node,test_slice):
- self.test_plc=test_plc
- self.test_node=test_node
- self.test_slice=test_slice
+ def __init__(self, test_plc, test_node, test_slice):
+ self.test_plc = test_plc
+ self.test_node = test_node
+ self.test_slice = test_slice
self.test_ssh = self.create_test_ssh()
def get_privateKey(self):
- slice_spec=self.test_slice.slice_spec
+ slice_spec = self.test_slice.slice_spec
try:
- (found,privatekey)=self.test_slice.locate_key()
- return (found,privatekey)
+ (found, privatekey) = self.test_slice.locate_key()
+ return (found, privatekey)
except Exception,e:
print str(e)
def create_test_ssh(self):
private_key = self.test_slice.locate_private_key()
if not private_key:
- raise Exception,"Cannot find the private key for slice %s"%self.test_slice.name()
- return TestSsh (self.test_node.name(),key=private_key,username=self.test_slice.name(),
+ raise Exception,"Cannot find the private key for slice {}".format(self.test_slice.name())
+ return TestSsh (self.test_node.name(), key=private_key, username=self.test_slice.name(),
# so that copies end up in the home dir
buildname=".")
def name(self):
- return "%s@%s"%(self.test_slice.name(),self.test_node.name())
+ return "{}@{}".format(self.test_slice.name(), self.test_node.name())
def check_initscript_stamp(self, stamp):
- utils.header("Checking for initscript stamp %s on sliver %s"%(stamp, self.name()))
- return self.test_ssh.run("ls -l /var/tmp/%s.stamp"%stamp)==0
+ utils.header("Checking for initscript stamp {} on sliver {}".format(stamp, self.name()))
+ return self.test_ssh.run("ls -l /var/tmp/{}.stamp".format(stamp)) == 0
def check_tcp_ready (self, port):
- ready_command = "./tcptest.py ready -p %d"%(port)
+ ready_command = "./tcptest.py ready -p {}".format(port)
return self.test_ssh.copy("tcptest.py") == 0 and \
self.test_ssh.run(ready_command) == 0
def run_tcp_server (self, port, timeout=10):
- server_command = "./tcptest.py server -p %d -t %d"%(port, timeout)
+ server_command = "./tcptest.py server -p {} -t {}".format(port, timeout)
return self.test_ssh.copy("tcptest.py") == 0 and \
- self.test_ssh.run(server_command, background=True)==0
+ self.test_ssh.run(server_command, background=True) == 0
def run_tcp_client (self, servername, port, retry=5):
- client_command="./tcptest.py client -a %s -p %d"%(servername, port)
+ client_command = "./tcptest.py client -a {} -p {}".format(servername, port)
if self.test_ssh.copy("tcptest.py") != 0:
return False
if self.test_ssh.run(client_command) == 0:
#def tar_var_logs (self):
# return self.test_ssh.actual_command("sudo tar -C /var/log -cf - .")
def tar_var_logs (self):
- test_ssh=self.test_node.create_test_ssh()
- dir_to_tar="/vservers/%s/var/log"%self.test_slice.name()
- return test_ssh.actual_command("tar -C %s -cf - ."%dir_to_tar)
+ test_ssh = self.test_node.create_test_ssh()
+ dir_to_tar = "/vservers/{}/var/log".format(self.test_slice.name())
+ return test_ssh.actual_command("tar -C {} -cf - .".format(dir_to_tar))
def check_hooks (self):
print 'NOTE: slice hooks check scripts NOT (yet?) run in sudo'
extensions = [ 'py','pl','sh' ]
- path='hooks/slice/'
- scripts=utils.locate_hooks_scripts ('sliver '+self.name(), path,extensions)
+ path = 'hooks/slice/'
+ scripts = utils.locate_hooks_scripts('sliver ' + self.name(), path, extensions)
overall = True
for script in scripts:
if not self.check_hooks_script (script):
return overall
def check_hooks_script (self,local_script):
- script_name=os.path.basename(local_script)
- utils.header ("SLIVER hook %s (%s)"%(script_name,self.name()))
- test_ssh=self.create_test_ssh()
+ script_name = os.path.basename(local_script)
+ utils.header ("SLIVER hook {} ({})".format(script_name, self.name()))
+ test_ssh = self.create_test_ssh()
test_ssh.copy_home(local_script)
if test_ssh.run("./"+script_name) != 0:
- utils.header ("WARNING: hooks check script %s FAILED (ignored)"%script_name)
+ utils.header ("WARNING: hooks check script {} FAILED (ignored)".format(script_name))
#return False
return True
else:
- utils.header ("SUCCESS: sliver hook %s OK"%script_name)
+ utils.header ("SUCCESS: sliver hook {} OK".format(script_name))
return True
remote_ip = socket.gethostbyname(hostname)
return local_ip == remote_ip
except:
- utils.header("WARNING : something wrong in is_local_hostname with hostname=%s"%hostname)
+ utils.header("WARNING : something wrong in is_local_hostname with hostname={}".format(hostname))
return False
# some boxes have their working space in user's homedir (/root),
def key_part(self):
if not self.key:
return ""
- return "-i %s " % self.key
+ return "-i {} ".format(self.key)
def hostname_part(self):
if not self.username:
return self.hostname
else:
- return "%s@%s" % (self.username,self.hostname)
+ return "{}@{}".format(self.username, self.hostname)
# command gets run on the right box
def actual_command(self, command, keep_stdin=False, dry_run=False, backslash=True):
if self.is_local():
return utils.system(command, background)
self.create_buildname_once(dry_run)
- return self.run("cd %s ; %s" % (self.fullname(self.buildname), command),
+ return self.run("cd {} ; {}".format(self.fullname(self.buildname), command),
background=background, dry_run=dry_run)
def fullname(self, dirname):
# ab. paths remain as-is
if not abs:
if dirname:
- dirname = "%s/%s" % (self.buildname,dirname)
+ dirname = "{}/{}".format(self.buildname, dirname)
else:
dirname = self.buildname
dirname = self.fullname(dirname)
if dirname == '.':
return
- return self.run("mkdir -p %s" % dirname, dry_run=dry_run)
+ return self.run("mkdir -p {}".format(dirname), dry_run=dry_run)
def rmdir(self, dirname=None, dry_run=False):
if self.is_local():
return shutil.rmtree(dirname)
return 0
if dirname:
- dirname = "%s/%s" % (self.buildname,dirname)
+ dirname = "{}/{}".format(self.buildname, dirname)
else:
dirname = self.buildname
dirname = self.fullname(dirname)
- return self.run("rm -rf %s" % dirname, dry_run=dry_run)
+ return self.run("rm -rf {}".format(dirname), dry_run=dry_run)
def create_buildname_once(self, dry_run):
if self.is_local():
if recursive:
scp_command += "-r "
scp_command += self.key_part()
- scp_command += "%s %s:%s/%s" % (local_file, self.hostname_part(),
- self.fullname(self.buildname),
- os.path.basename(local_file) or ".")
+ scp_command += "{} {}:{}/{}".format(local_file, self.hostname_part(),
+ self.fullname(self.buildname),
+ os.path.basename(local_file) or ".")
if dry_run:
- utils.header("DRY RUN TestSsh.copy %s" % scp_command)
+ utils.header("DRY RUN TestSsh.copy {}".format(scp_command))
# need to be consistent with the non-dry-run mode
return 0
return utils.system(scp_command)
if self.is_local():
dest = ""
else:
- dest = "%s:" % self.hostname_part()
+ dest = "{}:".format(self.hostname_part())
scp_command = "scp "
scp_command += TestSsh.std_options
if recursive:
scp_command += "-r "
scp_command += self.key_part()
- scp_command += "%s %s%s" % (local_file, dest, remote_file)
+ scp_command += "{} {}{}".format(local_file, dest, remote_file)
if dry_run:
- utils.header("DRY RUN TestSsh.copy %s" % scp_command)
+ utils.header("DRY RUN TestSsh.copy {}".format(scp_command))
# need to be consistent with the non-dry-run mode
return 0
return utils.system(scp_command)
command="cp "
if recursive:
command += "-r "
- command += "%s %s" % (remote_file,local_file)
+ command += "{} {}".format(remote_file, local_file)
else:
command = "scp "
if not dry_run:
if remote_file.find("/") == 0:
remote_path = remote_file
else:
- remote_path = "%s/%s" % (self.buildname, remote_file)
+ remote_path = "{}/{}".format(self.buildname, remote_file)
remote_path = self.fullname(remote_path)
- command += "%s:%s %s" % (self.hostname_part(), remote_path, local_file)
+ command += "{}:{} {}".format(self.hostname_part(), remote_path, local_file)
return utils.system(command)
# this is only to avoid harmless message when host cannot be identified
# the only place where this is needed is when tring to reach a slice in a node,
# which is done from the test master box
def clear_known_hosts(self):
- known_hosts = "%s/.ssh/known_hosts" % os.getenv("HOME")
- utils.header("Clearing entry for %s in %s" % (self.hostname, known_hosts))
- return utils.system("sed -i -e /^%s/d %s" % (self.hostname, known_hosts))
+ known_hosts = "{}/.ssh/known_hosts".format(os.getenv("HOME"))
+ utils.header("Clearing entry for {} in {}".format(self.hostname, known_hosts))
+ return utils.system("sed -i -e /^{}/d {}".format(self.hostname, known_hosts))
class TestUser:
- def __init__ (self,test_plc,test_site,user_spec):
- self.test_plc=test_plc
- self.test_site=test_site
- self.user_spec=user_spec
+ def __init__ (self, test_plc, test_site, user_spec):
+ self.test_plc = test_plc
+ self.test_site = test_site
+ self.user_spec = user_spec
def name(self):
return self.user_spec['user_fields']['email']
def auth (self):
- person=self.user_spec['user_fields']
- return {'Username':person['email'],
- 'AuthMethod':'password',
- 'AuthString':person['password'],
- 'Role':self.user_spec['roles'][0],
+ person = self.user_spec['user_fields']
+ return {'Username' : person['email'],
+ 'AuthMethod' : 'password',
+ 'AuthString' : person['password'],
+ 'Role' : self.user_spec['roles'][0],
}
def create_user (self):
- user_spec=self.user_spec
- fields=user_spec['user_fields']
- auth=self.test_plc.auth_root()
- utils.header('Adding user %s - roles %r'%(fields['email'],user_spec['roles']))
- self.test_plc.apiserver.AddPerson(auth,fields)
- self.test_plc.apiserver.UpdatePerson(auth,fields['email'],{'enabled': True})
+ user_spec = self.user_spec
+ fields = user_spec['user_fields']
+ auth = self.test_plc.auth_root()
+ utils.header('Adding user {} - roles {}'.format(fields['email'], user_spec['roles']))
+ self.test_plc.apiserver.AddPerson(auth, fields)
+ self.test_plc.apiserver.UpdatePerson(auth, fields['email'], {'enabled': True})
for role in user_spec['roles']:
self.test_plc.apiserver.AddRoleToPerson(auth,role,fields['email'])
self.test_plc.apiserver.AddPersonToSite(auth,
- self.name(),
- self.test_site.name())
+ self.name(),
+ self.test_site.name())
def delete_user(self):
- auth=self.test_plc.auth_root()
+ auth = self.test_plc.auth_root()
self.test_plc.apiserver.DeletePerson(auth,self.name())
def add_keys (self):
- user_spec=self.user_spec
+ user_spec = self.user_spec
for key_name in user_spec['key_names']:
- key_spec=self.test_plc.locate_key(key_name)
- auth=self.auth()
+ key_spec = self.test_plc.locate_key(key_name)
+ auth = self.auth()
self.test_plc.apiserver.AddPersonKey(auth,self.name(), key_spec['key_fields'])
class TestUserSfa:
- def __init__ (self,test_auth_sfa, user_spec):
- self.test_auth_sfa=test_auth_sfa
- self.user_spec=user_spec
+ def __init__ (self, test_auth_sfa, user_spec):
+ self.test_auth_sfa = test_auth_sfa
+ self.user_spec = user_spec
# shortcuts
- self.test_plc=self.test_auth_sfa.test_plc
- self.login_base=self.test_auth_sfa.login_base
+ self.test_plc = self.test_auth_sfa.test_plc
+ self.login_base = self.test_auth_sfa.login_base
def sfi_path(self):
return self.test_auth_sfa.sfi_path()
def hrn(self):
return self.test_auth_sfa.obj_hrn(self.user_spec['name'])
- def sfi_pi(self,*args,**kwds):
+ def sfi_pi(self, *args, **kwds):
return self.test_auth_sfa.sfi_pi(*args, **kwds)
- def sfi_user(self,*args,**kwds):
+ def sfi_user(self, *args, **kwds):
return self.test_auth_sfa.sfi_user(*args, **kwds)
# xxx todo - not the right place any longer - or is it ?
- def sfa_register_user (self,options):
+ def sfa_register_user (self, options):
"add a regular user using sfi register"
user_hrn = self.hrn()
- command="register"
+ command = "register"
command += " --type user"
- command += " --xrn %s"%user_hrn
- command += " --email %s"%self.user_spec['email']
+ command += " --xrn {}".format(user_hrn)
+ command += " --email {}".format(self.user_spec['email'])
command += " " + " ".join(self.user_spec['register_options'])
# handle key separately because of embedded whitespace
# hack - the user's pubkey is avail from his hrn
- command += " -k %s/%s.pub"%(self.sfi_path(),user_hrn)
- return self.test_plc.run_in_guest(self.sfi_pi(command))==0
+ command += " -k {}/{}.pub".format(self.sfi_path(), user_hrn)
+ return self.test_plc.run_in_guest(self.sfi_pi(command)) == 0
def sfa_update_user (self,options):
"update a user record using sfi update"
user_hrn = self.hrn()
command="update"
command += " --type user"
- command += " --xrn %s"%user_hrn
+ command += " --xrn {}".format(user_hrn)
command += " " + " ".join(self.user_spec['update_options'])
- return self.test_plc.run_in_guest(self.sfi_pi(command))==0
+ return self.test_plc.run_in_guest(self.sfi_pi(command)) == 0
def sfa_delete_user(self,options):
"run sfi delete on user record"
user_hrn = self.hrn()
- command="remove -t user %s"%user_hrn
- return \
- self.test_plc.run_in_guest(self.sfi_pi(command))==0
+ command = "remove -t user {}".format(user_hrn)
+ return self.test_plc.run_in_guest(self.sfi_pi(command)) == 0
from optparse import OptionParser
def myprint(message, id='client'):
- now=time.strftime("%H:%M:%S", time.localtime())
- print "*",now,'(%s)' % id, '--',message
+ now = time.strftime("%H:%M:%S", time.localtime())
+ print "* {} ({}) -- {}".format(now, id, message)
sys.stdout.flush()
def show_network_status(id):
mout=i*'ping ' + '\n'
min=mout.upper()
if s.send(mout) != len(mout):
- myprint("cannot send %s"%mout.strip())
+ myprint("cannot send {}".format(mout.strip()))
result=False
break
line=s.recv(len(min))
- if line is not line:
+ if line != min:
- myprint("unexpected reception\ngot:%s\nexpected: %s",line,min)
- result=False
+ myprint("unexpected reception\ngot:{}\nexpected: {}".format(line, min))
+ result = False
else:
- myprint("OK:%s"%mout.strip())
+ myprint("OK:{}".format(mout.strip()))
# leave the connection open, but the last one (so 1 iter returns fast)
if i != options.loops:
time.sleep(options.sleep)
def init_options(options_arg):
global options
- options=options_arg
+ options = options_arg
# how could this accept a list again ?
def header(message):
- now=time.strftime("%H:%M:%S", time.localtime())
- print "*",now,'--',message
+ now = time.strftime("%H:%M:%S", time.localtime())
+ print "*", now, '--', message
-def pprint(message,spec,depth=2):
- now=time.strftime("%H:%M:%S", time.localtime())
- print ">",now,"--",message
- PrettyPrinter(indent=8,depth=depth).pprint(spec)
+def pprint(message, spec, depth=2):
+ now = time.strftime("%H:%M:%S", time.localtime())
+ print ">", now, "--", message
+ PrettyPrinter(indent=8, depth=depth).pprint(spec)
def system(command, background=False, silent=False, dry_run=None):
dry_run = dry_run if dry_run is not None else getattr(options, 'dry_run', False)
if dry_run:
- print 'dry_run:',command
+ print 'dry_run:', command
return 0
if silent :
- if command.find(';')>=0: command = "(%s) 2> /dev/null" % command
+ if command.find(';') >= 0:
+ command = "({}) 2> /dev/null".format(command)
else: command += " 2> /dev/null"
- if background: command += " &"
+ if background:
+ command += " &"
if silent:
print '.',
sys.stdout.flush()
else:
- now=time.strftime("%H:%M:%S", time.localtime())
+ now = time.strftime("%H:%M:%S", time.localtime())
# don't show in summary
- print "->",now,'--',
+ print "->", now, '--',
sys.stdout.flush()
if not silent:
command = "set -x; " + command
### WARNING : this ALWAYS does its job, even in dry_run mode
def output_of (command):
import commands
- (code,string) = commands.getstatusoutput(command)
- return (code,string)
+ (code, string) = commands.getstatusoutput(command)
+ return (code, string)
# convenience: translating shell-like pattern into regexp
def match (string, pattern):
# tmp - there's probably much simpler
# rewrite * into .*, ? into .
- pattern=pattern.replace("*",".*")
- pattern=pattern.replace("?",".")
+ pattern = pattern.replace("*", ".*")
+ pattern = pattern.replace("?", ".")
return re.compile(pattern).match(string)
def locate_hooks_scripts (message, path, extensions):
- print message,'searching',path,'for extensions',extensions
- scripts=[]
+ print message, 'searching', path, 'for extensions', extensions
+ scripts = []
for ext in extensions:
# skip helper programs
- scripts += glob.glob (path+'/[a-zA-Z]*.'+ext)
+ scripts += glob.glob(path + '/[a-zA-Z]*.' + ext)
return scripts
# quick & dirty - should probably use the parseroption object instead
# and move to TestMain as well
exclude_options_keys = [ 'ensure_value' , 'read_file', 'read_module' ]
def show_options (message, options):
- now=time.strftime("%H:%M:%S", time.localtime())
- print ">",now,"--",message
+ now = time.strftime("%H:%M:%S", time.localtime())
+ print ">", now, "--", message
for k in dir(options):
- if k.find("_")==0: continue
- if k in exclude_options_keys: continue
- print " ",k,":",getattr(options,k)
+ if k.find("_") == 0:
+ continue
+ if k in exclude_options_keys:
+ continue
+ print " ", k, ":", getattr(options, k)