+++ /dev/null
-#!/usr/bin/env python
-
-# DHARMA Project
-# Copyright (C) 2003-2004 Yun Mao, University of Pennsylvania
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of version 2.1 of the GNU Lesser General Public
-# License as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-
-"""
-vxargs: Visualized xargs with redirected output
-
-"""
-version = "0.3.1"
-import os, sys, time, signal, errno
-import curses, random
-import getopt
-
-update_rate = 1
-
-final_stats = {}
-gsl = None
-stopping = 0
-
-def getListFromFile(f):
- """I'll ignore the line starting with #
-
- @param f: file object of the host list file
- @return: a list of hostnames (or IPs)
- """
- hostlist = []
- for line in f:
- if line[0]!='#':
- if line.strip():
- hostlist.append([line.strip(),''])
- elif hostlist and hostlist[-1][1]=='':
- hostlist[-1][1] = line.strip()[1:]
- return hostlist
-
-def get_last_line(fn):
- #equ to tail -n1 fn
- try:
- lines = open(fn,'r').readlines()
- if len(lines)>0:
- return (0, lines[-1].strip())
- except IOError:
- pass
- return (1,'')
-
-class Slot:
- def __init__(self, outdir, num, screen, timeout, name, count):
- self.outdir = outdir
- self.slotnum = num
- self.screen = screen
- self.comment = ""
- self.startTime = time.time()
- self.timeout = timeout
- self.name = name
- self.count = count
-
- def drawLine(self, comment='', done = False):
- if self.screen is None: return
- if comment == '':
- comment = self.comment
- else:
- self.comment = comment
- stdscr = self.screen
- elapsed = time.time()-self.startTime
- try:
- y,x = stdscr.getmaxyx()
- spaces = ' '*x
- stdscr.addstr(self.slotnum+2, 0, spaces) #title occupies two lines
- if done:
- stdscr.addstr(self.slotnum+2,0, comment[:x])
- else:
- #construct the string
- output = "(%3ds)%3d: %s " % ( round(elapsed), self.count, self.name )
- spaceleft = x - len(output)
- if self.outdir and spaceleft>1:
- outfn = os.path.join(self.outdir, '%s.out' % self.name)
- errfn = os.path.join(self.outdir, '%s.err' % self.name)
- lout = get_last_line(outfn)
- lerr = get_last_line(errfn)
- if lerr[0]==0 and lerr[1]:
- output += lerr[1]
- elif lout[0]==0 and lout[1]:
- output += lout[1]
- else:
- output += comment
- else:
- output += comment
- stdscr.addstr(self.slotnum+2, 0, output[:x] )
- stdscr.refresh()
- except curses.error: #some of them will be out of screen, ignore it
- pass
- def update(self, pid):
- self.drawLine()
- if self.timeout >0:
- self.kill(pid)
-
- def kill(self, pid):
- overtime = time.time()-self.startTime - self.timeout
- try:
- if overtime > 3: #expired more than 3 seconds, send -9
- os.kill(-pid, signal.SIGKILL)
- elif overtime > 2: #expired more than 2 seconds, send -15
- os.kill(-pid, signal.SIGTERM)
- elif overtime >= 0:
- os.kill(-pid, signal.SIGINT)
- except OSError, e:
- if e.errno != errno.ESRCH: # No such process
- raise e
-
- def stop(self, pid):
- """stop current pid b/c we caught SIGINT twice
- """
- self.startTime = time.time() - self.timeout
- self.kill(pid)
-
-class Slots:
- pids = {}
- def __init__(self, max, screen, timeout, outdir):
- self.maxChild = max
- self.slots = range(self.maxChild)
- self.screen = screen
- self.t = timeout
- self.outdir = outdir
-
- def getSlot(self, name, count):
- if not self.slots:
- #it's empty, wait until other jobs finish
- slot = self.waitJobs().slotnum
- else:
- slot = self.slots[0]
- self.slots.remove(slot)
- return Slot(self.outdir, slot, self.screen, self.t, name, count)
-
- def mapPID(self, pid, slot):
- """@param slot: slot object
- """
- self.pids[pid] = slot
-
- def waitJobs(self):
- while 1:
- try:
- pid, status = os.wait()
- break
- except OSError, e:
- if e.errno == errno.ECHILD: #no child processes
- raise RuntimeError('no child processes when waiting')
- slot = self.pids[pid]
- if self.outdir:
- open(os.path.join(self.outdir, '%s.status' % slot.name),'w').write('%d' % (status>>8))
- if (status & 0xFF) !=0:
- open(os.path.join(self.outdir, 'killed_list'),'a').write('%s\n' % (slot.name))
- if status >>8:
- open(os.path.join(self.outdir, 'abnormal_list'),'a').write('%s\n' % (slot.name))
- del self.pids[pid]
- s = status >> 8
- if final_stats.has_key(s):
- final_stats[s]+= 1
- else:
- final_stats[s]=1
- return slot
- def update(self):
- for k,v in self.pids.items():
- v.update(k)
- def timeout(self):
- self.update()
- signal.alarm(update_rate)
-
- def drawTitle(self, stuff):
- if self.screen:
- y,x = self.screen.getmaxyx()
- spaces = ' '*(x*2)
- self.screen.addstr(0,0, spaces)
- self.screen.addstr(0,0, stuff[:x*2])
- self.screen.refresh()
- else:
- print stuff
- def stop(self):
- if stopping ==1:
- msg = 'Stopping -- Waiting current jobs done. Press Ctrl-C again to kill current jobs.'
- else:
- msg = 'Stopping -- Killing current jobs'
- self.drawTitle(msg)
- if stopping >1:
- for k,v in self.pids.items():
- v.stop(k)
- return
-
-def handler(signum, frame_unused):
- global gsl
- if signum==signal.SIGALRM:
- gsl.timeout()
- if signum==signal.SIGINT:
- global stopping
- stopping += 1
- gsl.stop()
-
-def generateCommands(cmd_line, args):
- return [per_arg.replace('{}', args[0]) for per_arg in cmd_line]
-
-def spawn(cmdline, outfn, errfn, setpgrp = False):
- """A cleverer spawn that lets you redirect stdout and stderr to
- outfn and errfn. Returns pid of child.
- You can't do this with os.spawn, sadly.
- """
- pid = os.fork()
- if pid==0: #child
- out = open(outfn, 'w')
- os.dup2(out.fileno() ,sys.stdout.fileno())
- err = open(errfn, 'w')
- os.dup2(err.fileno(), sys.stderr.fileno())
- if setpgrp:
- os.setpgrp()
- try:
- os.execvp(cmdline[0], cmdline)
- except OSError,e:
- print >> sys.stderr, "error before execution:",e
- sys.exit(255)
- #father process
- return pid
-
-def start(win, max_child, hlist, outdir, randomize, command_line, timeout):
-
- total = len(hlist)
-
- if randomize:
- random.shuffle(hlist)
-
- signal.signal(signal.SIGALRM, handler)
- signal.signal(signal.SIGINT, handler)
- signal.alarm(update_rate)
-
- sl = Slots(max_child, win, timeout, outdir)
- global gsl
- global stopping
- gsl = sl
- count = 0
- for i in hlist:
- slot = sl.getSlot(i[0], count)
- if stopping>0:
- slot.drawLine('Done', done=True)
- break
-
- count += 1
- slot.drawLine(i[1])
- x = generateCommands(command_line, i)
-
- sl.drawTitle("%d/%d:%s" %(count, total,' '.join(x)))
-
- outpath = '/dev/null'
- errpath = '/dev/null'
- if outdir:
- outpath = os.path.join(outdir, '%s.out'%i[0])
- errpath = os.path.join(outdir, '%s.err'%i[0])
-
- pid = spawn(x, outpath, errpath, setpgrp = True)
- sl.mapPID(pid, slot)
-
- while sl.pids:
- try:
- slot = sl.waitJobs()
- except RuntimeError:
- print >> sys.stderr, 'Warning: lost tracking of %d jobs' % len(sl.pids)
- return
- slot.drawLine('Done', done = True) #Done
-
-def get_output(outdir, argument_list, out= True, err=False, status=False):
- """
-
- For post processing the output dir.
-
- @param out: decide whether to process *.out files
- @param err: decide whether to process *.err files
- @param status: decide whether to process *.status files
-
- @return: (out, err, status): out is a hash table, in which the
- keys are the arguments, and the values are the string of the
- output, if available. err is similar. the values of hash table
- status is the value of exit status in int.
-
- """
- if not out and not err and not status:
- raise RuntimeError("one of out, err and status has to be True")
-
- result = ({},{},{})
- mapping = ('out','err','status')
- p = []
- if out: p.append(0)
- if err: p.append(1)
- if status: p.append(2)
- for arg in argument_list:
- basefn = os.path.join(outdir, arg)
- for i in p:
- fn = '.'.join( (basefn, mapping[i]) ) #basefn.ext
- try:
- lines = open(fn).readlines()
- result[i][arg]=''.join(lines)
- except IOError:
- pass
- if not status: return result
- int_status = {}
- for k,v in result[2].items():
- try:
- int_status[k] = int(v.strip())
- except ValueError:
- pass
- return result[0], result[1], int_status
-
-def main():
- options = 'hP:ra:o:yt:pn'
- long_opts = ['help','max-procs=','randomize','args=','output=','noprompt','timeout=','plain', 'version','no-exec']
- try:
- opts,args = getopt.getopt(sys.argv[1:], options,long_opts)
- except getopt.GetoptError:
- print "Unknown options"
- usage()
- sys.exit(1)
- #set default values
- ask_prompt = True
- maxchild = 30
- randomize = False
- hostfile = sys.stdin
- outdir = ''
- timeout = 0
- plain = False
- no_exec = False
- if os.environ.has_key('VXARGS_OUTDIR'):
- outdir = os.environ['VXARGS_OUTDIR']
- for o,a in opts:
- if o in ['--version']:
- print "vxargs version",version
- print "Copyright (c) 2004 Yun Mao (maoy@cis.upenn.edu)"
- print "Freely distributed under GNU LGPL License"
- sys.exit(1)
- elif o in ['-h','--help']:
- usage()
- sys.exit(1)
- elif o in ['-r','--randomize']:
- randomize = True
- elif o in ['-P','--max-procs']:
- maxchild = int(a)
- elif o in ['-a','--args']:
- try:
- hostfile = open(a,'r')
- except IOError, e:
- print "argument file %s has error: %s" % ( a, str(e) )
- sys.exit(3)
- elif o in ['-o','--output']:
- outdir = a
- if a =='/dev/null': outdir = ''
- elif o in ['-y','--noprompt']:
- ask_prompt = False
- elif o in ['-t','--timeout']:
- timeout = int(a)
- elif o in ['-p','--plain']:
- plain = True
- elif o in ['-n','--no-exec']:
- no_exec = True
- else:
- print 'Unknown options'
- usage()
- sys.exit(1)
- if len(args)<1:
- print "No command given."
- usage()
- sys.exit(1)
- #now test outdir
- if outdir:
- if os.path.exists(outdir):
- if not os.path.isdir(outdir):
- print "%s exists and is not a dir, won't continue" % outdir
- sys.exit(3)
- elif no_exec:
- print "%s is the destination dir and would be destroyed." % (outdir)
- elif ask_prompt:
- if hostfile == sys.stdin:
- print "You must specify --noprompt (-y) option if no --args (-a) or --no-exec (-n) is given. Doing so will destroy folder %s." % (outdir)
- sys.exit(3)
- else:
- result = raw_input("%s exists. Continue will destroy everything in it. Are you sure? (y/n) " % (outdir))
- if result not in ['y','Y']:
- sys.exit(3)
- os.system('rm -f %s' % (os.path.join(outdir,'*')))
- else:
- if not no_exec:
- os.system('mkdir -p %s' % outdir)
-
- hlist = getListFromFile(hostfile)
- if no_exec:
- for i in hlist:
- real_cmdline = generateCommands(args, i)
- print ' '.join(real_cmdline)
- sys.exit(0)
-
- if plain: # no fancy output
- return start(None, maxchild, hlist, outdir, randomize, args, timeout)
- else:
- # use fancy curses-based animation
- try:
- curses.wrapper(start, maxchild, hlist, outdir, randomize, args, timeout)
- except curses.error:
- sys.exit(4)
- #post execution, output some stats
- total = 0
- for k,v in final_stats.items():
- print "exit code %d: %d job(s)" % (k,v)
- total += v
- print "total number of jobs:", total
-def usage():
- print """\
-NAME
-
- vxargs - build and execute command lines from an argument list file
- with visualization and parallelism, and output redirection.
-
-DESCRIPTION
-
- vxargs reads a list of arguments from a txt file or standard input,
- delimited by newlines, and executes the command one or more times
- with initial arguments in which {} is substituted by the arguments
- read from the file or standard input. The current executing commands
- and progress will be dynamically updated on the screen. Stdout and
- stderr of each command will be redirected to separate files. A list
- of all processes with a non-zero exit status is generated in file
- abnormal_list. A list of all timeout processes is generated in file
- killed_list.
-
-SYNOPSIS
-
- vxargs [OPTIONS] command [initial-arguments]
-
-OPTIONS
-
- --help
- Print a summary of the options to vxargs and exit.
-
- --max-procs=max-procs, -P max-procs
- Run up to max-procs processes at a time; the default is 30.
-
- --randomize, -r [OPTIONAL]
- Randomize the host list before all execution.
-
- --args=filename, -a filename
- The arguments file. If unspecified, the arguments will be read
- from standard input, and -y option must be specified.
-
- --output=outdir, -o outdir
- output directory for stdout and stderr files
- The default value is specified by the environment variable VXARGS_OUTDIR.
- If it is unspecified, both stdout and stderr will be redirected
- to /dev/null.
- Note that if the directory existed before execution, everything
- inside will be wiped.
-
- --timeout=timeout, -t timeout
- The maximal time in second for each command to execute. timeout=0
- means infinite. 0 (i.e. infinite) is the default value. When the time is up,
- vxargs will send signal SIGINT to the process. If the process does not
- stop after 2 seconds, vxargs will send SIGTERM signal, and send SIGKILL
- if it still keeps running after 3 seconds.
-
- --noprompt, -y
- Wipe out the outdir without confirmation.
-
- --no-exec, -n
- Print the commands that would be executed, but do not execute them.
-
- --plain, -p
- Don't use curses-based output, but plain output to stdout
- instead. It will be less exciting, but will do the same job
- effectively. It is useful if one wants to start vxargs from cron
- or by another program that doesn't want to see the output.
- By default, vxargs uses the curses-based output.
-
- --version
- Display current version and copyright information.
-
-EXAMPLES:
- Suppose the iplist.txt file has following content:
-$ cat iplist.txt
-216.165.109.79
-#planetx.scs.cs.nyu.edu
-158.130.6.254
-#planetlab1.cis.upenn.edu
-158.130.6.253
-#planetlab2.cis.upenn.edu
-128.232.103.203
-#planetlab3.xeno.cl.cam.ac.uk
-
-Note that lines starting with '#' will be interpreted as comment for
-the previous lines, which is optional, for visualization purpose only.
-
-$ vxargs -a iplist.txt -o /tmp/result -P 10 ssh upenn_dharma@{} "hostname;uptime"
-
-...[ UI output]...
-
-$ cat /tmp/result/*
-planetlab3.xeno.cl.cam.ac.uk
- 03:13:21 up 4 days, 14:36, 0 users, load average: 0.36, 0.44, 0.44
-planetlab2.cis.upenn.edu
- 03:13:20 up 26 days, 16:19, 0 users, load average: 8.11, 7.41, 7.41
-planetlab1.cis.upenn.edu
- 03:13:19 up 22 days, 20:02, 0 users, load average: 13.60, 12.55, 12.59
-ssh: connect to host 216.165.109.79 port 22: Connection timed out
-$
-
-other examples:
-cat iplist.txt | vxargs -o /tmp/result rsync -az -e ssh --delete mirror $SLICE@{}:
-
-vxargs -a iplist.txt -o /tmp/result ssh {} killall -9 java
-
-For more information, please visit http://dharma.cis.upenn.edu/planetlab/vxargs/
-"""
-if __name__=='__main__':
- main()
-