X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=system%2FTestMain.py;h=2a6a6fea38389e92862d644bcbd5831e6d6c3372;hb=eeba8e303edbfc5e926767ba22294beda642ecba;hp=01bf6cd5d73ca917bd24ce563fb432ef255ad19e;hpb=65c93a296f2e481a312e7846f518eb2eaa2fc080;p=tests.git diff --git a/system/TestMain.py b/system/TestMain.py index 01bf6cd..2a6a6fe 100755 --- a/system/TestMain.py +++ b/system/TestMain.py @@ -1,4 +1,4 @@ -#!/usr/bin/python -u +#!/usr/bin/python3 -u # Thierry Parmentelat # Copyright (C) 2010 INRIA @@ -39,8 +39,8 @@ class Step: else: try: self.substeps = sequences[self.internal()] - except Exception,e: - print "macro step %s not found in macros.py (%s) - exiting" % (self.display(),e) + except Exception as e: + print("macro step {} not found in macros.py ({}) - exiting".format(self.display(),e)) raise def print_doc (self, level=0): @@ -52,11 +52,11 @@ class Step: width = tab - level - 2 format = "%%-%ds" % width line = start + format % self.display() - print line, + print(line, end=' ') try: - print self.method.__doc__ + print(self.method.__doc__) except: - print "*** no doc found" + print("*** no doc found") else: beg_start = level*' ' + '>>> ' end_start = level*' ' + '<<< ' @@ -66,10 +66,10 @@ class Step: format = "%%-%ds" % width beg_line = beg_start + format % self.display() + trail*'>' end_line = end_start + format % self.display() + trail*'<' - print beg_line + print(beg_line) for step in self.substeps: Step(step).print_doc(level+1) - print end_line + print(end_line) # return a list of (name, method) for all native steps involved def tuples (self): @@ -85,7 +85,7 @@ class Step: # just do a listdir, hoping we're in the right directory... @staticmethod def list_macros (): - names= sequences.keys() + names= list(sequences.keys()) names.sort() return names @@ -98,7 +98,7 @@ class TestMain: default_build_url = "git://git.onelab.eu/tests" def __init__(self): - self.path = os.path.dirname(sys.argv[0]) or "." + self.path = os.path.dirname(sys.argv[0]) or "." os.chdir(self.path) def show_env(self, options, message): @@ -121,11 +121,11 @@ class TestMain: def list_steps(self): if not self.options.verbose: - print self.steps_message + print(self.steps_message) else: # steps mentioned on the command line - if self.options.args: - scopes = [("Argument steps",self.options.args)] + if self.options.steps: + scopes = [("Argument steps",self.options.steps)] else: scopes = [("Default steps", TestPlc.default_steps)] if self.options.all_steps: @@ -133,7 +133,7 @@ class TestMain: # try to list macro steps as well scopes.append ( ("Macro steps", Step.list_macros()) ) for (scope, steps) in scopes: - print '--------------------', scope + print('--------------------', scope) for step in [step for step in steps if TestPlc.valid_step(step)]: try: (step, qualifier) = step.split('@') @@ -149,14 +149,15 @@ class TestMain: arch-rpms-url defaults to the last value used, as stored in arg-arch-rpms-url, no default config defaults to the last value used, as stored in arg-config, - or %r -ips_vnode, ips_vplc and ips_qemu defaults to the last value used, as stored in arg-ips-{bplc,vplc,bnode,vnode}, + or {} +ips_vnode, ips_vplc and ips_qemu defaults to the last value used, + as stored in arg-ips-{{bplc,vplc,bnode,vnode}}, default is to use IP scanning steps refer to a method in TestPlc or to a step_* module run with -l to see a list of available steps === -"""%(TestMain.default_config) +""".format(TestMain.default_config) parser = ArgumentParser(usage = usage) parser.add_argument("-u", "--url", action="store", dest="arch_rpms_url", @@ -216,7 +217,7 @@ run with -l to see a list of available steps def flatten (x): result = [] for el in x: - if hasattr(el, "__iter__") and not isinstance(el, basestring): + if hasattr(el, "__iter__") and not isinstance(el, str): result.extend(flatten(el)) else: result.append(el) @@ -242,29 +243,30 @@ run with -l to see a list of available steps ('pldistro', 'arg-pldistro', "onelab", None), ('fcdistro', 'arg-fcdistro', 'f14', None), ): -# print 'handling',recname +# print('handling', recname) path = filename is_list = isinstance(default, list) is_bool = isinstance(default, bool) if not getattr(self.options, recname): try: - parsed = file(path).readlines() + with open(path) as file: + parsed = file.readlines() if is_list: # lists parsed = [x.strip() for x in parsed] else: # strings and booleans if len(parsed) != 1: - print "%s - error when parsing %s" % (sys.argv[1],path) + print("{} - error when parsing {}".format(sys.argv[1], path)) sys.exit(1) parsed = parsed[0].strip() if is_bool: parsed = parsed.lower() == 'true' setattr(self.options, recname, parsed) - except: + except Exception as e: if default != "": setattr(self.options, recname, default) else: - print "Cannot determine", recname - print "Run %s --help for help" % sys.argv[0] + print("Cannot determine", recname, e) + print("Run {} --help for help".format(sys.argv[0])) sys.exit(1) # save for next run @@ -275,7 +277,7 @@ run with -l to see a list of available steps else: # strings and booleans - just call str() fsave.write(str(getattr(self.options, recname)) + "\n") fsave.close() -# utils.header('Saved %s into %s'%(recname,filename)) +# utils.header('Saved {} into {}'.format(recname, filename)) # lists need be reversed # I suspect this is useful for the various pools but for config, it's painful @@ -283,7 +285,7 @@ run with -l to see a list of available steps getattr(self.options, recname).reverse() if self.options.verbose: - utils.header('* Using %s = %s' % (recname, getattr(self.options, recname))) + utils.header('* Using {} = {}'.format(recname, getattr(self.options, recname))) # hack : if sfa is not among the published rpms, skip these tests TestPlc.check_whether_build_has_sfa(self.options.arch_rpms_url) @@ -341,36 +343,32 @@ run with -l to see a list of available steps all_plc_specs = m.config(all_plc_specs, self.options) except : traceback.print_exc() - print 'Cannot load config %s -- ignored' % modulename + print('Cannot load config {} -- ignored'.format(modulename)) raise # provision on local substrate all_plc_specs = LocalSubstrate.local_substrate.provision(all_plc_specs, self.options) # remember substrate IP address(es) for next run - ips_bplc_file = open('arg-ips-bplc', 'w') - for plc_spec in all_plc_specs: - ips_bplc_file.write("%s\n" % plc_spec['host_box']) - ips_bplc_file.close() - ips_vplc_file = open('arg-ips-vplc', 'w') - for plc_spec in all_plc_specs: - ips_vplc_file.write("%s\n" % plc_spec['settings']['PLC_API_HOST']) - ips_vplc_file.close() + with open('arg-ips-bplc', 'w') as ips_bplc_file: + for plc_spec in all_plc_specs: + ips_bplc_file.write("{}\n".format(plc_spec['host_box'])) + with open('arg-ips-vplc', 'w') as ips_vplc_file: + for plc_spec in all_plc_specs: + ips_vplc_file.write("{}\n".format(plc_spec['settings']['PLC_API_HOST'])) # ditto for nodes - ips_bnode_file = open('arg-ips-bnode', 'w') - for plc_spec in all_plc_specs: - for site_spec in plc_spec['sites']: - for node_spec in site_spec['nodes']: - ips_bnode_file.write("%s\n" % node_spec['host_box']) - ips_bnode_file.close() - ips_vnode_file = open('arg-ips-vnode','w') - for plc_spec in all_plc_specs: - for site_spec in plc_spec['sites']: - for node_spec in site_spec['nodes']: - # back to normal (unqualified) form - stripped = node_spec['node_fields']['hostname'].split('.')[0] - ips_vnode_file.write("%s\n" % stripped) - ips_vnode_file.close() + with open('arg-ips-bnode', 'w') as ips_bnode_file: + for plc_spec in all_plc_specs: + for site_spec in plc_spec['sites']: + for node_spec in site_spec['nodes']: + ips_bnode_file.write("{}\n".format(node_spec['host_box'])) + with open('arg-ips-vnode','w') as ips_vnode_file: + for plc_spec in all_plc_specs: + for site_spec in plc_spec['sites']: + for node_spec in site_spec['nodes']: + # back to normal (unqualified) form + stripped = node_spec['node_fields']['hostname'].split('.')[0] + ips_vnode_file.write("{}\n".format(stripped)) # build a TestPlc object from the result, passing options for spec in all_plc_specs: @@ -418,7 +416,7 @@ run with -l to see a list of available steps cross = True all_step_infos.append ( (substep, method, force, cross, qualifier, ) ) except : - utils.header("********** FAILED step %s (NOT FOUND) -- won't be run" % step) + utils.header("********** FAILED step {} (NOT FOUND) -- won't be run".format(step)) traceback.print_exc() overall_result = 'FAILURE' @@ -435,8 +433,8 @@ run with -l to see a list of available steps # do all steps on all plcs TIME_FORMAT = "%H-%M-%S" - TRACE_FORMAT = "TRACE: %(plc_counter)d %(begin)s->%(seconds)ss=%(duration)s " + \ - "status=%(status)s step=%(stepname)s plc=%(plcname)s force=%(force)s\n" + TRACE_FORMAT = "TRACE: {plc_counter:d} {begin}->{seconds}s={duration}s " + \ + "status={status} step={stepname} plc={plcname} force={force}\n" for stepname, method, force, cross, qualifier in all_step_infos: plc_counter = 0 for spec, plc_obj in all_plcs: @@ -456,16 +454,16 @@ run with -l to see a list of available steps if self.options.interactive: prompting = True while prompting: - msg="%d Run step %s on %s [r](un)/d(ry_run)/p(roceed)/s(kip)/q(uit) ? " % \ - (plc_counter,stepname,plcname) - answer = raw_input(msg).strip().lower() or "r" + msg="{:d} Run step {} on {} [r](un)/d(ry_run)/p(roceed)/s(kip)/q(uit) ? "\ + .format(plc_counter, stepname, plcname) + answer = input(msg).strip().lower() or "r" answer = answer[0] if answer in ['s','n']: # skip/no/next - print '%s on %s skipped' % (stepname, plcname) + print('{} on {} skipped'.format(stepname, plcname)) prompting = False skip_step = True elif answer in ['q','b']: # quit/bye - print 'Exiting' + print('Exiting') return 'FAILURE' elif answer in ['d']: # dry_run dry_run = self.options.dry_run @@ -476,7 +474,7 @@ run with -l to see a list of available steps step_result=method(plc_obj) else: step_result=method(plc_obj, across_plcs) - print 'dry_run step ->', step_result + print('dry_run step ->', step_result) self.options.dry_run = dry_run plc_obj.options.dry_run = dry_run plc_obj.apiserver.set_dry_run(dry_run) @@ -491,9 +489,9 @@ run with -l to see a list of available steps try: force_msg = "" if force and spec['failed_step']: - force_msg=" (forced after %s has failed)" % spec['failed_step'] - utils.header("********** %d RUNNING step %s%s on plc %s" % \ - (plc_counter, stepname, force_msg, plcname)) + force_msg=" (forced after {} has failed)".format(spec['failed_step']) + utils.header("********** {:d} RUNNING step {}{} on plc {}"\ + .format(plc_counter, stepname, force_msg, plcname)) if not cross: step_result = method(plc_obj) else: @@ -507,42 +505,42 @@ run with -l to see a list of available steps # do not overwrite if FAILURE if overall_result == 'SUCCESS': overall_result = 'IGNORED' - utils.header('********** %d IGNORED (%s) step %s on %s' % \ - (plc_counter, msg, stepname, plcname)) - status="%s[I]" % msg + utils.header('********** {} IGNORED ({}) step {} on {}'\ + .format(plc_counter, msg, stepname, plcname)) + status="{}[I]".format(msg) elif step_result: - utils.header('********** %d SUCCESSFUL step %s on %s' % \ - (plc_counter, stepname, plcname)) + utils.header('********** {:d} SUCCESSFUL step {} on {}'\ + .format(plc_counter, stepname, plcname)) status = "OK" else: overall_result = 'FAILURE' spec['failed_step'] = stepname - utils.header('********** %d FAILED step %s on %s (discarded from further steps)' % \ - (plc_counter, stepname, plcname)) + utils.header('********** {:d} FAILED step {} on {} (discarded from further steps)'\ + .format(plc_counter, stepname, plcname)) status = "KO" except: overall_result = 'FAILURE' spec['failed_step'] = stepname traceback.print_exc() - utils.header ('********** %d FAILED (exception) step %s on %s (discarded from further steps)' % \ - (plc_counter, stepname, plcname)) + utils.header ('********** {} FAILED (exception) step {} on {} (discarded from further steps)'\ + .format(plc_counter, stepname, plcname)) status = "KO" # do not run, just display it's skipped else: - why = "has failed %s" % spec['failed_step'] - utils.header("********** %d SKIPPED Step %s on %s (%s)" % \ - (plc_counter, stepname, plcname, why)) + why = "has failed {}".format(spec['failed_step']) + utils.header("********** {} SKIPPED Step {} on {} ({})"\ + .format(plc_counter, stepname, plcname, why)) status = "UNDEF" if not self.options.dry_run: delay = datetime.now()-beg_time seconds = int(delay.total_seconds()) duration = str(delay) # always do this on stdout - print TRACE_FORMAT % locals() + print(TRACE_FORMAT.format(**locals())) # duplicate on trace_file if provided if self.options.trace_file: - trace.write(TRACE_FORMAT % locals()) + trace.write(TRACE_FORMAT.format(**locals())) trace.flush() if self.options.trace_file and not self.options.dry_run: @@ -569,7 +567,7 @@ run with -l to see a list of available steps else: return 1 except SystemExit: - print 'Caught SystemExit' + print('Caught SystemExit') return 3 except: traceback.print_exc() @@ -577,5 +575,5 @@ run with -l to see a list of available steps if __name__ == "__main__": exit_code = TestMain().main() - print "TestMain exit code", exit_code + print("TestMain exit code", exit_code) sys.exit(exit_code)