-#!/usr/bin/python -u
+#!/usr/bin/python3 -u
# Thierry Parmentelat <thierry.parmentelat@inria.fr>
# Copyright (C) 2010 INRIA
import utils
from TestPlc import TestPlc, Ignored
+from TestBonding import TestBonding, onelab_bonding_spec
from TestSite import TestSite
from TestNode import TestNode
from macros import sequences
else:
try:
self.substeps = sequences[self.internal()]
- except Exception,e:
- print "macro step %s not found in macros.py (%s) - exiting" % (self.display(),e)
+ except Exception as e:
+ print("macro step {} not found in macros.py ({}) - exiting".format(self.display(),e))
raise
def print_doc (self, level=0):
width = tab - level - 2
format = "%%-%ds" % width
line = start + format % self.display()
- print line,
+ print(line, end=' ')
try:
- print self.method.__doc__
+ print(self.method.__doc__)
except:
- print "*** no doc found"
+ print("*** no doc found")
else:
beg_start = level*' ' + '>>> '
end_start = level*' ' + '<<< '
format = "%%-%ds" % width
beg_line = beg_start + format % self.display() + trail*'>'
end_line = end_start + format % self.display() + trail*'<'
- print beg_line
+ print(beg_line)
for step in self.substeps:
Step(step).print_doc(level+1)
- print end_line
+ print(end_line)
# return a list of (name, method) for all native steps involved
def tuples (self):
# just do a listdir, hoping we're in the right directory...
@staticmethod
def list_macros ():
- names= sequences.keys()
+ names= list(sequences.keys())
names.sort()
return names
default_build_url = "git://git.onelab.eu/tests"
def __init__(self):
- self.path = os.path.dirname(sys.argv[0]) or "."
+ self.path = os.path.dirname(sys.argv[0]) or "."
os.chdir(self.path)
def show_env(self, options, message):
utils.show_options("main options", options)
def init_steps(self):
- self.steps_message = 20*'x' + " Defaut steps are\n" + \
- TestPlc.printable_steps(TestPlc.default_steps)
- self.steps_message += 20*'x' + " Other useful steps are\n" + \
- TestPlc.printable_steps(TestPlc.other_steps)
- self.steps_message += 20*'x' + " Macro steps are\n" + \
- " ".join(Step.list_macros())
+ self.steps_message = ""
+ if not self.options.bonding:
+ self.steps_message += 20*'x' + " Defaut steps are\n" + \
+ TestPlc.printable_steps(TestPlc.default_steps)
+ self.steps_message += 20*'x' + " Other useful steps are\n" + \
+ TestPlc.printable_steps(TestPlc.other_steps)
+ self.steps_message += 20*'x' + " Macro steps are\n" + \
+ " ".join(Step.list_macros())
+ else:
+ self.steps_message += 20*'x' + " Default steps with bonding are\n" + \
+ TestPlc.printable_steps(TestPlc.bonding_steps)
def list_steps(self):
if not self.options.verbose:
- print self.steps_message
+ print(self.steps_message)
else:
# steps mentioned on the command line
- if self.options.args:
- scopes = [("Argument steps",self.options.args)]
+ if self.options.steps:
+ scopes = [("Argument steps",self.options.steps)]
else:
scopes = [("Default steps", TestPlc.default_steps)]
if self.options.all_steps:
# try to list macro steps as well
scopes.append ( ("Macro steps", Step.list_macros()) )
for (scope, steps) in scopes:
- print '--------------------', scope
+ print('--------------------', scope)
for step in [step for step in steps if TestPlc.valid_step(step)]:
try:
(step, qualifier) = step.split('@')
Step(stepname).print_doc()
def run (self):
- self.init_steps()
usage = """usage: %%prog [options] steps
arch-rpms-url defaults to the last value used, as stored in arg-arch-rpms-url,
no default
config defaults to the last value used, as stored in arg-config,
- or %r
-ips_vnode, ips_vplc and ips_qemu defaults to the last value used, as stored in arg-ips-{bplc,vplc,bnode,vnode},
+ or {}
+ips_vnode, ips_vplc and ips_qemu defaults to the last value used,
+ as stored in arg-ips-{{bplc,vplc,bnode,vnode}},
default is to use IP scanning
steps refer to a method in TestPlc or to a step_* module
+
+run with -l to see a list of available steps
===
-"""%(TestMain.default_config)
- usage += self.steps_message
+""".format(TestMain.default_config)
parser = ArgumentParser(usage = usage)
parser.add_argument("-u", "--url", action="store", dest="arch_rpms_url",
help="Show environment and exits")
parser.add_argument("-t", "--trace", action="store", dest="trace_file", default=None,
help="Trace file location")
-# parser.add_argument("-g", "--bonding", action='store', dest='bonding', default=None,
-# help="specify build to bond with")
+ parser.add_argument("-g", "--bonding", action='store', dest='bonding', default=None,
+ help="specify build to bond with")
parser.add_argument("steps", nargs='*')
self.options = parser.parse_args()
def flatten (x):
result = []
for el in x:
- if hasattr(el, "__iter__") and not isinstance(el, basestring):
+ if hasattr(el, "__iter__") and not isinstance(el, str):
result.extend(flatten(el))
else:
result.append(el)
('pldistro', 'arg-pldistro', "onelab", None),
('fcdistro', 'arg-fcdistro', 'f14', None),
):
-# print 'handling',recname
+# print('handling', recname)
path = filename
is_list = isinstance(default, list)
is_bool = isinstance(default, bool)
if not getattr(self.options, recname):
try:
- parsed = file(path).readlines()
+ with open(path) as file:
+ parsed = file.readlines()
if is_list: # lists
parsed = [x.strip() for x in parsed]
else: # strings and booleans
if len(parsed) != 1:
- print "%s - error when parsing %s" % (sys.argv[1],path)
+ print("{} - error when parsing {}".format(sys.argv[1], path))
sys.exit(1)
parsed = parsed[0].strip()
if is_bool:
parsed = parsed.lower() == 'true'
setattr(self.options, recname, parsed)
- except:
+ except Exception as e:
if default != "":
setattr(self.options, recname, default)
else:
- print "Cannot determine", recname
- print "Run %s --help for help" % sys.argv[0]
+ print("Cannot determine", recname, e)
+ print("Run {} --help for help".format(sys.argv[0]))
sys.exit(1)
# save for next run
else: # strings and booleans - just call str()
fsave.write(str(getattr(self.options, recname)) + "\n")
fsave.close()
-# utils.header('Saved %s into %s'%(recname,filename))
+# utils.header('Saved {} into {}'.format(recname, filename))
# lists need be reversed
# I suspect this is useful for the various pools but for config, it's painful
getattr(self.options, recname).reverse()
if self.options.verbose:
- utils.header('* Using %s = %s' % (recname, getattr(self.options, recname)))
+ utils.header('* Using {} = {}'.format(recname, getattr(self.options, recname)))
# hack : if sfa is not among the published rpms, skip these tests
TestPlc.check_whether_build_has_sfa(self.options.arch_rpms_url)
- # use the default list of steps if unspecified
- if len(self.options.steps) == 0:
- self.options.steps = TestPlc.default_steps
+ # initialize steps
+ if not self.options.steps:
+ # defaults, depends on using bonding or not
+ if self.options.bonding:
+ self.options.steps = TestPlc.bonding_steps
+ else:
+ self.options.steps = TestPlc.default_steps
if self.options.list_steps:
self.init_steps()
self.list_steps()
return 'SUCCESS'
- # steps
- if not self.options.steps:
- #default (all) steps
- #self.options.steps=['dump','clean','install','populate']
- self.options.steps = TestPlc.default_steps
-
# rewrite '-' into '_' in step names
self.options.steps = [ step.replace('-', '_') for step in self.options.steps ]
self.options.exclude = [ step.replace('-', '_') for step in self.options.exclude ]
all_plc_specs = m.config(all_plc_specs, self.options)
except :
traceback.print_exc()
- print 'Cannot load config %s -- ignored' % modulename
+ print('Cannot load config {} -- ignored'.format(modulename))
raise
# provision on local substrate
all_plc_specs = LocalSubstrate.local_substrate.provision(all_plc_specs, self.options)
# remember substrate IP address(es) for next run
- ips_bplc_file = open('arg-ips-bplc', 'w')
- for plc_spec in all_plc_specs:
- ips_bplc_file.write("%s\n" % plc_spec['host_box'])
- ips_bplc_file.close()
- ips_vplc_file = open('arg-ips-vplc', 'w')
- for plc_spec in all_plc_specs:
- ips_vplc_file.write("%s\n" % plc_spec['settings']['PLC_API_HOST'])
- ips_vplc_file.close()
+ with open('arg-ips-bplc', 'w') as ips_bplc_file:
+ for plc_spec in all_plc_specs:
+ ips_bplc_file.write("{}\n".format(plc_spec['host_box']))
+ with open('arg-ips-vplc', 'w') as ips_vplc_file:
+ for plc_spec in all_plc_specs:
+ ips_vplc_file.write("{}\n".format(plc_spec['settings']['PLC_API_HOST']))
# ditto for nodes
- ips_bnode_file = open('arg-ips-bnode', 'w')
- for plc_spec in all_plc_specs:
- for site_spec in plc_spec['sites']:
- for node_spec in site_spec['nodes']:
- ips_bnode_file.write("%s\n" % node_spec['host_box'])
- ips_bnode_file.close()
- ips_vnode_file = open('arg-ips-vnode','w')
- for plc_spec in all_plc_specs:
- for site_spec in plc_spec['sites']:
- for node_spec in site_spec['nodes']:
- # back to normal (unqualified) form
- stripped = node_spec['node_fields']['hostname'].split('.')[0]
- ips_vnode_file.write("%s\n" % stripped)
- ips_vnode_file.close()
+ with open('arg-ips-bnode', 'w') as ips_bnode_file:
+ for plc_spec in all_plc_specs:
+ for site_spec in plc_spec['sites']:
+ for node_spec in site_spec['nodes']:
+ ips_bnode_file.write("{}\n".format(node_spec['host_box']))
+ with open('arg-ips-vnode','w') as ips_vnode_file:
+ for plc_spec in all_plc_specs:
+ for site_spec in plc_spec['sites']:
+ for node_spec in site_spec['nodes']:
+ # back to normal (unqualified) form
+ stripped = node_spec['node_fields']['hostname'].split('.')[0]
+ ips_vnode_file.write("{}\n".format(stripped))
# build a TestPlc object from the result, passing options
for spec in all_plc_specs:
# pass options to utils as well
utils.init_options(self.options)
+ # populate TestBonding objects
+ # need to wait until here as we need all_plcs
+ if self.options.bonding:
+ ## allow to pass -g ../2015.03.15--f18 so we can use bash completion
+ self.options.bonding = os.path.basename(self.options.bonding)
+ # this will fail if ../{bonding} has not the right arg- files
+ for spec, test_plc in all_plcs:
+ test_plc.test_bonding = TestBonding (test_plc,
+ onelab_bonding_spec(self.options.bonding),
+ self.options)
+
overall_result = 'SUCCESS'
all_step_infos = []
for step in self.options.steps:
cross = True
all_step_infos.append ( (substep, method, force, cross, qualifier, ) )
except :
- utils.header("********** FAILED step %s (NOT FOUND) -- won't be run" % step)
+ utils.header("********** FAILED step {} (NOT FOUND) -- won't be run".format(step))
traceback.print_exc()
overall_result = 'FAILURE'
# do all steps on all plcs
TIME_FORMAT = "%H-%M-%S"
- TRACE_FORMAT = "TRACE: %(plc_counter)d %(begin)s->%(seconds)ss=%(duration)s " + \
- "status=%(status)s step=%(stepname)s plc=%(plcname)s force=%(force)s\n"
+ TRACE_FORMAT = "TRACE: {plc_counter:d} {begin}->{seconds}s={duration}s " + \
+ "status={status} step={stepname} plc={plcname} force={force}\n"
for stepname, method, force, cross, qualifier in all_step_infos:
plc_counter = 0
for spec, plc_obj in all_plcs:
if self.options.interactive:
prompting = True
while prompting:
- msg="%d Run step %s on %s [r](un)/d(ry_run)/p(roceed)/s(kip)/q(uit) ? " % \
- (plc_counter,stepname,plcname)
- answer = raw_input(msg).strip().lower() or "r"
+ msg="{:d} Run step {} on {} [r](un)/d(ry_run)/p(roceed)/s(kip)/q(uit) ? "\
+ .format(plc_counter, stepname, plcname)
+ answer = input(msg).strip().lower() or "r"
answer = answer[0]
if answer in ['s','n']: # skip/no/next
- print '%s on %s skipped' % (stepname, plcname)
+ print('{} on {} skipped'.format(stepname, plcname))
prompting = False
skip_step = True
elif answer in ['q','b']: # quit/bye
- print 'Exiting'
+ print('Exiting')
return 'FAILURE'
elif answer in ['d']: # dry_run
dry_run = self.options.dry_run
step_result=method(plc_obj)
else:
step_result=method(plc_obj, across_plcs)
- print 'dry_run step ->', step_result
+ print('dry_run step ->', step_result)
self.options.dry_run = dry_run
plc_obj.options.dry_run = dry_run
plc_obj.apiserver.set_dry_run(dry_run)
try:
force_msg = ""
if force and spec['failed_step']:
- force_msg=" (forced after %s has failed)" % spec['failed_step']
- utils.header("********** %d RUNNING step %s%s on plc %s" % \
- (plc_counter, stepname, force_msg, plcname))
+ force_msg=" (forced after {} has failed)".format(spec['failed_step'])
+ utils.header("********** {:d} RUNNING step {}{} on plc {}"\
+ .format(plc_counter, stepname, force_msg, plcname))
if not cross:
step_result = method(plc_obj)
else:
# do not overwrite if FAILURE
if overall_result == 'SUCCESS':
overall_result = 'IGNORED'
- utils.header('********** %d IGNORED (%s) step %s on %s' % \
- (plc_counter, msg, stepname, plcname))
- status="%s[I]" % msg
+ utils.header('********** {} IGNORED ({}) step {} on {}'\
+ .format(plc_counter, msg, stepname, plcname))
+ status="{}[I]".format(msg)
elif step_result:
- utils.header('********** %d SUCCESSFUL step %s on %s' % \
- (plc_counter, stepname, plcname))
+ utils.header('********** {:d} SUCCESSFUL step {} on {}'\
+ .format(plc_counter, stepname, plcname))
status = "OK"
else:
overall_result = 'FAILURE'
spec['failed_step'] = stepname
- utils.header('********** %d FAILED step %s on %s (discarded from further steps)' % \
- (plc_counter, stepname, plcname))
+ utils.header('********** {:d} FAILED step {} on {} (discarded from further steps)'\
+ .format(plc_counter, stepname, plcname))
status = "KO"
except:
overall_result = 'FAILURE'
spec['failed_step'] = stepname
traceback.print_exc()
- utils.header ('********** %d FAILED (exception) step %s on %s (discarded from further steps)' % \
- (plc_counter, stepname, plcname))
+ utils.header ('********** {} FAILED (exception) step {} on {} (discarded from further steps)'\
+ .format(plc_counter, stepname, plcname))
status = "KO"
# do not run, just display it's skipped
else:
- why = "has failed %s" % spec['failed_step']
- utils.header("********** %d SKIPPED Step %s on %s (%s)" % \
- (plc_counter, stepname, plcname, why))
+ why = "has failed {}".format(spec['failed_step'])
+ utils.header("********** {} SKIPPED Step {} on {} ({})"\
+ .format(plc_counter, stepname, plcname, why))
status = "UNDEF"
if not self.options.dry_run:
delay = datetime.now()-beg_time
seconds = int(delay.total_seconds())
duration = str(delay)
# always do this on stdout
- print TRACE_FORMAT % locals()
+ print(TRACE_FORMAT.format(**locals()))
# duplicate on trace_file if provided
if self.options.trace_file:
- trace.write(TRACE_FORMAT % locals())
+ trace.write(TRACE_FORMAT.format(**locals()))
trace.flush()
if self.options.trace_file and not self.options.dry_run:
else:
return 1
except SystemExit:
- print 'Caught SystemExit'
+ print('Caught SystemExit')
return 3
except:
traceback.print_exc()
if __name__ == "__main__":
exit_code = TestMain().main()
- print "TestMain exit code", exit_code
+ print("TestMain exit code", exit_code)
sys.exit(exit_code)