hope I got the merge wright...
[monitor.git] / tools / automate / vxargs.py
diff --git a/tools/automate/vxargs.py b/tools/automate/vxargs.py
new file mode 100755 (executable)
index 0000000..fea9e37
--- /dev/null
@@ -0,0 +1,529 @@
+#!/usr/bin/env python
+
+# DHARMA Project
+# Copyright (C) 2003-2004 Yun Mao, University of Pennsylvania
+# 
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+# 
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+# 
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+
+"""
+vxargs: Visualized xargs with redirected output
+
+"""
+version = "0.3.1"
+import os, sys, time, signal, errno
+import curses, random
+import getopt
+
+update_rate = 1
+
+final_stats = {}
+gsl = None
+stopping = 0
+
+def getListFromFile(f):
+    """I'll ignore the line starting with #
+
+    @param f: file object of the host list file
+    @return: a list of hostnames (or IPs)
+    """
+    hostlist = []
+    for line in f:
+        if line[0]!='#':
+            if line.strip():
+                hostlist.append([line.strip(),''])
+        elif hostlist and hostlist[-1][1]=='':
+            hostlist[-1][1] = line.strip()[1:]
+    return hostlist
+
+def get_last_line(fn):
+    #equ to tail -n1 fn
+    try:
+        lines = open(fn,'r').readlines()
+        if len(lines)>0:
+            return (0, lines[-1].strip())
+    except IOError:
+        pass
+    return (1,'')
+
+class Slot:
+    def __init__(self, outdir, num, screen, timeout, name, count):
+        self.outdir = outdir
+        self.slotnum = num
+        self.screen = screen
+        self.comment = ""
+        self.startTime = time.time()
+        self.timeout = timeout
+        self.name = name
+        self.count = count
+
+    def drawLine(self, comment='', done = False):
+        if self.screen is None: return
+        if comment == '':
+            comment = self.comment
+        else:
+            self.comment = comment
+        stdscr = self.screen
+        elapsed = time.time()-self.startTime
+        try:
+            y,x = stdscr.getmaxyx()
+            spaces = ' '*x
+            stdscr.addstr(self.slotnum+2, 0, spaces) #title occupies two lines
+            if done:
+                stdscr.addstr(self.slotnum+2,0, comment[:x])
+            else:
+                #construct the string
+                output = "(%3ds)%3d: %s " % ( round(elapsed), self.count, self.name )
+                spaceleft = x - len(output)
+                if self.outdir and spaceleft>1:
+                     outfn = os.path.join(self.outdir, '%s.out' % self.name)
+                     errfn = os.path.join(self.outdir, '%s.err' % self.name)
+                     lout = get_last_line(outfn)
+                     lerr = get_last_line(errfn)
+                     if lerr[0]==0 and lerr[1]:
+                         output += lerr[1]
+                     elif lout[0]==0 and lout[1]:
+                         output += lout[1]
+                     else:
+                         output += comment
+                else:
+                    output += comment
+                stdscr.addstr(self.slotnum+2, 0, output[:x] )
+            stdscr.refresh()
+        except curses.error: #some of them will be out of screen, ignore it
+            pass
+    def update(self, pid):
+        self.drawLine()
+        if self.timeout >0:
+            self.kill(pid)
+
+    def kill(self, pid):
+        overtime = time.time()-self.startTime - self.timeout
+        try:
+            if overtime > 3: #expired more than 3 seconds, send -9
+                os.kill(-pid, signal.SIGKILL)
+            elif overtime > 2: #expired more than 2 seconds, send -15
+                os.kill(-pid, signal.SIGTERM)
+            elif overtime >= 0:
+                os.kill(-pid, signal.SIGINT)
+        except OSError, e:
+            if e.errno != errno.ESRCH: # No such process
+                raise e
+
+    def stop(self, pid):
+        """stop current pid b/c we caught SIGINT twice
+        """
+        self.startTime = time.time() - self.timeout 
+        self.kill(pid)
+    
+class Slots:
+    pids = {}
+    def __init__(self, max, screen, timeout, outdir):
+        self.maxChild = max
+        self.slots = range(self.maxChild)
+        self.screen = screen
+        self.t = timeout
+        self.outdir = outdir
+        
+    def getSlot(self, name, count):
+        if not self.slots:
+            #it's empty, wait until other jobs finish
+            slot =  self.waitJobs().slotnum
+        else:
+            slot = self.slots[0]
+            self.slots.remove(slot)
+        return Slot(self.outdir, slot, self.screen, self.t, name, count)
+    
+    def mapPID(self, pid, slot):
+        """@param slot: slot object
+        """
+        self.pids[pid] = slot
+
+    def waitJobs(self):
+        while 1:
+            try:
+                pid, status = os.wait()
+                break
+            except OSError, e:
+                if e.errno == errno.ECHILD: #no child processes
+                    raise RuntimeError('no child processes when waiting')
+        slot = self.pids[pid]
+        if self.outdir:
+            open(os.path.join(self.outdir, '%s.status' % slot.name),'w').write('%d' % (status>>8))
+            if (status & 0xFF) !=0:
+                open(os.path.join(self.outdir, 'killed_list'),'a').write('%s\n' % (slot.name))
+            if status >>8:
+                open(os.path.join(self.outdir, 'abnormal_list'),'a').write('%s\n' % (slot.name))
+        del self.pids[pid]
+        s = status >> 8
+        if final_stats.has_key(s):
+            final_stats[s]+= 1
+        else:
+            final_stats[s]=1
+        return slot
+    def update(self):
+        for k,v in self.pids.items():
+            v.update(k)
+    def timeout(self):
+        self.update()
+        signal.alarm(update_rate)
+        
+    def drawTitle(self, stuff):
+        if self.screen:
+            y,x = self.screen.getmaxyx()
+            spaces = ' '*(x*2)
+            self.screen.addstr(0,0,  spaces)
+            self.screen.addstr(0,0, stuff[:x*2])
+            self.screen.refresh()
+        else:
+            print stuff
+    def stop(self):
+        if stopping ==1:
+            msg = 'Stopping -- Waiting current jobs done. Press Ctrl-C again to kill current jobs.'
+        else:
+            msg = 'Stopping -- Killing current jobs'
+        self.drawTitle(msg)
+        if stopping >1:
+            for k,v in self.pids.items():
+                v.stop(k)
+        return
+
+def handler(signum, frame_unused):
+    global gsl
+    if signum==signal.SIGALRM:
+        gsl.timeout()
+    if signum==signal.SIGINT:
+        global stopping
+        stopping += 1
+        gsl.stop()
+
+def generateCommands(cmd_line, args):
+    return [per_arg.replace('{}', args[0]) for per_arg in cmd_line]
+        
+def spawn(cmdline, outfn, errfn, setpgrp = False):
+   """A cleverer spawn that lets you redirect stdout and stderr to
+   outfn and errfn.  Returns pid of child.
+   You can't do this with os.spawn, sadly.
+   """
+   pid = os.fork()
+   if pid==0: #child
+       out = open(outfn, 'w')
+       os.dup2(out.fileno() ,sys.stdout.fileno())
+       err = open(errfn, 'w')
+       os.dup2(err.fileno(), sys.stderr.fileno())
+       if setpgrp:
+           os.setpgrp()
+       try:
+           os.execvp(cmdline[0], cmdline)
+       except OSError,e:
+           print >> sys.stderr, "error before execution:",e
+           sys.exit(255)
+   #father process
+   return pid
+
+def start(win, max_child, hlist, outdir, randomize, command_line, timeout):
+
+    total = len(hlist)
+
+    if randomize:
+        random.shuffle(hlist)
+
+    signal.signal(signal.SIGALRM, handler)
+    signal.signal(signal.SIGINT, handler)
+    signal.alarm(update_rate)
+
+    sl = Slots(max_child, win, timeout, outdir)
+    global gsl
+    global stopping
+    gsl = sl
+    count = 0
+    for i in hlist:
+        slot = sl.getSlot(i[0], count)
+        if stopping>0:
+            slot.drawLine('Done', done=True)
+            break
+            
+        count += 1
+        slot.drawLine(i[1])
+        x = generateCommands(command_line, i)
+        
+        sl.drawTitle("%d/%d:%s" %(count, total,' '.join(x)))
+        
+        outpath = '/dev/null'
+        errpath = '/dev/null'
+        if outdir:
+            outpath = os.path.join(outdir, '%s.out'%i[0])
+            errpath = os.path.join(outdir, '%s.err'%i[0])
+
+        pid = spawn(x, outpath, errpath, setpgrp = True)
+        sl.mapPID(pid, slot)
+
+    while sl.pids:
+        try:
+            slot = sl.waitJobs()
+        except RuntimeError:
+            print >> sys.stderr, 'Warning: lost tracking of %d jobs' % len(sl.pids)
+            return
+        slot.drawLine('Done', done = True) #Done
+
+def get_output(outdir, argument_list, out= True, err=False, status=False):
+    """
+
+    For post processing the output dir.
+
+    @param out: decide whether to process *.out files
+    @param err: decide whether to process *.err files
+    @param status: decide whether to process *.status files
+    
+    @return: (out, err, status): out is a hash table, in which the
+    keys are the arguments, and the values are the string of the
+    output, if available. err is similar. the values of hash table
+    status is the value of exit status in int.
+    
+    """
+    if not out and not err and not status:
+        raise RuntimeError("one of out, err and status has to be True")
+    
+    result = ({},{},{})
+    mapping = ('out','err','status')
+    p = []
+    if out: p.append(0)
+    if err: p.append(1)
+    if status: p.append(2)
+    for arg in argument_list:
+        basefn = os.path.join(outdir, arg)
+        for i in p:
+            fn = '.'.join( (basefn, mapping[i]) ) #basefn.ext
+            try:
+                lines = open(fn).readlines()
+                result[i][arg]=''.join(lines)
+            except IOError:
+                pass
+    if not status: return result
+    int_status = {}
+    for k,v in result[2].items():
+        try:
+            int_status[k] = int(v.strip())
+        except ValueError:
+            pass
+    return result[0], result[1], int_status
+
+def main():
+    options = 'hP:ra:o:yt:pn'
+    long_opts = ['help','max-procs=','randomize','args=','output=','noprompt','timeout=','plain', 'version','no-exec']
+    try:
+        opts,args = getopt.getopt(sys.argv[1:], options,long_opts)
+    except getopt.GetoptError:
+        print "Unknown options"
+        usage()
+        sys.exit(1)
+    #set default values
+    ask_prompt = True
+    maxchild = 30
+    randomize = False
+    hostfile = sys.stdin
+    outdir = ''
+    timeout = 0
+    plain = False
+    no_exec = False
+    if os.environ.has_key('VXARGS_OUTDIR'):
+        outdir = os.environ['VXARGS_OUTDIR']
+    for o,a in opts:
+        if o in ['--version']:
+            print "vxargs version",version
+            print "Copyright (c) 2004 Yun Mao (maoy@cis.upenn.edu)"
+            print "Freely distributed under GNU LGPL License"
+            sys.exit(1)
+        elif o in ['-h','--help']:
+            usage()
+            sys.exit(1)
+        elif o in ['-r','--randomize']:
+            randomize = True
+        elif o in ['-P','--max-procs']:
+            maxchild = int(a)
+        elif o in ['-a','--args']:
+            try:
+                hostfile = open(a,'r')
+            except IOError, e:
+                print "argument file %s has error: %s" % ( a, str(e) )
+                sys.exit(3)
+        elif o in ['-o','--output']:
+            outdir = a
+            if a =='/dev/null': outdir = ''
+        elif o in ['-y','--noprompt']:
+            ask_prompt = False
+        elif o in ['-t','--timeout']:
+            timeout = int(a)
+        elif o in ['-p','--plain']:
+            plain = True
+        elif o in ['-n','--no-exec']:
+            no_exec = True
+        else:
+            print 'Unknown options'
+            usage()
+            sys.exit(1)
+    if len(args)<1:
+        print "No command given."
+        usage()
+        sys.exit(1)
+    #now test outdir
+    if outdir:
+        if os.path.exists(outdir):
+            if not os.path.isdir(outdir):
+                print "%s exists and is not a dir, won't continue" % outdir
+                sys.exit(3)
+            elif no_exec:
+                print "%s is the destination dir and would be destroyed." % (outdir)
+            elif ask_prompt:
+                if hostfile == sys.stdin:
+                    print "You must specify --noprompt (-y) option if no --args (-a) or --no-exec (-n) is given. Doing so will destroy folder %s." % (outdir)
+                    sys.exit(3)
+                else:
+                    result = raw_input("%s exists. Continue will destroy everything in it. Are you sure? (y/n) " % (outdir))
+                    if result not in ['y','Y']:
+                        sys.exit(3)
+            os.system('rm -f %s' % (os.path.join(outdir,'*')))
+        else:
+            if not no_exec:
+                os.system('mkdir -p %s' % outdir)
+    
+    hlist = getListFromFile(hostfile)
+    if no_exec:
+        for i in hlist:
+            real_cmdline = generateCommands(args, i)
+            print ' '.join(real_cmdline)
+        sys.exit(0)
+        
+    if plain: # no fancy output
+        return start(None, maxchild, hlist, outdir, randomize, args, timeout)
+    else:
+        # use fancy curses-based animation
+        try:
+            curses.wrapper(start, maxchild, hlist, outdir, randomize, args, timeout)
+        except curses.error:
+            sys.exit(4)
+    #post execution, output some stats
+    total = 0
+    for k,v in final_stats.items():
+        print "exit code %d: %d job(s)" % (k,v)
+        total += v
+    print "total number of jobs:", total
+def usage():
+    print """\
+NAME
+
+  vxargs - build and execute command lines from an argument list file
+  with visualization and parallelism, and output redirection.
+   
+DESCRIPTION
+
+  vxargs reads a list of arguments from a txt file or standard input,
+  delimited by newlines, and executes the command one or more times
+  with initial arguments in which {} is substituted by the arguments
+  read from the file or standard input. The current executing commands
+  and progress will be dynamically updated on the screen. Stdout and
+  stderr of each command will be redirected to separate files. A list
+  of all processes with a non-zero exit status is generated in file
+  abnormal_list. A list of all timeout processes is generated in file
+  killed_list.
+  
+SYNOPSIS
+
+  vxargs [OPTIONS] command [initial-arguments]
+
+OPTIONS
+
+  --help
+    Print a summary of the options to vxargs and exit.
+
+  --max-procs=max-procs, -P max-procs
+    Run up to max-procs processes at a time; the default is 30.
+
+  --randomize, -r [OPTIONAL]
+    Randomize the host list before all execution.
+
+  --args=filename, -a filename
+    The arguments file. If unspecified, the arguments will be read
+    from standard input, and -y option must be specified.
+    
+  --output=outdir, -o outdir
+    output directory for stdout and stderr files
+    The default value is specified by the environment variable VXARGS_OUTDIR.
+    If it is unspecified, both stdout and stderr will be redirected
+    to /dev/null.
+    Note that if the directory existed before execution, everything
+    inside will be wiped.
+
+  --timeout=timeout, -t timeout
+    The maximal time in second for each command to execute. timeout=0
+    means infinite.  0 (i.e. infinite) is the default value. When the time is up,
+    vxargs will send signal SIGINT to the process. If the process does not
+    stop after 2 seconds, vxargs will send SIGTERM signal, and send SIGKILL
+    if it still keeps running after 3 seconds.
+
+  --noprompt, -y
+    Wipe out the outdir without confirmation.
+
+  --no-exec, -n
+    Print the commands that would be executed, but do not execute them.
+
+  --plain, -p
+    Don't use curses-based output, but plain output to stdout
+    instead. It will be less exciting, but will do the same job
+    effectively. It is useful if one wants to start vxargs from cron
+    or by another program that doesn't want to see the output.
+    By default, vxargs uses the curses-based output.
+
+  --version
+    Display current version and copyright information.
+    
+EXAMPLES:
+  Suppose the iplist.txt file has following content:
+$ cat iplist.txt
+216.165.109.79
+#planetx.scs.cs.nyu.edu
+158.130.6.254
+#planetlab1.cis.upenn.edu
+158.130.6.253
+#planetlab2.cis.upenn.edu
+128.232.103.203
+#planetlab3.xeno.cl.cam.ac.uk
+
+Note that lines starting with '#' will be interpreted as comment for
+the previous lines, which is optional, for visualization purpose only.
+
+$ vxargs -a iplist.txt -o /tmp/result -P 10 ssh upenn_dharma@{} "hostname;uptime"
+
+...[ UI output]...
+
+$ cat /tmp/result/*
+planetlab3.xeno.cl.cam.ac.uk
+ 03:13:21 up 4 days, 14:36,  0 users,  load average: 0.36, 0.44, 0.44
+planetlab2.cis.upenn.edu
+ 03:13:20  up 26 days, 16:19,  0 users,  load average: 8.11, 7.41, 7.41
+planetlab1.cis.upenn.edu
+ 03:13:19  up 22 days, 20:02,  0 users,  load average: 13.60, 12.55, 12.59
+ssh: connect to host 216.165.109.79 port 22: Connection timed out
+$
+
+other examples:
+cat iplist.txt | vxargs -o /tmp/result rsync -az -e ssh --delete mirror $SLICE@{}:
+
+vxargs -a iplist.txt -o /tmp/result ssh {} killall -9 java
+
+For more information, please visit http://dharma.cis.upenn.edu/planetlab/vxargs/
+"""
+if __name__=='__main__':
+    main()
+