4 # Copyright (C) 2003-2004 Yun Mao, University of Pennsylvania
6 # This library is free software; you can redistribute it and/or
7 # modify it under the terms of version 2.1 of the GNU Lesser General Public
8 # License as published by the Free Software Foundation.
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 # Lesser General Public License for more details.
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 vxargs: Visualized xargs with redirected output
25 import os, sys, time, signal, errno
35 def getListFromFile(f):
36 """I'll ignore the line starting with #
38 @param f: file object of the host list file
39 @return: a list of hostnames (or IPs)
45 hostlist.append([line.strip(),''])
46 elif hostlist and hostlist[-1][1]=='':
47 hostlist[-1][1] = line.strip()[1:]
50 def get_last_line(fn):
53 lines = open(fn,'r').readlines()
55 return (0, lines[-1].strip())
61 def __init__(self, outdir, num, screen, timeout, name, count):
66 self.startTime = time.time()
67 self.timeout = timeout
71 def drawLine(self, comment='', done = False):
72 if self.screen is None: return
74 comment = self.comment
76 self.comment = comment
78 elapsed = time.time()-self.startTime
80 y,x = stdscr.getmaxyx()
82 stdscr.addstr(self.slotnum+2, 0, spaces) #title occupies two lines
84 stdscr.addstr(self.slotnum+2,0, comment[:x])
87 output = "(%3ds)%3d: %s " % ( round(elapsed), self.count, self.name )
88 spaceleft = x - len(output)
89 if self.outdir and spaceleft>1:
90 outfn = os.path.join(self.outdir, '%s.out' % self.name)
91 errfn = os.path.join(self.outdir, '%s.err' % self.name)
92 lout = get_last_line(outfn)
93 lerr = get_last_line(errfn)
94 if lerr[0]==0 and lerr[1]:
96 elif lout[0]==0 and lout[1]:
102 stdscr.addstr(self.slotnum+2, 0, output[:x] )
104 except curses.error: #some of them will be out of screen, ignore it
106 def update(self, pid):
112 overtime = time.time()-self.startTime - self.timeout
114 if overtime > 3: #expired more than 3 seconds, send -9
115 os.kill(-pid, signal.SIGKILL)
116 elif overtime > 2: #expired more than 2 seconds, send -15
117 os.kill(-pid, signal.SIGTERM)
119 os.kill(-pid, signal.SIGINT)
121 if e.errno != errno.ESRCH: # No such process
125 """stop current pid b/c we caught SIGINT twice
127 self.startTime = time.time() - self.timeout
132 def __init__(self, max, screen, timeout, outdir):
134 self.slots = range(self.maxChild)
139 def getSlot(self, name, count):
141 #it's empty, wait until other jobs finish
142 slot = self.waitJobs().slotnum
145 self.slots.remove(slot)
146 return Slot(self.outdir, slot, self.screen, self.t, name, count)
148 def mapPID(self, pid, slot):
149 """@param slot: slot object
151 self.pids[pid] = slot
156 pid, status = os.wait()
159 if e.errno == errno.ECHILD: #no child processes
160 raise RuntimeError('no child processes when waiting')
161 slot = self.pids[pid]
163 open(os.path.join(self.outdir, '%s.status' % slot.name),'w').write('%d' % (status>>8))
164 if (status & 0xFF) !=0:
165 open(os.path.join(self.outdir, 'killed_list'),'a').write('%s\n' % (slot.name))
167 open(os.path.join(self.outdir, 'abnormal_list'),'a').write('%s\n' % (slot.name))
170 if final_stats.has_key(s):
176 for k,v in self.pids.items():
180 signal.alarm(update_rate)
182 def drawTitle(self, stuff):
184 y,x = self.screen.getmaxyx()
186 self.screen.addstr(0,0, spaces)
187 self.screen.addstr(0,0, stuff[:x*2])
188 self.screen.refresh()
193 msg = 'Stopping -- Waiting current jobs done. Press Ctrl-C again to kill current jobs.'
195 msg = 'Stopping -- Killing current jobs'
198 for k,v in self.pids.items():
202 def handler(signum, frame_unused):
204 if signum==signal.SIGALRM:
206 if signum==signal.SIGINT:
def generateCommands(cmd_line, args):
    """Expand the '{}' placeholder in every word of a command line.

    @param cmd_line: list of command words; each may contain '{}'
    @param args: sequence whose first element replaces each '{}'
    @return: a new list of words with the substitution applied
    """
    expanded = []
    for word in cmd_line:
        # args[0] is looked up per word, so an empty cmd_line never touches args
        expanded.append(word.replace('{}', args[0]))
    return expanded
214 def spawn(cmdline, outfn, errfn, setpgrp = False):
215 """A cleverer spawn that lets you redirect stdout and stderr to
216 outfn and errfn. Returns pid of child.
217 You can't do this with os.spawn, sadly.
221 out = open(outfn, 'w')
222 os.dup2(out.fileno() ,sys.stdout.fileno())
223 err = open(errfn, 'w')
224 os.dup2(err.fileno(), sys.stderr.fileno())
228 os.execvp(cmdline[0], cmdline)
230 print >> sys.stderr, "error before execution:",e
235 def start(win, max_child, hlist, outdir, randomize, command_line, timeout):
240 random.shuffle(hlist)
242 signal.signal(signal.SIGALRM, handler)
243 signal.signal(signal.SIGINT, handler)
244 signal.alarm(update_rate)
246 sl = Slots(max_child, win, timeout, outdir)
252 slot = sl.getSlot(i[0], count)
254 slot.drawLine('Done', done=True)
259 x = generateCommands(command_line, i)
261 sl.drawTitle("%d/%d:%s" %(count, total,' '.join(x)))
263 outpath = '/dev/null'
264 errpath = '/dev/null'
266 outpath = os.path.join(outdir, '%s.out'%i[0])
267 errpath = os.path.join(outdir, '%s.err'%i[0])
269 pid = spawn(x, outpath, errpath, setpgrp = True)
276 print >> sys.stderr, 'Warning: lost tracking of %d jobs' % len(sl.pids)
278 slot.drawLine('Done', done = True) #Done
280 def get_output(outdir, argument_list, out= True, err=False, status=False):
283 For post processing the output dir.
285 @param out: decide whether to process *.out files
286 @param err: decide whether to process *.err files
287 @param status: decide whether to process *.status files
289 @return: (out, err, status): out is a hash table, in which the
290 keys are the arguments, and the values are the string of the
291 output, if available. err is similar. the values of hash table
292 status is the value of exit status in int.
295 if not out and not err and not status:
296 raise RuntimeError("one of out, err and status has to be True")
299 mapping = ('out','err','status')
303 if status: p.append(2)
304 for arg in argument_list:
305 basefn = os.path.join(outdir, arg)
307 fn = '.'.join( (basefn, mapping[i]) ) #basefn.ext
309 lines = open(fn).readlines()
310 result[i][arg]=''.join(lines)
313 if not status: return result
315 for k,v in result[2].items():
317 int_status[k] = int(v.strip())
320 return result[0], result[1], int_status
323 options = 'hP:ra:o:yt:pn'
324 long_opts = ['help','max-procs=','randomize','args=','output=','noprompt','timeout=','plain', 'version','no-exec']
326 opts,args = getopt.getopt(sys.argv[1:], options,long_opts)
327 except getopt.GetoptError:
328 print "Unknown options"
340 if os.environ.has_key('VXARGS_OUTDIR'):
341 outdir = os.environ['VXARGS_OUTDIR']
343 if o in ['--version']:
344 print "vxargs version",version
345 print "Copyright (c) 2004 Yun Mao (maoy@cis.upenn.edu)"
346 print "Freely distributed under GNU LGPL License"
348 elif o in ['-h','--help']:
351 elif o in ['-r','--randomize']:
353 elif o in ['-P','--max-procs']:
355 elif o in ['-a','--args']:
357 hostfile = open(a,'r')
359 print "argument file %s has error: %s" % ( a, str(e) )
361 elif o in ['-o','--output']:
363 if a =='/dev/null': outdir = ''
364 elif o in ['-y','--noprompt']:
366 elif o in ['-t','--timeout']:
368 elif o in ['-p','--plain']:
370 elif o in ['-n','--no-exec']:
373 print 'Unknown options'
377 print "No command given."
382 if os.path.exists(outdir):
383 if not os.path.isdir(outdir):
384 print "%s exists and is not a dir, won't continue" % outdir
387 print "%s is the destination dir and would be destroyed." % (outdir)
389 if hostfile == sys.stdin:
390 print "You must specify --noprompt (-y) option if no --args (-a) or --no-exec (-n) is given. Doing so will destroy folder %s." % (outdir)
393 result = raw_input("%s exists. Continue will destroy everything in it. Are you sure? (y/n) " % (outdir))
394 if result not in ['y','Y']:
396 os.system('rm -f %s' % (os.path.join(outdir,'*')))
399 os.system('mkdir -p %s' % outdir)
401 hlist = getListFromFile(hostfile)
404 real_cmdline = generateCommands(args, i)
405 print ' '.join(real_cmdline)
408 if plain: # no fancy output
409 return start(None, maxchild, hlist, outdir, randomize, args, timeout)
411 # use fancy curses-based animation
413 curses.wrapper(start, maxchild, hlist, outdir, randomize, args, timeout)
416 #post execution, output some stats
418 for k,v in final_stats.items():
419 print "exit code %d: %d job(s)" % (k,v)
421 print "total number of jobs:", total
426 vxargs - build and execute command lines from an argument list file
427 with visualization and parallelism, and output redirection.
431 vxargs reads a list of arguments from a txt file or standard input,
432 delimited by newlines, and executes the command one or more times
433 with initial arguments in which {} is substituted by the arguments
434 read from the file or standard input. The current executing commands
435 and progress will be dynamically updated on the screen. Stdout and
436 stderr of each command will be redirected to separate files. A list
437 of all processes with a non-zero exit status is generated in file
438 abnormal_list. A list of all timeout processes is generated in file
443 vxargs [OPTIONS] command [initial-arguments]
448 Print a summary of the options to vxargs and exit.
450 --max-procs=max-procs, -P max-procs
451 Run up to max-procs processes at a time; the default is 30.
453 --randomize, -r [OPTIONAL]
454 Randomize the host list before all execution.
456 --args=filename, -a filename
457 The arguments file. If unspecified, the arguments will be read
458 from standard input, and -y option must be specified.
460 --output=outdir, -o outdir
461 output directory for stdout and stderr files
462 The default value is specified by the environment variable VXARGS_OUTDIR.
463 If it is unspecified, both stdout and stderr will be redirected
465 Note that if the directory existed before execution, everything
466 inside will be wiped.
468 --timeout=timeout, -t timeout
469 The maximum time in seconds for each command to execute. timeout=0
470 means infinite. 0 (i.e. infinite) is the default value. When the time is up,
471 vxargs will send signal SIGINT to the process. If the process does not
472 stop after 2 seconds, vxargs will send SIGTERM signal, and send SIGKILL
473 if it still keeps running after 3 seconds.
476 Wipe out the outdir without confirmation.
479 Print the commands that would be executed, but do not execute them.
482 Don't use curses-based output, but plain output to stdout
483 instead. It will be less exciting, but will do the same job
484 effectively. It is useful if one wants to start vxargs from cron
485 or by another program that doesn't want to see the output.
486 By default, vxargs uses the curses-based output.
489 Display current version and copyright information.
492 Suppose the iplist.txt file has the following content:
495 #planetx.scs.cs.nyu.edu
497 #planetlab1.cis.upenn.edu
499 #planetlab2.cis.upenn.edu
501 #planetlab3.xeno.cl.cam.ac.uk
503 Note that a line starting with '#' is interpreted as a comment for
504 the previous line; such comments are optional and used for visualization only.
506 $ vxargs -a iplist.txt -o /tmp/result -P 10 ssh upenn_dharma@{} "hostname;uptime"
511 planetlab3.xeno.cl.cam.ac.uk
512 03:13:21 up 4 days, 14:36, 0 users, load average: 0.36, 0.44, 0.44
513 planetlab2.cis.upenn.edu
514 03:13:20 up 26 days, 16:19, 0 users, load average: 8.11, 7.41, 7.41
515 planetlab1.cis.upenn.edu
516 03:13:19 up 22 days, 20:02, 0 users, load average: 13.60, 12.55, 12.59
517 ssh: connect to host 216.165.109.79 port 22: Connection timed out
521 cat iplist.txt | vxargs -o /tmp/result rsync -az -e ssh --delete mirror $SLICE@{}:
523 vxargs -a iplist.txt -o /tmp/result ssh {} killall -9 java
525 For more information, please visit http://dharma.cis.upenn.edu/planetlab/vxargs/
527 if __name__=='__main__':