From: Stephen Soltesz Date: Mon, 23 Jun 2008 17:04:08 +0000 (+0000) Subject: commit of tools I use, but are not documented or guaranteed to work for anyone X-Git-Tag: Monitor-1.0-5~11 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=590ac12c941310b40a92d2fe938e62e3538f2893;hp=01e3fe051d372c8de607da799f07f8aeca0b5d44;p=monitor.git commit of tools I use, but are not documented or guaranteed to work for anyone else. --- diff --git a/fetch.py b/fetch.py new file mode 100755 index 0000000..968ad66 --- /dev/null +++ b/fetch.py @@ -0,0 +1,79 @@ +#!/usr/bin/python + +import csv +import sys +import os +import config +from glob import glob + +import vxargs +from config import config +from optparse import OptionParser +from automate import * + +parser = OptionParser() +parser.set_defaults(nodelist=None, + node=None, + outdir=None, + querystr=None, + timeout=0, + simple=False, + run=False, + cmdfile=None,) + +parser.add_option("", "--nodelist", dest="nodelist", metavar="filename", + help="Read list of nodes from specified file") +parser.add_option("", "--node", dest="node", metavar="hostname", + help="specify a single node name.") +parser.add_option("", "--timeout", dest="timeout", metavar="seconds", + help="Number of seconds to wait before timing out on host.") +parser.add_option("", "--outdir", dest="outdir", metavar="dirname", + help="Name of directory to place output") +parser.add_option("", "--cmd", dest="cmdfile", metavar="filename", + help="Name of file that contains a unix-to-csv command " + \ + "to run on the hosts.") + +config = config(parser) +config.parse_args() + +def build_vx_args(shell_cmd): + ssh_options="-q -o UserKnownHostsFile=junkssh -o StrictHostKeyChecking=no" + cmd="""ssh %s root@{} """ % ssh_options + args = cmd.split() + args.append(shell_cmd) + return args + +def vx_start(filelist,outdir,cmd): + args = build_vx_args(cmd) + #vxargs.start(None, 10, filelist, outdir, False, args, 120) + vxargs.start(None, 10, filelist, outdir, False, args, int(config.timeout)) + +if config.outdir == None: + outdir="checkhosts" +else: + outdir=config.outdir + +if not os.path.exists(outdir): + os.system('mkdir -p %s' % outdir) + +if config.nodelist == None and config.node == None: + filelist="nocomon.txt" + filelist = vxargs.getListFromFile(open(filelist,'r')) +elif os.path.exists(str(config.nodelist)) and os.path.isfile(config.nodelist): + filelist = vxargs.getListFromFile(open(config.nodelist,'r')) +elif os.path.exists(str(config.nodelist)) and os.path.isdir(config.nodelist): + filelist = get_hostlist_from_dir(config.nodelist) +elif config.node is not None: + filelist = [(config.node, '')] +else: + # probably no such file. + raise Exception("No such file %s" % config.nodelist) + +if config.cmdfile == None: + f = open("command.txt",'r') + cmd = f.read() +else: + f = open(config.cmdfile,'r') + cmd = f.read() + +vx_start(filelist, outdir, cmd) diff --git a/getconf.py b/getconf.py new file mode 100755 index 0000000..802eced --- /dev/null +++ b/getconf.py @@ -0,0 +1,114 @@ +#!/usr/bin/python + +import auth +import plc +import sys +import os + +def getconf(hostname): + api = plc.PLC(auth.auth, auth.plc) + n = api.GetNodes(hostname) + filename = "bootcd-alpha/" + hostname + ".txt" + if not os.path.exists(filename): + f = open("bootcd-alpha/" + hostname + ".txt", 'w') + f.write( api.AdmGenerateNodeConfFile(n[0]['node_id']) ) + f.close() + print os.system("cd bootcd-alpha; ./build.sh -f %s.txt -t iso -o /plc/data/var/www/html/bootcds/%s.iso &> /dev/null" % ( hostname, hostname)) + print os.system("cd bootcd-alpha; ./build.sh -f %s.txt -t usb_partition -o /plc/data/var/www/html/bootcds/%s-partition.usb &> /dev/null" % ( hostname, hostname)) + else: + # assume that the images have already been generated.. + pass + + args = {} + args['url_list'] = " http://pl-virtual-03.cs.princeton.edu/bootcds/%s-partition.usb\n" % hostname + args['url_list'] += " http://pl-virtual-03.cs.princeton.edu/bootcds/%s.iso" % hostname + #print "http://pl-virtual-03.cs.princeton.edu/bootcds/%s.usb\n" % hostname + + return args + +if __name__ == '__main__': + from config import config as cfg + from optparse import OptionParser + parser = OptionParser() + parser.set_defaults(media='both') + parser.add_option("", "--media", dest="media", metavar="usb, iso, both", + help="""Which media to generate the message for.""") + + config = cfg(parser) + config.parse_args() + + ret = {'url_list' : ''} + for i in config.args: + conf = getconf(i) + ret['url_list'] += conf['url_list'] + ret['hostname'] = i + + if config.media == "both": + print """ +Hello, + +Here are links to both the ISO CD image, and partitioned, USB image for the +DC7800 and others. These are based on the new 4.2 BootImage, and are the most +up-to-date software for PlanetLab nodes. + +%(url_list)s + +All that is necessary is to raw-write these images to a usb stick or CD-ROM, and +then boot from them. If using USB, please use a command like: + + dd if=%(hostname)s.usb of=/dev/sdX + +Where sdX is your USB device. It is not necessary to run any other formatting +commands for these images, because they already include a MBR, partition +table, and fs. + +Please let me know if you have any trouble. + +Thank you, + +""" % ret + + elif config.media == "cd": + print """ +Hello, + +Here are links to the ISO CD image(s) for your machines. These are based on +the new 4.2 BootImage, and are the most up-to-date software for PlanetLab +nodes. + +%(url_list)s + +All that is necessary is to burn these images to a CD-ROM, and +then boot from them. + +Please let me know if you have any trouble. + +Thank you, + +""" % ret + + elif config.media == "usb": + print """ +Hello, + +Here are links to the partitioned, USB images for the DC7800 and others. +These are based on the new 4.2 BootImage, and are the most +up-to-date software for PlanetLab nodes. + +%(url_list)s + +All that is necessary is to raw-write these images to a usb stick, and +then boot from them. Please use a command like: + + dd if=%(hostname)s.usb of=/dev/sdX + +Where sdX is your direct, USB device. Do not use a partition on the usb +image, or the boot will fail. It is not necessary to run any other formatting +commands for these images, because they already include a MBR, partition +table, and fs. + +Please let me know if you have any trouble. + +Thank you, + +""" % ret diff --git a/grouprins.py b/grouprins.py new file mode 100755 index 0000000..99af752 --- /dev/null +++ b/grouprins.py @@ -0,0 +1,348 @@ +#!/usr/bin/python + +# This script is used to manipulate the operational state of nodes in +# different node groups. These are basically set operations on nodes via the +# PLC api. +# +# Take the ng name as an argument.... +# optionally, +# * get a list of nodes in the given nodegroup. +# * set some or all in the set to rins. +# * restart them all. +# * do something else to them all. +# + +import plc +import auth +api = plc.PLC(auth.auth, auth.plc) + +import policy + +from config import config as cfg +from optparse import OptionParser + +from nodecommon import * +from nodequery import verify,query_to_dict,node_select +import soltesz +from unified_model import * + +import time + +from model import * +import bootman # debug nodes +import monitor # down nodes with pcu +import reboot # down nodes without pcu +reboot.verbose = 0 +import sys + +class Reboot(object): + def __init__(self, fbnode): + self.fbnode = fbnode + + def _send_pcunotice(self, host): + args = {} + args['hostname'] = host + try: + args['pcu_id'] = plc.getpcu(host)['pcu_id'] + except: + args['pcu_id'] = host + + m = PersistMessage(host, mailtxt.pcudown_one[0] % args, + mailtxt.pcudown_one[1] % args, True, db='pcu_persistmessages') + + loginbase = plc.siteId(hostname) + m.send([policy.TECHEMAIL % loginbase]) + + def pcu(self, host): + # TODO: It should be possible to diagnose the various conditions of + # the PCU here, and send different messages as appropriate. + if self.fbnode['pcu'] == "PCU": + self.action = "reboot.reboot('%s')" % host + + pflags = PersistFlags(host, 1*60*60*24, db='pcu_persistflags') + if not pflags.getRecentFlag('pcutried'): # or not pflags.getFlag('pcufailed'): + pflags.setRecentFlag('pcutried') + try: + ret = reboot.reboot(host) + + pflags.save() + return ret + + except Exception,e: + import traceback; print traceback.print_exc(); print e + + # NOTE: this failure could be an implementation issue on + # our end. So, extra notices are confusing... + # self._send_pcunotice(host) + + pflags.setRecentFlag('pcufailed') + pflags.save() + return False + else: + # we've tried the pcu recently, but it didn't work, + # so did we send a message about it recently? + if not pflags.getRecentFlag('pcumessagesent'): + + self._send_pcunotice(host) + + pflags.setRecentFlag('pcumessagesent') + pflags.save() + + else: + pass # just skip it? + + else: + self.action = "None" + return False + + def mail(self, host): + + # Reset every 4 weeks or so + pflags = PersistFlags(host, 27*60*60*24, db='mail_persistflags') + if not pflags.getRecentFlag('endrecord'): + node_end_record(host) + pflags.setRecentFlag('endrecord') + pflags.save() + + # Then in either case, run monitor.reboot() + self.action = "monitor.reboot('%s')" % host + try: + return monitor.reboot(host) + except Exception, e: + import traceback; print traceback.print_exc(); print e + return False + +class RebootDebug(Reboot): + + def direct(self, host): + self.action = "bootman.reboot('%s', config, None)" % host + return bootman.reboot(host, config, None) + +class RebootBoot(Reboot): + + def direct(self, host): + self.action = "bootman.reboot('%s', config, 'reboot')" % host + return bootman.reboot(host, config, 'reboot') + +class RebootDown(Reboot): + + def direct(self, host): + self.action = "None" + return False # this always fails, since the node will be down. + + +try: + rebootlog = soltesz.dbLoad("rebootlog") +except: + rebootlog = LogRoll() + +parser = OptionParser() +parser.set_defaults(nodegroup=None, + node=None, + nodelist=None, + nodeselect=None, + timewait=30, + skip=0, + rins=False, + reboot=False, + findbad=False, + force=False, + nosetup=False, + verbose=False, + stopkey=None, + stopvalue=None, + quiet=False, + ) +parser.add_option("", "--node", dest="node", metavar="nodename.edu", + help="A single node name to add to the nodegroup") +parser.add_option("", "--nodelist", dest="nodelist", metavar="list.txt", + help="Use all nodes in the given file for operation.") +parser.add_option("", "--nodegroup", dest="nodegroup", metavar="NodegroupName", + help="Specify a nodegroup to perform actions on") +parser.add_option("", "--nodeselect", dest="nodeselect", metavar="querystring", + help="Specify a query to perform on findbad db") + +parser.add_option("", "--verbose", dest="verbose", action="store_true", + help="Extra debug output messages.") +parser.add_option("", "--nosetup", dest="nosetup", action="store_true", + help="Do not perform the orginary setup phase.") + +parser.add_option("", "--skip", dest="skip", + help="Number of machines to skip on the input queue.") +parser.add_option("", "--timewait", dest="timewait", + help="Minutes to wait between iterations of 10 nodes.") + +parser.add_option("", "--stopselect", dest="stopselect", metavar="", + help="The select string that must evaluate to true for the node to be considered 'done'") + +parser.add_option("", "--stopkey", dest="stopkey", metavar="", + help="") +parser.add_option("", "--stopvalue", dest="stopvalue", metavar="", + help="") + +parser.add_option("", "--findbad", dest="findbad", action="store_true", + help="Re-run findbad on the nodes we're going to check before acting.") +parser.add_option("", "--force", dest="force", action="store_true", + help="Force action regardless of previous actions/logs.") +parser.add_option("", "--rins", dest="rins", action="store_true", + help="Set the boot_state to 'rins' for all nodes.") +parser.add_option("", "--reboot", dest="reboot", action="store_true", + help="Actively try to reboot the nodes, keeping a log of actions.") +#config = config(parser) +config = cfg(parser) +config.parse_args() + +# COLLECT nodegroups, nodes and node lists +if config.nodegroup: + ng = api.GetNodeGroups({'name' : config.nodegroup}) + nodelist = api.GetNodes(ng[0]['node_ids']) + hostnames = [ n['hostname'] for n in nodelist ] + +if config.node or config.nodelist: + if config.node: hostnames = [ config.node ] + else: hostnames = config.getListFromFile(config.nodelist) + +if config.nodeselect: + hostnames = node_select(config.nodeselect) + +if config.findbad: + # rerun findbad with the nodes in the given nodes. + import os + file = "findbad.txt" + config.setFileFromList(file, hostnames) + os.system("./findbad.py --cachenodes --debug=0 --dbname=findbad --increment --nodelist %s" % file) + +fb = soltesz.dbLoad("findbad") +# commands: +i = 1 +count = 1 +for host in hostnames: + + #if 'echo' in host or 'hptest-1' in host: continue + + try: + try: + node = api.GetNodes(host)[0] + except: + import traceback; print traceback.print_exc(); + print "FAILED GETNODES for host: %s" % host + continue + + print "%-2d" % i, nodegroup_display(node, fb) + i += 1 + if i < int(config.skip): continue + + if config.stopselect: + dict_query = query_to_dict(config.stopselect) + fbnode = fb['nodes'][host]['values'] + observed_state = get_current_state(fbnode) + + if verify(dict_query, fbnode) and observed_state != "dbg ": + # evaluates to true, therefore skip. + print "%s evaluates true for %s ; skipping..." % ( config.stopselect, host ) + continue + + if config.stopkey and config.stopvalue: + fbnode = fb['nodes'][host]['values'] + observed_state = get_current_state(fbnode) + + if config.stopkey in fbnode: + if config.stopvalue in fbnode[config.stopkey] and observed_state != "dbg ": + print "%s has stopvalue; skipping..." % host + continue + else: + print "stopkey %s not in fbnode record for %s; skipping..." % (config.stopkey, host) + print fbnode + continue + + if not config.force and rebootlog.find(host, {'action' : ".*reboot"}, 60*60*2): + print "recently rebooted %s. skipping... " % host + continue + + if config.rins: + # reset the boot_state to 'rins' + node = api.GetNodes(host, ['boot_state', 'last_contact', 'last_updated', 'date_created']) + record = {'observation' : node[0], + 'model' : 'USER_REQUEST', + 'action' : 'api.UpdateNode(%s, {"boot_state" : "rins"})' % host, + 'time' : time.time()} + l = Log(host, record) + + ret = api.UpdateNode(host, {'boot_state' : 'rins'}) + if ret: + # it's nice to see the current status rather than the previous status on the console + node = api.GetNodes(host)[0] + print l + print "%-2d" % (i-1), nodegroup_display(node, fb) + rebootlog.add(l) + else: + print "FAILED TO UPDATE NODE BOOT STATE : %s" % host + + + if config.reboot: + + fbnode = fb['nodes'][host]['values'] + observed_state = get_current_state(fbnode) + + if observed_state == "dbg ": + o = RebootDebug(fbnode) + + elif observed_state == "boot" : + o = RebootBoot(fbnode) + + elif observed_state == "down": + o = RebootDown(fbnode) + + + if o.direct(host): + record = {'observation' : "DIRECT_SUCCESS: %s" % observed_state, + 'action' : o.action, + 'model' : "none", + 'time' : time.time()} + elif o.pcu(host): + record = {'observation' : "PCU_SUCCESS: %s" % observed_state, + 'action' : o.action, + 'model' : "none", + 'time' : time.time()} + elif o.mail(host): + record = {'observation' : "MAIL_SUCCESS: %s" % observed_state, + 'action' : o.action, + 'model' : "none", + 'time' : time.time()} + else: + record = {'observation' : "REBOOT_FAILED: %s" % observed_state, + 'action' : "log failure", + 'model' : "none", + 'time' : time.time()} + + print "ALL METHODS OF RESTARTING %s FAILED" % host + + l = Log(host, record) + print l + rebootlog.add(l) + except KeyboardInterrupt: + print "Killed by interrupt" + sys.exit(0) + except: + import traceback; print traceback.print_exc(); + print "Continuing..." + + time.sleep(1) + if count % 10 == 0: + print "Saving rebootlog" + soltesz.dbDump("rebootlog", rebootlog) + wait_time = int(config.timewait) + print "Sleeping %d minutes" % wait_time + ti = 0 + print "Minutes slept: ", + sys.stdout.flush() + while ti < wait_time: + print "%s" % ti, + sys.stdout.flush() + time.sleep(60) + ti = ti+1 + + count = count + 1 + +print "Saving rebootlog" +soltesz.dbDump("rebootlog", rebootlog) diff --git a/query.py b/query.py new file mode 100755 index 0000000..53eadfb --- /dev/null +++ b/query.py @@ -0,0 +1,77 @@ +#!/usr/bin/python + +import csv +import sys +import os +import config +from glob import glob +import re +from cgi import parse_qs + +import vxargs +from config import config +from optparse import OptionParser +from automate import * + +parser = OptionParser() +parser.set_defaults(nodelist=None, + outdir=None, + querystr=None, + simple=False, + run=False, + cmdfile=None,) + +parser.add_option("", "--nodelist", dest="nodelist", metavar="filename", + help="Read list of nodes from specified file") +parser.add_option("", "--outdir", dest="outdir", metavar="dirname", + help="Name of directory to place output") +parser.add_option("", "--query", dest="querystr", metavar="QUERY", + help="a simple query string: key=value") +parser.add_option("", "--simple", dest="simple", action="store_true", + help="display simple output") + +config = config(parser) +config.parse_args() + + +if config.outdir == None: + outdir="checkhosts" +else: + outdir=config.outdir + +nodelist = None +if config.nodelist is not None: + nodelist = config.getListFromFile(config.nodelist) + +if config.querystr == None: + queries = parse_qs("IP_SUBNET=127.0.0.1") +else: + queries = parse_qs(config.querystr) + #(key,query) = config.querystr.split("=") + +# Create a file list based on the provide nodelist or a simple pattern for all +# files in the given 'outdir' directory +filelist = None +if nodelist is not None: + filelist = [] + for node in nodelist: + filelist.append("%s/%s.out" % (outdir,node)) +else: + filelist = glob("%s/*.out" % outdir) + +for file in filelist: + vals = csv_to_hash(csv.reader(open(file,'r'))) + hostname = file[len(outdir):-4] + m = True + for key in queries.keys(): + q = re.compile(queries[key][0]) + if key in vals and q.match(vals[key]): + m=(m and True) + else: + m=(m and False) + + if m: + if config.simple: + print hostname, vals[key] + else: + print hostname, vals diff --git a/ticket_blacklist.py b/ticket_blacklist.py new file mode 100755 index 0000000..63bdcc0 --- /dev/null +++ b/ticket_blacklist.py @@ -0,0 +1,55 @@ +#!/usr/bin/python + +import os +import sys +import string +import time +import soltesz +import plc +import getopt + +def usage(): + print "ticket_blacklist.py --delete=" + +def main(): + + try: + longopts = ["delete=", "help"] + (opts, argv) = getopt.getopt(sys.argv[1:], "d:h", longopts) + except getopt.GetoptError, err: + print "Error: " + err.msg + sys.exit(1) + + l_ticket_blacklist = soltesz.if_cached_else(1, "l_ticket_blacklist", lambda : []) + + for (opt, optval) in opts: + if opt in ["-d", "--delete"]: + i = int(optval) + del l_ticket_blacklist[i] + else: + usage() + sys.exit(0) + + i_cnt = 0 + for i in l_ticket_blacklist: + print i_cnt, " ", i + i_cnt += 1 + + while 1: + line = sys.stdin.readline() + if not line: + break + line = line.strip() + if not line in l_ticket_blacklist: + l_ticket_blacklist.append(line) + + print "Total %d nodes in ticket_blacklist" % (len(l_ticket_blacklist)) + soltesz.dbDump("l_ticket_blacklist") + +if __name__ == '__main__': + import os + #try: + main() + #except Exception, error: + # print "Exception %s" % error + # sys.exit(0) diff --git a/unified_model.py b/unified_model.py new file mode 100755 index 0000000..918f653 --- /dev/null +++ b/unified_model.py @@ -0,0 +1,463 @@ +#!/usr/bin/python + +import soltesz + +import plc +import auth +api = plc.PLC(auth.auth, auth.plc) + +import config +import mailer +import time + +def gethostlist(hostlist_file): + return config.getListFromFile(hostlist_file) + + #nodes = api.GetNodes({'peer_id' : None}, ['hostname']) + #return [ n['hostname'] for n in nodes ] + +def array_to_priority_map(array): + """ Create a mapping where each entry of array is given a priority equal + to its position in the array. This is useful for subsequent use in the + cmpMap() function.""" + map = {} + count = 0 + for i in array: + map[i] = count + count += 1 + return map + +def cmpValMap(v1, v2, map): + if v1 in map and v2 in map and map[v1] < map[v2]: + return 1 + elif v1 in map and v2 in map and map[v1] > map[v2]: + return -1 + elif v1 in map and v2 in map: + return 0 + else: + raise Exception("No index %s or %s in map" % (v1, v2)) + +def cmpCategoryVal(v1, v2): + map = array_to_priority_map([ None, 'ALPHA', 'PROD', 'OLDBOOTCD', 'UNKNOWN', 'FORCED', 'ERROR', ]) + return cmpValMap(v1,v2,map) + + +class PCU: + def __init__(self, hostname): + self.hostname = hostname + + def reboot(self): + return True + def available(self): + return True + def previous_attempt(self): + return True + def setValidMapping(self): + pass + +class Penalty: + def __init__(self, key, valuepattern, action): + pass + +class PenaltyMap: + def __init__(self): + pass + + # connect one penalty to another, in a FSM diagram. After one + # condition/penalty is applied, move to the next phase. + + +fb = soltesz.dbLoad("findbad") + +class RT(object): + def __init__(self, ticket_id = None): + self.ticket_id = ticket_id + if self.ticket_id: + print "getting ticket status", + self.status = mailer.getTicketStatus(self.ticket_id) + print self.status + + def setTicketStatus(self, status): + mailer.setTicketStatus(self.ticket_id, status) + self.status = mailer.getTicketStatus(self.ticket_id) + return True + + def getTicketStatus(self): + if not self.status: + self.status = mailer.getTicketStatus(self.ticket_id) + return self.status + + def closeTicket(self): + mailer.closeTicketViaRT(self.ticket_id) + + def email(self, subject, body, to): + self.ticket_id = mailer.emailViaRT(subject, body, to, self.ticket_id) + return self.ticket_id + +class Message(object): + def __init__(self, subject, message, via_rt=True, ticket_id=None, **kwargs): + self.via_rt = via_rt + self.subject = subject + self.message = message + self.rt = RT(ticket_id) + + def send(self, to): + if self.via_rt: + return self.rt.email(self.subject, self.message, to) + else: + return mailer.email(self.subject, self.message, to) + +class Recent(object): + def __init__(self, withintime): + self.withintime = withintime + self.time = time.time() + self.action_taken = False + + def isRecent(self): + if self.time + self.withintime < time.time(): + self.action_taken = False + + if self.time + self.withintime > time.time() and self.action_taken: + return True + else: + return False + + def unsetRecent(self): + self.action_taken = False + self.time = time.time() + return True + + def setRecent(self): + self.action_taken = True + self.time = time.time() + return True + +class PersistFlags(Recent): + def __new__(typ, id, *args, **kwargs): + if 'db' in kwargs: + db = kwargs['db'] + del kwargs['db'] + else: + db = "persistflags" + + try: + pm = soltesz.dbLoad(db) + except: + soltesz.dbDump(db, {}) + pm = soltesz.dbLoad(db) + #print pm + if id in pm: + obj = pm[id] + else: + obj = super(PersistFlags, typ).__new__(typ, *args, **kwargs) + for key in kwargs.keys(): + obj.__setattr__(key, kwargs[key]) + + obj.db = db + return obj + + def __init__(self, id, withintime, **kwargs): + self.id = id + Recent.__init__(self, withintime) + + def save(self): + pm = soltesz.dbLoad(self.db) + pm[self.id] = self + soltesz.dbDump(self.db, pm) + + def resetFlag(self, name): + self.__setattr__(name, False) + + def setFlag(self, name): + self.__setattr__(name, True) + + def getFlag(self, name): + try: + return self.__getattribute__(name) + except: + self.__setattr__(name, False) + return False + + def setRecentFlag(self, name): + self.setFlag(name) + self.setRecent() + + def getRecentFlag(self, name): + # if recent and flag set -> true + # else false + try: + return self.isRecent() & self.__getattribute__(name) + except: + self.__setattr__(name, False) + return False + +class PersistMessage(Message): + def __new__(typ, id, subject, message, via_rt, **kwargs): + if 'db' in kwargs: + db = kwargs['db'] + else: + db = "persistmessages" + + try: + pm = soltesz.dbLoad(db) + except: + soltesz.dbDump(db, {}) + pm = soltesz.dbLoad(db) + + #print pm + if id in pm: + print "Using existing object" + obj = pm[id] + else: + print "creating new object" + obj = super(PersistMessage, typ).__new__(typ, [id, subject, message, via_rt], **kwargs) + obj.id = id + obj.actiontracker = Recent(3*60*60*24) + obj.ticket_id = None + + obj.db = db + return obj + + def __init__(self, id, subject, message, via_rt=True, **kwargs): + print "initializing object: %s" % self.ticket_id + self.id = id + Message.__init__(self, subject, message, via_rt, self.ticket_id) + + def reset(self): + self.actiontracker.unsetRecent() + + def send(self, to): + if not self.actiontracker.isRecent(): + self.ticket_id = Message.send(self, to) + self.actiontracker.setRecent() + + #print "recording object for persistance" + pm = soltesz.dbLoad(self.db) + pm[self.id] = self + soltesz.dbDump(self.db, pm) + else: + # NOTE: only send a new message every week, regardless. + print "Not sending to host b/c not within window of 6 days" + pass + +class MonitorMessage(object): + def __new__(typ, id, *args, **kwargs): + if 'db' in kwargs: + db = kwargs['db'] + else: + db = "monitormessages" + + try: + if 'reset' in kwargs and kwargs['reset'] == True: + soltesz.dbDump(db, {}) + pm = soltesz.dbLoad(db) + except: + soltesz.dbDump(db, {}) + pm = soltesz.dbLoad(db) + + #print pm + if id in pm: + print "Using existing object" + obj = pm[id] + else: + print "creating new object" + obj = super(object, typ).__new__(typ, id, *args, **kwargs) + obj.id = id + obj.sp = PersistSitePenalty(id, 0) + + obj.db = db + return obj + + def __init__(self, id, message): + pass + + +class SitePenalty(object): + penalty_map = [] + penalty_map.append( { 'name': 'noop', 'enable' : lambda host: None, + 'disable' : lambda host: None } ) + penalty_map.append( { 'name': 'nocreate', 'enable' : lambda host: plc.removeSliceCreation(host), + 'disable' : lambda host: plc.enableSliceCreation(host) } ) + penalty_map.append( { 'name': 'suspendslices', 'enable' : lambda host: plc.suspendSlices(host), + 'disable' : lambda host: plc.enableSlices(host) } ) + + #def __init__(self, index=0, **kwargs): + # self.index = index + + def get_penalties(self): + # TODO: get penalties actually applied to a node from PLC DB. + return [ n['name'] for n in SitePenalty.penalty_map ] + + def increase(self): + self.index = self.index + 1 + if self.index > len(SitePenalty.penalty_map)-1: self.index = len(SitePenalty.penalty_map)-1 + return True + + def decrease(self): + self.index = self.index - 1 + if self.index < 0: self.index = 0 + return True + + def apply(self, host): + + for i in range(len(SitePenalty.penalty_map)-1,self.index,-1): + print "\tdisabling %s on %s" % (SitePenalty.penalty_map[i]['name'], host) + SitePenalty.penalty_map[i]['disable'](host) + + for i in range(0,self.index+1): + print "\tapplying %s on %s" % (SitePenalty.penalty_map[i]['name'], host) + SitePenalty.penalty_map[i]['enable'](host) + + return + + + +class PersistSitePenalty(SitePenalty): + def __new__(typ, id, index, **kwargs): + if 'db' in kwargs: + db = kwargs['db'] + else: + db = "persistpenalties" + + try: + if 'reset' in kwargs and kwargs['reset'] == True: + soltesz.dbDump(db, {}) + pm = soltesz.dbLoad(db) + except: + soltesz.dbDump(db, {}) + pm = soltesz.dbLoad(db) + + #print pm + if id in pm: + print "Using existing object" + obj = pm[id] + else: + print "creating new object" + obj = super(PersistSitePenalty, typ).__new__(typ, [index], **kwargs) + obj.id = id + obj.index = index + + obj.db = db + return obj + + def __init__(self, id, index, **kwargs): + self.id = id + #SitePenalty.__init__(self, self.index) + + def save(self): + pm = soltesz.dbLoad(self.db) + pm[self.id] = self + soltesz.dbDump(self.db, pm) + + + +class Target: + """ + Each host has a target set of attributes. Some may be set manually, + or others are set globally for the preferred target. + + For instance: + All nodes in the Alpha or Beta group would have constraints like: + [ { 'state' : 'BOOT', 'kernel' : '2.6.22' } ] + """ + def __init__(self, constraints): + self.constraints = constraints + + def verify(self, data): + """ + self.constraints is a list of key, value pairs. + # [ {... : ...}==AND , ... , ... , ] == OR + """ + con_or_true = False + for con in self.constraints: + #print "con: %s" % con + con_and_true = True + for key in con.keys(): + #print "looking at key: %s" % key + if key in data: + #print "%s %s" % (con[key], data[key]) + con_and_true = con_and_true & (con[key] in data[key]) + elif key not in data: + print "missing key %s" % key + con_and_true = False + + con_or_true = con_or_true | con_and_true + + return con_or_true + +class NodeRecord: + def __init__(self, hostname, target): + self.hostname = hostname + self.pcu = PCU(hostname) + self.ticket = None + self.target = target + if hostname in fb['nodes']: + self.data = fb['nodes'][hostname]['values'] + else: + raise Exception("Hostname not in scan database") + + def get(self): + pass + def severity(self): + category = self.data['category'] + prev_category = self.data['prev_category'] + val = cmpCategoryVal(category, prev_category) + return val + def open_tickets(self): + if self.ticket and self.ticket.status['status'] == 'open': + return 1 + return 0 + def setIntrospect(self): + pass + + def email_notice(self): + message = self._get_message_for_condition() + message.send(self._get_contacts_for_condition()) + return True + def close_ticket(self): + if self.ticket: + self.ticket.closeTicket() + + def exempt_from_penalties(self): + bl = soltesz.dbLoad("l_blacklist") + return self.hostname in bl + + def penalties(self): + return [] + def escellate_penalty(self): + return True + def reduce_penalty(self): + return True + + + def atTarget(self): + return self.target.verify(self.data) + + def _get_condition(self): + return self.data['category'].lower() + + def _get_stage(self): + "improvement" + "firstnotice_noop" + "secondnotice_noslicecreation" + "thirdnotice_disableslices" + + delta = current_time - self.data['time'] + + def _get_message_for_condition(self): + pass + def _get_contacts_for_condition(self): + pass + +if __name__ == "__main__": + #r = RT() + #r.email("test", "body of test message", ['soltesz@cs.princeton.edu']) + from emailTxt import mailtxt + soltesz.dbDump("persistmessages", {}); + args = {'url_list': 'http://www.planet-lab.org/bootcds/planet1.usb\n','hostname': 'planet1','hostname_list': ' blahblah - days down\n'} + m = PersistMessage("blue", "test 1", mailtxt.newdown_one[1] % args, True) + m.send(['soltesz@cs.utk.edu']) + m = PersistMessage("blue", "test 1 - part 2", mailtxt.newalphacd_one[1] % args, True) + # TRICK timer to thinking some time has passed. + m.actiontracker.time = time.time() - 6*60*60*24 + m.send(['soltesz@cs.utk.edu']) diff --git a/vxargs.py b/vxargs.py new file mode 100755 index 0000000..fea9e37 --- /dev/null +++ b/vxargs.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python + +# DHARMA Project +# Copyright (C) 2003-2004 Yun Mao, University of Pennsylvania +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of version 2.1 of the GNU Lesser General Public +# License as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +""" +vxargs: Visualized xargs with redirected output + +""" +version = "0.3.1" +import os, sys, time, signal, errno +import curses, random +import getopt + +update_rate = 1 + +final_stats = {} +gsl = None +stopping = 0 + +def getListFromFile(f): + """I'll ignore the line starting with # + + @param f: file object of the host list file + @return: a list of hostnames (or IPs) + """ + hostlist = [] + for line in f: + if line[0]!='#': + if line.strip(): + hostlist.append([line.strip(),'']) + elif hostlist and hostlist[-1][1]=='': + hostlist[-1][1] = line.strip()[1:] + return hostlist + +def get_last_line(fn): + #equ to tail -n1 fn + try: + lines = open(fn,'r').readlines() + if len(lines)>0: + return (0, lines[-1].strip()) + except IOError: + pass + return (1,'') + +class Slot: + def __init__(self, outdir, num, screen, timeout, name, count): + self.outdir = outdir + self.slotnum = num + self.screen = screen + self.comment = "" + self.startTime = time.time() + self.timeout = timeout + self.name = name + self.count = count + + def drawLine(self, comment='', done = False): + if self.screen is None: return + if comment == '': + comment = self.comment + else: + self.comment = comment + stdscr = self.screen + elapsed = time.time()-self.startTime + try: + y,x = stdscr.getmaxyx() + spaces = ' '*x + stdscr.addstr(self.slotnum+2, 0, spaces) #title occupies two lines + if done: + stdscr.addstr(self.slotnum+2,0, comment[:x]) + else: + #construct the string + output = "(%3ds)%3d: %s " % ( round(elapsed), self.count, self.name ) + spaceleft = x - len(output) + if self.outdir and spaceleft>1: + outfn = os.path.join(self.outdir, '%s.out' % self.name) + errfn = os.path.join(self.outdir, '%s.err' % self.name) + lout = get_last_line(outfn) + lerr = get_last_line(errfn) + if lerr[0]==0 and lerr[1]: + output += lerr[1] + elif lout[0]==0 and lout[1]: + output += lout[1] + else: + output += comment + else: + output += comment + stdscr.addstr(self.slotnum+2, 0, output[:x] ) + stdscr.refresh() + except curses.error: #some of them will be out of screen, ignore it + pass + def update(self, pid): + self.drawLine() + if self.timeout >0: + self.kill(pid) + + def kill(self, pid): + overtime = time.time()-self.startTime - self.timeout + try: + if overtime > 3: #expired more than 3 seconds, send -9 + os.kill(-pid, signal.SIGKILL) + elif overtime > 2: #expired more than 2 seconds, send -15 + os.kill(-pid, signal.SIGTERM) + elif overtime >= 0: + os.kill(-pid, signal.SIGINT) + except OSError, e: + if e.errno != errno.ESRCH: # No such process + raise e + + def stop(self, pid): + """stop current pid b/c we caught SIGINT twice + """ + self.startTime = time.time() - self.timeout + self.kill(pid) + +class Slots: + pids = {} + def __init__(self, max, screen, timeout, outdir): + self.maxChild = max + self.slots = range(self.maxChild) + self.screen = screen + self.t = timeout + self.outdir = outdir + + def getSlot(self, name, count): + if not self.slots: + #it's empty, wait until other jobs finish + slot = self.waitJobs().slotnum + else: + slot = self.slots[0] + self.slots.remove(slot) + return Slot(self.outdir, slot, self.screen, self.t, name, count) + + def mapPID(self, pid, slot): + """@param slot: slot object + """ + self.pids[pid] = slot + + def waitJobs(self): + while 1: + try: + pid, status = os.wait() + break + except OSError, e: + if e.errno == errno.ECHILD: #no child processes + raise RuntimeError('no child processes when waiting') + slot = self.pids[pid] + if self.outdir: + open(os.path.join(self.outdir, '%s.status' % slot.name),'w').write('%d' % (status>>8)) + if (status & 0xFF) !=0: + open(os.path.join(self.outdir, 'killed_list'),'a').write('%s\n' % (slot.name)) + if status >>8: + open(os.path.join(self.outdir, 'abnormal_list'),'a').write('%s\n' % (slot.name)) + del self.pids[pid] + s = status >> 8 + if final_stats.has_key(s): + final_stats[s]+= 1 + else: + final_stats[s]=1 + return slot + def update(self): + for k,v in self.pids.items(): + v.update(k) + def timeout(self): + self.update() + signal.alarm(update_rate) + + def drawTitle(self, stuff): + if self.screen: + y,x = self.screen.getmaxyx() + spaces = ' '*(x*2) + self.screen.addstr(0,0, spaces) + self.screen.addstr(0,0, stuff[:x*2]) + self.screen.refresh() + else: + print stuff + def stop(self): + if stopping ==1: + msg = 'Stopping -- Waiting current jobs done. Press Ctrl-C again to kill current jobs.' + else: + msg = 'Stopping -- Killing current jobs' + self.drawTitle(msg) + if stopping >1: + for k,v in self.pids.items(): + v.stop(k) + return + +def handler(signum, frame_unused): + global gsl + if signum==signal.SIGALRM: + gsl.timeout() + if signum==signal.SIGINT: + global stopping + stopping += 1 + gsl.stop() + +def generateCommands(cmd_line, args): + return [per_arg.replace('{}', args[0]) for per_arg in cmd_line] + +def spawn(cmdline, outfn, errfn, setpgrp = False): + """A cleverer spawn that lets you redirect stdout and stderr to + outfn and errfn. Returns pid of child. + You can't do this with os.spawn, sadly. + """ + pid = os.fork() + if pid==0: #child + out = open(outfn, 'w') + os.dup2(out.fileno() ,sys.stdout.fileno()) + err = open(errfn, 'w') + os.dup2(err.fileno(), sys.stderr.fileno()) + if setpgrp: + os.setpgrp() + try: + os.execvp(cmdline[0], cmdline) + except OSError,e: + print >> sys.stderr, "error before execution:",e + sys.exit(255) + #father process + return pid + +def start(win, max_child, hlist, outdir, randomize, command_line, timeout): + + total = len(hlist) + + if randomize: + random.shuffle(hlist) + + signal.signal(signal.SIGALRM, handler) + signal.signal(signal.SIGINT, handler) + signal.alarm(update_rate) + + sl = Slots(max_child, win, timeout, outdir) + global gsl + global stopping + gsl = sl + count = 0 + for i in hlist: + slot = sl.getSlot(i[0], count) + if stopping>0: + slot.drawLine('Done', done=True) + break + + count += 1 + slot.drawLine(i[1]) + x = generateCommands(command_line, i) + + sl.drawTitle("%d/%d:%s" %(count, total,' '.join(x))) + + outpath = '/dev/null' + errpath = '/dev/null' + if outdir: + outpath = os.path.join(outdir, '%s.out'%i[0]) + errpath = os.path.join(outdir, '%s.err'%i[0]) + + pid = spawn(x, outpath, errpath, setpgrp = True) + sl.mapPID(pid, slot) + + while sl.pids: + try: + slot = sl.waitJobs() + except RuntimeError: + print >> sys.stderr, 'Warning: lost tracking of %d jobs' % len(sl.pids) + return + slot.drawLine('Done', done = True) #Done + +def get_output(outdir, argument_list, out= True, err=False, status=False): + """ + + For post processing the output dir. + + @param out: decide whether to process *.out files + @param err: decide whether to process *.err files + @param status: decide whether to process *.status files + + @return: (out, err, status): out is a hash table, in which the + keys are the arguments, and the values are the string of the + output, if available. err is similar. the values of hash table + status is the value of exit status in int. + + """ + if not out and not err and not status: + raise RuntimeError("one of out, err and status has to be True") + + result = ({},{},{}) + mapping = ('out','err','status') + p = [] + if out: p.append(0) + if err: p.append(1) + if status: p.append(2) + for arg in argument_list: + basefn = os.path.join(outdir, arg) + for i in p: + fn = '.'.join( (basefn, mapping[i]) ) #basefn.ext + try: + lines = open(fn).readlines() + result[i][arg]=''.join(lines) + except IOError: + pass + if not status: return result + int_status = {} + for k,v in result[2].items(): + try: + int_status[k] = int(v.strip()) + except ValueError: + pass + return result[0], result[1], int_status + +def main(): + options = 'hP:ra:o:yt:pn' + long_opts = ['help','max-procs=','randomize','args=','output=','noprompt','timeout=','plain', 'version','no-exec'] + try: + opts,args = getopt.getopt(sys.argv[1:], options,long_opts) + except getopt.GetoptError: + print "Unknown options" + usage() + sys.exit(1) + #set default values + ask_prompt = True + maxchild = 30 + randomize = False + hostfile = sys.stdin + outdir = '' + timeout = 0 + plain = False + no_exec = False + if os.environ.has_key('VXARGS_OUTDIR'): + outdir = os.environ['VXARGS_OUTDIR'] + for o,a in opts: + if o in ['--version']: + print "vxargs version",version + print "Copyright (c) 2004 Yun Mao (maoy@cis.upenn.edu)" + print "Freely distributed under GNU LGPL License" + sys.exit(1) + elif o in ['-h','--help']: + usage() + sys.exit(1) + elif o in ['-r','--randomize']: + randomize = True + elif o in ['-P','--max-procs']: + maxchild = int(a) + elif o in ['-a','--args']: + try: + hostfile = open(a,'r') + except IOError, e: + print "argument file %s has error: %s" % ( a, str(e) ) + sys.exit(3) + elif o in ['-o','--output']: + outdir = a + if a =='/dev/null': outdir = '' + elif o in ['-y','--noprompt']: + ask_prompt = False + elif o in ['-t','--timeout']: + timeout = int(a) + elif o in ['-p','--plain']: + plain = True + elif o in ['-n','--no-exec']: + no_exec = True + else: + print 'Unknown options' + usage() + sys.exit(1) + if len(args)<1: + print "No command given." + usage() + sys.exit(1) + #now test outdir + if outdir: + if os.path.exists(outdir): + if not os.path.isdir(outdir): + print "%s exists and is not a dir, won't continue" % outdir + sys.exit(3) + elif no_exec: + print "%s is the destination dir and would be destroyed." % (outdir) + elif ask_prompt: + if hostfile == sys.stdin: + print "You must specify --noprompt (-y) option if no --args (-a) or --no-exec (-n) is given. Doing so will destroy folder %s." % (outdir) + sys.exit(3) + else: + result = raw_input("%s exists. Continue will destroy everything in it. Are you sure? (y/n) " % (outdir)) + if result not in ['y','Y']: + sys.exit(3) + os.system('rm -f %s' % (os.path.join(outdir,'*'))) + else: + if not no_exec: + os.system('mkdir -p %s' % outdir) + + hlist = getListFromFile(hostfile) + if no_exec: + for i in hlist: + real_cmdline = generateCommands(args, i) + print ' '.join(real_cmdline) + sys.exit(0) + + if plain: # no fancy output + return start(None, maxchild, hlist, outdir, randomize, args, timeout) + else: + # use fancy curses-based animation + try: + curses.wrapper(start, maxchild, hlist, outdir, randomize, args, timeout) + except curses.error: + sys.exit(4) + #post execution, output some stats + total = 0 + for k,v in final_stats.items(): + print "exit code %d: %d job(s)" % (k,v) + total += v + print "total number of jobs:", total +def usage(): + print """\ +NAME + + vxargs - build and execute command lines from an argument list file + with visualization and parallelism, and output redirection. + +DESCRIPTION + + vxargs reads a list of arguments from a txt file or standard input, + delimited by newlines, and executes the command one or more times + with initial arguments in which {} is substituted by the arguments + read from the file or standard input. The current executing commands + and progress will be dynamically updated on the screen. Stdout and + stderr of each command will be redirected to separate files. A list + of all processes with a non-zero exit status is generated in file + abnormal_list. A list of all timeout processes is generated in file + killed_list. + +SYNOPSIS + + vxargs [OPTIONS] command [initial-arguments] + +OPTIONS + + --help + Print a summary of the options to vxargs and exit. + + --max-procs=max-procs, -P max-procs + Run up to max-procs processes at a time; the default is 30. + + --randomize, -r [OPTIONAL] + Randomize the host list before all execution. + + --args=filename, -a filename + The arguments file. If unspecified, the arguments will be read + from standard input, and -y option must be specified. + + --output=outdir, -o outdir + output directory for stdout and stderr files + The default value is specified by the environment variable VXARGS_OUTDIR. + If it is unspecified, both stdout and stderr will be redirected + to /dev/null. + Note that if the directory existed before execution, everything + inside will be wiped. + + --timeout=timeout, -t timeout + The maximal time in second for each command to execute. timeout=0 + means infinite. 0 (i.e. infinite) is the default value. When the time is up, + vxargs will send signal SIGINT to the process. If the process does not + stop after 2 seconds, vxargs will send SIGTERM signal, and send SIGKILL + if it still keeps running after 3 seconds. + + --noprompt, -y + Wipe out the outdir without confirmation. + + --no-exec, -n + Print the commands that would be executed, but do not execute them. + + --plain, -p + Don't use curses-based output, but plain output to stdout + instead. It will be less exciting, but will do the same job + effectively. It is useful if one wants to start vxargs from cron + or by another program that doesn't want to see the output. + By default, vxargs uses the curses-based output. + + --version + Display current version and copyright information. + +EXAMPLES: + Suppose the iplist.txt file has following content: +$ cat iplist.txt +216.165.109.79 +#planetx.scs.cs.nyu.edu +158.130.6.254 +#planetlab1.cis.upenn.edu +158.130.6.253 +#planetlab2.cis.upenn.edu +128.232.103.203 +#planetlab3.xeno.cl.cam.ac.uk + +Note that lines starting with '#' will be interpreted as comment for +the previous lines, which is optional, for visualization purpose only. + +$ vxargs -a iplist.txt -o /tmp/result -P 10 ssh upenn_dharma@{} "hostname;uptime" + +...[ UI output]... + +$ cat /tmp/result/* +planetlab3.xeno.cl.cam.ac.uk + 03:13:21 up 4 days, 14:36, 0 users, load average: 0.36, 0.44, 0.44 +planetlab2.cis.upenn.edu + 03:13:20 up 26 days, 16:19, 0 users, load average: 8.11, 7.41, 7.41 +planetlab1.cis.upenn.edu + 03:13:19 up 22 days, 20:02, 0 users, load average: 13.60, 12.55, 12.59 +ssh: connect to host 216.165.109.79 port 22: Connection timed out +$ + +other examples: +cat iplist.txt | vxargs -o /tmp/result rsync -az -e ssh --delete mirror $SLICE@{}: + +vxargs -a iplist.txt -o /tmp/result ssh {} killall -9 java + +For more information, please visit http://dharma.cis.upenn.edu/planetlab/vxargs/ +""" +if __name__=='__main__': + main() +