X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;ds=sidebyside;f=tools%2Fautomate%2Fvxargs.py;fp=tools%2Fautomate%2Fvxargs.py;h=fea9e370ef4736d351a353a9e03c38bb0ac4c768;hb=e637272100e8e03884188cb2118b21158e739bb0;hp=0000000000000000000000000000000000000000;hpb=328ee7b92f4e23570d8a33ad9244413ae3aee6bf;p=monitor.git diff --git a/tools/automate/vxargs.py b/tools/automate/vxargs.py new file mode 100755 index 0000000..fea9e37 --- /dev/null +++ b/tools/automate/vxargs.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python + +# DHARMA Project +# Copyright (C) 2003-2004 Yun Mao, University of Pennsylvania +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of version 2.1 of the GNU Lesser General Public +# License as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +""" +vxargs: Visualized xargs with redirected output + +""" +version = "0.3.1" +import os, sys, time, signal, errno +import curses, random +import getopt + +update_rate = 1 + +final_stats = {} +gsl = None +stopping = 0 + +def getListFromFile(f): + """I'll ignore the line starting with # + + @param f: file object of the host list file + @return: a list of hostnames (or IPs) + """ + hostlist = [] + for line in f: + if line[0]!='#': + if line.strip(): + hostlist.append([line.strip(),'']) + elif hostlist and hostlist[-1][1]=='': + hostlist[-1][1] = line.strip()[1:] + return hostlist + +def get_last_line(fn): + #equ to tail -n1 fn + try: + lines = open(fn,'r').readlines() + if len(lines)>0: + return (0, lines[-1].strip()) + except IOError: + pass + return (1,'') + +class Slot: + def __init__(self, outdir, num, screen, timeout, name, count): + self.outdir = outdir + self.slotnum = num + self.screen = screen + self.comment = "" + self.startTime = time.time() + self.timeout = timeout + self.name = name + self.count = count + + def drawLine(self, comment='', done = False): + if self.screen is None: return + if comment == '': + comment = self.comment + else: + self.comment = comment + stdscr = self.screen + elapsed = time.time()-self.startTime + try: + y,x = stdscr.getmaxyx() + spaces = ' '*x + stdscr.addstr(self.slotnum+2, 0, spaces) #title occupies two lines + if done: + stdscr.addstr(self.slotnum+2,0, comment[:x]) + else: + #construct the string + output = "(%3ds)%3d: %s " % ( round(elapsed), self.count, self.name ) + spaceleft = x - len(output) + if self.outdir and spaceleft>1: + outfn = os.path.join(self.outdir, '%s.out' % self.name) + errfn = os.path.join(self.outdir, '%s.err' % self.name) + lout = get_last_line(outfn) + lerr = get_last_line(errfn) + if lerr[0]==0 and lerr[1]: + output += lerr[1] + elif lout[0]==0 and lout[1]: + output += lout[1] + else: + output += comment + else: + output += comment + stdscr.addstr(self.slotnum+2, 0, output[:x] ) + stdscr.refresh() + except curses.error: #some of them will be out of screen, ignore it + pass + def update(self, pid): + self.drawLine() + if self.timeout >0: + self.kill(pid) + + def kill(self, pid): + overtime = time.time()-self.startTime - self.timeout + try: + if overtime > 3: #expired more than 3 seconds, send -9 + os.kill(-pid, signal.SIGKILL) + elif overtime > 2: #expired more than 2 seconds, send -15 + os.kill(-pid, signal.SIGTERM) + elif overtime >= 0: + os.kill(-pid, signal.SIGINT) + except OSError, e: + if e.errno != errno.ESRCH: # No such process + raise e + + def stop(self, pid): + """stop current pid b/c we caught SIGINT twice + """ + self.startTime = time.time() - self.timeout + self.kill(pid) + +class Slots: + pids = {} + def __init__(self, max, screen, timeout, outdir): + self.maxChild = max + self.slots = range(self.maxChild) + self.screen = screen + self.t = timeout + self.outdir = outdir + + def getSlot(self, name, count): + if not self.slots: + #it's empty, wait until other jobs finish + slot = self.waitJobs().slotnum + else: + slot = self.slots[0] + self.slots.remove(slot) + return Slot(self.outdir, slot, self.screen, self.t, name, count) + + def mapPID(self, pid, slot): + """@param slot: slot object + """ + self.pids[pid] = slot + + def waitJobs(self): + while 1: + try: + pid, status = os.wait() + break + except OSError, e: + if e.errno == errno.ECHILD: #no child processes + raise RuntimeError('no child processes when waiting') + slot = self.pids[pid] + if self.outdir: + open(os.path.join(self.outdir, '%s.status' % slot.name),'w').write('%d' % (status>>8)) + if (status & 0xFF) !=0: + open(os.path.join(self.outdir, 'killed_list'),'a').write('%s\n' % (slot.name)) + if status >>8: + open(os.path.join(self.outdir, 'abnormal_list'),'a').write('%s\n' % (slot.name)) + del self.pids[pid] + s = status >> 8 + if final_stats.has_key(s): + final_stats[s]+= 1 + else: + final_stats[s]=1 + return slot + def update(self): + for k,v in self.pids.items(): + v.update(k) + def timeout(self): + self.update() + signal.alarm(update_rate) + + def drawTitle(self, stuff): + if self.screen: + y,x = self.screen.getmaxyx() + spaces = ' '*(x*2) + self.screen.addstr(0,0, spaces) + self.screen.addstr(0,0, stuff[:x*2]) + self.screen.refresh() + else: + print stuff + def stop(self): + if stopping ==1: + msg = 'Stopping -- Waiting current jobs done. Press Ctrl-C again to kill current jobs.' + else: + msg = 'Stopping -- Killing current jobs' + self.drawTitle(msg) + if stopping >1: + for k,v in self.pids.items(): + v.stop(k) + return + +def handler(signum, frame_unused): + global gsl + if signum==signal.SIGALRM: + gsl.timeout() + if signum==signal.SIGINT: + global stopping + stopping += 1 + gsl.stop() + +def generateCommands(cmd_line, args): + return [per_arg.replace('{}', args[0]) for per_arg in cmd_line] + +def spawn(cmdline, outfn, errfn, setpgrp = False): + """A cleverer spawn that lets you redirect stdout and stderr to + outfn and errfn. Returns pid of child. + You can't do this with os.spawn, sadly. + """ + pid = os.fork() + if pid==0: #child + out = open(outfn, 'w') + os.dup2(out.fileno() ,sys.stdout.fileno()) + err = open(errfn, 'w') + os.dup2(err.fileno(), sys.stderr.fileno()) + if setpgrp: + os.setpgrp() + try: + os.execvp(cmdline[0], cmdline) + except OSError,e: + print >> sys.stderr, "error before execution:",e + sys.exit(255) + #father process + return pid + +def start(win, max_child, hlist, outdir, randomize, command_line, timeout): + + total = len(hlist) + + if randomize: + random.shuffle(hlist) + + signal.signal(signal.SIGALRM, handler) + signal.signal(signal.SIGINT, handler) + signal.alarm(update_rate) + + sl = Slots(max_child, win, timeout, outdir) + global gsl + global stopping + gsl = sl + count = 0 + for i in hlist: + slot = sl.getSlot(i[0], count) + if stopping>0: + slot.drawLine('Done', done=True) + break + + count += 1 + slot.drawLine(i[1]) + x = generateCommands(command_line, i) + + sl.drawTitle("%d/%d:%s" %(count, total,' '.join(x))) + + outpath = '/dev/null' + errpath = '/dev/null' + if outdir: + outpath = os.path.join(outdir, '%s.out'%i[0]) + errpath = os.path.join(outdir, '%s.err'%i[0]) + + pid = spawn(x, outpath, errpath, setpgrp = True) + sl.mapPID(pid, slot) + + while sl.pids: + try: + slot = sl.waitJobs() + except RuntimeError: + print >> sys.stderr, 'Warning: lost tracking of %d jobs' % len(sl.pids) + return + slot.drawLine('Done', done = True) #Done + +def get_output(outdir, argument_list, out= True, err=False, status=False): + """ + + For post processing the output dir. + + @param out: decide whether to process *.out files + @param err: decide whether to process *.err files + @param status: decide whether to process *.status files + + @return: (out, err, status): out is a hash table, in which the + keys are the arguments, and the values are the string of the + output, if available. err is similar. the values of hash table + status is the value of exit status in int. + + """ + if not out and not err and not status: + raise RuntimeError("one of out, err and status has to be True") + + result = ({},{},{}) + mapping = ('out','err','status') + p = [] + if out: p.append(0) + if err: p.append(1) + if status: p.append(2) + for arg in argument_list: + basefn = os.path.join(outdir, arg) + for i in p: + fn = '.'.join( (basefn, mapping[i]) ) #basefn.ext + try: + lines = open(fn).readlines() + result[i][arg]=''.join(lines) + except IOError: + pass + if not status: return result + int_status = {} + for k,v in result[2].items(): + try: + int_status[k] = int(v.strip()) + except ValueError: + pass + return result[0], result[1], int_status + +def main(): + options = 'hP:ra:o:yt:pn' + long_opts = ['help','max-procs=','randomize','args=','output=','noprompt','timeout=','plain', 'version','no-exec'] + try: + opts,args = getopt.getopt(sys.argv[1:], options,long_opts) + except getopt.GetoptError: + print "Unknown options" + usage() + sys.exit(1) + #set default values + ask_prompt = True + maxchild = 30 + randomize = False + hostfile = sys.stdin + outdir = '' + timeout = 0 + plain = False + no_exec = False + if os.environ.has_key('VXARGS_OUTDIR'): + outdir = os.environ['VXARGS_OUTDIR'] + for o,a in opts: + if o in ['--version']: + print "vxargs version",version + print "Copyright (c) 2004 Yun Mao (maoy@cis.upenn.edu)" + print "Freely distributed under GNU LGPL License" + sys.exit(1) + elif o in ['-h','--help']: + usage() + sys.exit(1) + elif o in ['-r','--randomize']: + randomize = True + elif o in ['-P','--max-procs']: + maxchild = int(a) + elif o in ['-a','--args']: + try: + hostfile = open(a,'r') + except IOError, e: + print "argument file %s has error: %s" % ( a, str(e) ) + sys.exit(3) + elif o in ['-o','--output']: + outdir = a + if a =='/dev/null': outdir = '' + elif o in ['-y','--noprompt']: + ask_prompt = False + elif o in ['-t','--timeout']: + timeout = int(a) + elif o in ['-p','--plain']: + plain = True + elif o in ['-n','--no-exec']: + no_exec = True + else: + print 'Unknown options' + usage() + sys.exit(1) + if len(args)<1: + print "No command given." + usage() + sys.exit(1) + #now test outdir + if outdir: + if os.path.exists(outdir): + if not os.path.isdir(outdir): + print "%s exists and is not a dir, won't continue" % outdir + sys.exit(3) + elif no_exec: + print "%s is the destination dir and would be destroyed." % (outdir) + elif ask_prompt: + if hostfile == sys.stdin: + print "You must specify --noprompt (-y) option if no --args (-a) or --no-exec (-n) is given. Doing so will destroy folder %s." % (outdir) + sys.exit(3) + else: + result = raw_input("%s exists. Continue will destroy everything in it. Are you sure? (y/n) " % (outdir)) + if result not in ['y','Y']: + sys.exit(3) + os.system('rm -f %s' % (os.path.join(outdir,'*'))) + else: + if not no_exec: + os.system('mkdir -p %s' % outdir) + + hlist = getListFromFile(hostfile) + if no_exec: + for i in hlist: + real_cmdline = generateCommands(args, i) + print ' '.join(real_cmdline) + sys.exit(0) + + if plain: # no fancy output + return start(None, maxchild, hlist, outdir, randomize, args, timeout) + else: + # use fancy curses-based animation + try: + curses.wrapper(start, maxchild, hlist, outdir, randomize, args, timeout) + except curses.error: + sys.exit(4) + #post execution, output some stats + total = 0 + for k,v in final_stats.items(): + print "exit code %d: %d job(s)" % (k,v) + total += v + print "total number of jobs:", total +def usage(): + print """\ +NAME + + vxargs - build and execute command lines from an argument list file + with visualization and parallelism, and output redirection. + +DESCRIPTION + + vxargs reads a list of arguments from a txt file or standard input, + delimited by newlines, and executes the command one or more times + with initial arguments in which {} is substituted by the arguments + read from the file or standard input. The current executing commands + and progress will be dynamically updated on the screen. Stdout and + stderr of each command will be redirected to separate files. A list + of all processes with a non-zero exit status is generated in file + abnormal_list. A list of all timeout processes is generated in file + killed_list. + +SYNOPSIS + + vxargs [OPTIONS] command [initial-arguments] + +OPTIONS + + --help + Print a summary of the options to vxargs and exit. + + --max-procs=max-procs, -P max-procs + Run up to max-procs processes at a time; the default is 30. + + --randomize, -r [OPTIONAL] + Randomize the host list before all execution. + + --args=filename, -a filename + The arguments file. If unspecified, the arguments will be read + from standard input, and -y option must be specified. + + --output=outdir, -o outdir + output directory for stdout and stderr files + The default value is specified by the environment variable VXARGS_OUTDIR. + If it is unspecified, both stdout and stderr will be redirected + to /dev/null. + Note that if the directory existed before execution, everything + inside will be wiped. + + --timeout=timeout, -t timeout + The maximal time in second for each command to execute. timeout=0 + means infinite. 0 (i.e. infinite) is the default value. When the time is up, + vxargs will send signal SIGINT to the process. If the process does not + stop after 2 seconds, vxargs will send SIGTERM signal, and send SIGKILL + if it still keeps running after 3 seconds. + + --noprompt, -y + Wipe out the outdir without confirmation. + + --no-exec, -n + Print the commands that would be executed, but do not execute them. + + --plain, -p + Don't use curses-based output, but plain output to stdout + instead. It will be less exciting, but will do the same job + effectively. It is useful if one wants to start vxargs from cron + or by another program that doesn't want to see the output. + By default, vxargs uses the curses-based output. + + --version + Display current version and copyright information. + +EXAMPLES: + Suppose the iplist.txt file has following content: +$ cat iplist.txt +216.165.109.79 +#planetx.scs.cs.nyu.edu +158.130.6.254 +#planetlab1.cis.upenn.edu +158.130.6.253 +#planetlab2.cis.upenn.edu +128.232.103.203 +#planetlab3.xeno.cl.cam.ac.uk + +Note that lines starting with '#' will be interpreted as comment for +the previous lines, which is optional, for visualization purpose only. + +$ vxargs -a iplist.txt -o /tmp/result -P 10 ssh upenn_dharma@{} "hostname;uptime" + +...[ UI output]... + +$ cat /tmp/result/* +planetlab3.xeno.cl.cam.ac.uk + 03:13:21 up 4 days, 14:36, 0 users, load average: 0.36, 0.44, 0.44 +planetlab2.cis.upenn.edu + 03:13:20 up 26 days, 16:19, 0 users, load average: 8.11, 7.41, 7.41 +planetlab1.cis.upenn.edu + 03:13:19 up 22 days, 20:02, 0 users, load average: 13.60, 12.55, 12.59 +ssh: connect to host 216.165.109.79 port 22: Connection timed out +$ + +other examples: +cat iplist.txt | vxargs -o /tmp/result rsync -az -e ssh --delete mirror $SLICE@{}: + +vxargs -a iplist.txt -o /tmp/result ssh {} killall -9 java + +For more information, please visit http://dharma.cis.upenn.edu/planetlab/vxargs/ +""" +if __name__=='__main__': + main() +