From: Stephen Soltesz Date: Wed, 8 Aug 2007 13:36:46 +0000 (+0000) Subject: + findbad.py: this actively probes all machines in the PLC db, using ping, X-Git-Tag: Monitor-1.0-0~65 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=8217a88623189c3034d38fdc4eae8fe575ae8624;p=monitor.git + findbad.py: this actively probes all machines in the PLC db, using ping, ssh, and then various commands on the machine to determine the actual bootstate. These records are saved to disk for diagnose.py + diagnose.py: reads entries from findbad and previous actions, merging the two together to determine if machines have improved, or gotten worse. All actions to be performed are recorded and written to a diagnose_out pickle file for action.py + action.py: reads the diagnose_out file from diagnose.py and performs the actions. It permanently records the results in the act_all pickle file. These three in combination are Monitor. --- diff --git a/action.py b/action.py new file mode 100755 index 0000000..269007e --- /dev/null +++ b/action.py @@ -0,0 +1,203 @@ +#!/usr/bin/python +# +# Copyright (c) 2004 The Trustees of Princeton University (Trustees). 
+# +# Faiyaz Ahmed +# Stephen Soltesz +# +# $Id$ + +import sys +from threading import * +import time +import logging +import Queue +from sets import Set + +# Global config options +from config import config +from optparse import OptionParser +parser = OptionParser() + +parser.set_defaults(nodelist=None, + cachert=False, + cachenodes=False, + blacklist=None, + ticketlist=None) + +parser.add_option("", "--nodelist", dest="nodelist", + help="Read nodes to act on from specified file") +parser.add_option("", "--cachert", action="store_true", + help="Cache the RT database query") +parser.add_option("", "--cachenodes", action="store_true", + help="Cache node lookup from PLC") +parser.add_option("", "--ticketlist", dest="ticketlist", + help="Whitelist all RT tickets in this file") +parser.add_option("", "--blacklist", dest="blacklist", + help="Blacklist all nodes in this file") + +config = config(parser) +config.parse_args() + +# daemonize and *pid +#from util.process import * + +# RT tickets +import rt +# Correlates input with policy to form actions +import policy +import soltesz +import plc + +# Log to what +LOG="./monitor.log" + +# Time to refresh DB and remove unused entries +RTSLEEP=7200 #2hrs +# Time between policy enforce/update +#POLSLEEP=43200 #12hrs +POLSLEEP=10 + +# Global list of all running threads. Any threads added to +# list will be monitored. +runningthreads = {} +# Seconds between checking threads +WATCHSLEEP = 10 + +# Set up Logging +logger = logging.getLogger("monitor") +logger.setLevel(logging.DEBUG) +fh = logging.FileHandler(LOG, mode = 'a') +fh.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') +fh.setFormatter(formatter) +logger.addHandler(fh) + + +""" +Launches threads and adds them to the runningthreads global list. +Assigns name for thread, starts. 
+""" +def startThread(fnct, name): + runningthreads[name] = fnct + runningthreads[name].setName(name) + try: + logger.info("Starting thread " + name) + runningthreads[name].start() + except Exception, err: + logger.error("Thread: " + name + " " + error) + + +""" +Watches threads and catches exceptions. Each launched thread is +watched and state is logged. +""" +class ThreadWatcher(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + while 1: + self.checkThreads() + time.sleep(WATCHSLEEP) + + def checkThreads(self): + # Iterate through treads, compare with last running. + for thread in runningthreads.keys(): + # If thread found dead, remove from queue + #print "found %s" % thread + if not runningthreads[thread].isAlive(): + logger.error("***********Thread died: %s**********" %(thread)) + del runningthreads[thread] + return len(runningthreads.keys()) + + +class Dummy(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + time.sleep(5) + +def dict_from_nodelist(nl): + d = {} + for host in nl: + h = host['hostname'] + d[h] = host + return d + +""" +Start threads, do some housekeeping, then daemonize. +""" +def main(): + # Defaults + global status, logger + global config + + logger.info('Action Started') + print 'Action Started' + + ######### GET NODES ######################################## + logger.info('Get Nodes from PLC') + print "getnode from plc" + l_plcnodes = soltesz.if_cached_else(True, + "l_plcnodes", + lambda : plc.getNodes({'peer_id':None})) + + s_plcnodenames = Set([x['hostname'] for x in l_plcnodes]) + + # List of nodes from a user-provided file. + if config.nodelist: + file = config.nodelist + nodelist = config.getListFromFile(file) + #for node in nodelist: + # print "%s" % node + + s_usernodes = Set(nodelist) + # SAFE nodes are in PLC and the list + s_safe_usernodes = s_plcnodenames & s_usernodes + # UNSAFE nodes are in list but not in PLC. i.e. ignore them. 
+ s_unsafe_usernodes = s_usernodes - s_plcnodenames + if len(s_unsafe_usernodes) > 0 : + for node in s_unsafe_usernodes: + print "WARNING: User provided: %s but not found in PLC" % node + + l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes) + else: + l_nodes = l_plcnodes + + print "len of l_nodes: %d" % len(l_nodes) + # Minus blacklisted ones.. + l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : []) + + l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : []) + l_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes) + + ####### Get RT tickets ######################################### + #logger.info('Get Tickets from RT') + #t = soltesz.MyTimer() + #ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets) + #print "Getting tickets from RT took: %f sec" % t.diff() ; del t + + logger.info('Start Action thread') + ####### Action + action = policy.Action( [node['hostname'] for node in l_nodes] ) + startThread(action,"action") + + + tw = ThreadWatcher() + while True: + if tw.checkThreads() == 0: + break + time.sleep(WATCHSLEEP) + + logger.info('Action Exitting') + sys.exit(0) + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print "Killed. Exitting." + logger.info('Action Killed') + sys.exit(0) diff --git a/diagnose.py b/diagnose.py new file mode 100755 index 0000000..70bdc38 --- /dev/null +++ b/diagnose.py @@ -0,0 +1,218 @@ +#!/usr/bin/python +# +# Copyright (c) 2004 The Trustees of Princeton University (Trustees). 
+# +# Faiyaz Ahmed +# Stephen Soltesz +# +# $Id$ + +import sys +from threading import * +import time +import logging +import Queue +from sets import Set + +# Global config options +from config import config +from optparse import OptionParser +parser = OptionParser() + +parser.set_defaults(nodelist=None, + cachert=False, + cachenodes=False, + blacklist=None, + ticketlist=None) + +parser.add_option("", "--nodelist", dest="nodelist", metavar="filename", + help="Read nodes to act on from specified file") +parser.add_option("", "--cachert", action="store_true", + help="Cache the RT database query") +parser.add_option("", "--cachenodes", action="store_true", + help="Cache node lookup from PLC") +parser.add_option("", "--ticketlist", dest="ticketlist", + help="Whitelist all RT tickets in this file") +parser.add_option("", "--blacklist", dest="blacklist", + help="Blacklist all nodes in this file") + +config = config(parser) +print "bcalling parse_args" +config.parse_args() + +# daemonize and *pid +#from util.process import * + +# RT tickets +import rt +# Correlates input with policy to form actions +import policy +import soltesz +import plc +import syncplcdb + +# Log to what +LOG="./monitor.log" + +# Time to refresh DB and remove unused entries +RTSLEEP=7200 #2hrs +# Time between policy enforce/update +#POLSLEEP=43200 #12hrs +POLSLEEP=10 + +# Global list of all running threads. Any threads added to +# list will be monitored. +runningthreads = {} +# Seconds between checking threads +WATCHSLEEP = 10 + +# Set up Logging +logger = logging.getLogger("monitor") +logger.setLevel(logging.DEBUG) +fh = logging.FileHandler(LOG, mode = 'a') +fh.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') +fh.setFormatter(formatter) +logger.addHandler(fh) + + +""" +Launches threads and adds them to the runningthreads global list. +Assigns name for thread, starts. 
+""" +def startThread(fnct, name): + runningthreads[name] = fnct + runningthreads[name].setName(name) + try: + logger.info("Starting thread " + name) + runningthreads[name].start() + except Exception, err: + logger.error("Thread: " + name + " " + error) + + +""" +Watches threads and catches exceptions. Each launched thread is +watched and state is logged. +""" +class ThreadWatcher(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + while 1: + self.checkThreads() + time.sleep(WATCHSLEEP) + + def checkThreads(self): + # Iterate through treads, compare with last running. + for thread in runningthreads.keys(): + # If thread found dead, remove from queue + #print "found %s" % thread + if not runningthreads[thread].isAlive(): + logger.error("***********Thread died: %s**********" %(thread)) + del runningthreads[thread] + return len(runningthreads.keys()) + + +class Dummy(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + time.sleep(5) + +def dict_from_nodelist(nl): + d = {} + for host in nl: + h = host['hostname'] + d[h] = host + return d + + + +""" +Start threads, do some housekeeping, then daemonize. +""" +def main(): + # Defaults + global status, logger + global config + + logger.info('Diagnose Started') + print 'Diagnose Started' + + ########## VARIABLES ######################################## + # Queue between Merge and RT + toRT = Queue.Queue() + + # Queue between RT and Diagnose + fromRT = Queue.Queue() + + ######### GET NODES ######################################## + logger.info('Get Nodes from PLC') + print "getnode from plc" + l_plcnodes = soltesz.if_cached_else(config.cachenodes, "l_plcnodes", + lambda : syncplcdb.create_plcdb() ) + + s_plcnodenames = Set([x['hostname'] for x in l_plcnodes]) + + # List of nodes from a user-provided file. 
+ if config.nodelist: + file = config.nodelist + nodelist = config.getListFromFile(file) + + s_usernodes = Set(nodelist) + # SAFE nodes are in PLC and the list + s_safe_usernodes = s_plcnodenames & s_usernodes + # UNSAFE nodes are list but not in PLC. i.e. do not monitor. + s_unsafe_usernodes = s_usernodes - s_plcnodenames + if len(s_unsafe_usernodes) > 0 : + for node in s_unsafe_usernodes: + print "WARNING: User provided: %s but not found in PLC" % node + + l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes) + else: + l_nodes = l_plcnodes + + print "len of l_nodes: %d" % len(l_nodes) + # Minus blacklisted ones.. + l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : []) + l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : []) + l_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes) + + logger.info('Get Tickets from RT') + ####### RT tickets ######################################### + t = soltesz.MyTimer() + ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets) + print "Getting tickets from RT took: %f sec" % t.diff() ; del t + + logger.info('Start Merge/RT/Diagnose threads') + ####### Merge + # only look at nodes in l_nodes + merge = policy.Merge( [node['hostname'] for node in l_nodes], toRT) + startThread(merge,"merge") + ####### RT + rt1 = rt.RT(ad_dbTickets, toRT, fromRT, l_ticket_blacklist) + startThread(rt1,"rt1") + ####### Diagnose + diagnose = policy.Diagnose(fromRT) + startThread(diagnose,"diagnose") + + + tw = ThreadWatcher() + while True: + if tw.checkThreads() == 0: + break + print "waiting... %s" % time.time() + time.sleep(WATCHSLEEP) + + logger.info('Diagnose Exitting') + sys.exit(0) + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print "Killed. Exitting." 
+ logger.info('Diagnose Killed') + sys.exit(0) diff --git a/findbad.py b/findbad.py new file mode 100755 index 0000000..d169c4a --- /dev/null +++ b/findbad.py @@ -0,0 +1,237 @@ +#!/usr/bin/python + +import os +import sys +import string +import time +import soltesz +import plc +import comon +import threadpool + +from config import config +from optparse import OptionParser +parser = OptionParser() +parser.set_defaults(filename="", increment=False, dbname="findbadnodes") +parser.add_option("-f", "--nodes", dest="filename", metavar="FILE", + help="Provide the input file for the node list") +parser.add_option("", "--dbname", dest="dbname", metavar="FILE", + help="Specify the name of the database to which the information is saved") +parser.add_option("-i", "--increment", action="store_true", dest="increment", + help="Increment round number to force refresh or retry") +config = config(parser) +config.parse_args() + +# QUERY all nodes. +COMON_COTOPURL= "http://summer.cs.princeton.edu/status/tabulator.cgi?" 
+ \ + "table=table_nodeview&" + \ + "dumpcols='name,resptime,sshstatus,uptime,lastcotop'&" + \ + "formatcsv" + #"formatcsv&" + \ + #"select='lastcotop!=0'" + +round = 1 +externalState = {'round': round, 'nodes': {}} +count = 0 + +def collectPingAndSSH(nodename, cohash): + ### RUN PING ###################### + ping = soltesz.CMD() + (oval,eval) = ping.run_noexcept("ping -c 1 -q %s | grep rtt" % nodename) + + values = {} + + if oval == "": + # An error occurred + values['ping'] = "NOPING" + else: + values['ping'] = "PING" + + ### RUN SSH ###################### + b_getbootcd_id = True + ssh = soltesz.SSH('root', nodename) + oval = "" + eval = "" + (oval, eval) = ssh.run_noexcept('echo `uname -a ; ls /tmp/bm.log`') + val = oval + if "2.6.17" in oval or "2.6.20" in oval: + values['ssh'] = 'SSH' + if "bm.log" in oval: + values['category'] = 'ALPHA' + values['state'] = 'DEBUG' + else: + values['category'] = 'ALPHA' + values['state'] = 'BOOT' + elif "2.6.12" in oval or "2.6.10" in oval: + values['ssh'] = 'SSH' + values['category'] = 'PROD' + if "bm.log" in oval: + values['state'] = 'DEBUG' + else: + values['state'] = 'BOOT' + elif "2.4" in oval: + b_getbootcd_id = False + values['ssh'] = 'SSH' + values['category'] = 'OLDBOOTCD' + values['state'] = 'DEBUG' + elif oval != "": + values['ssh'] = 'SSH' + values['category'] = 'UNKNOWN' + if "bm.log" in oval: + values['state'] = 'DEBUG' + else: + values['state'] = 'BOOT' + else: + # An error occurred. + b_getbootcd_id = False + values['ssh'] = 'NOSSH' + values['category'] = 'ERROR' + values['state'] = 'DOWN' + val = eval.strip() + + values['kernel'] = val + + if b_getbootcd_id: + # try to get BootCD for all nodes that are not 2.4 nor inaccessible + (oval, eval) = ssh.run_noexcept('cat /mnt/cdrom/bootme/ID') + val = oval + if "BootCD" in val: + values['bootcd'] = val + if "v2" in val: + values['category'] = 'OLDBOOTCD' + else: + values['bootcd'] = "" + else: + values['bootcd'] = "" + + # TODO: get bm.log for debug nodes. 
+ # 'zcat /tmp/bm.log' + + values['comonstats'] = cohash[nodename] + # include output value + ### GET PLC NODE ###################### + d_node = plc.getNodes({'hostname': nodename}) + site_id = -1 + if d_node and len(d_node) > 0: + pcu = d_node[0]['pcu_ids'] + if len(pcu) > 0: + values['pcu'] = "PCU" + else: + values['pcu'] = "NOPCU" + site_id = d_node[0]['site_id'] + values['plcnode'] = {'status' : 'SUCCESS', 'pcu_ids': pcu, 'site_id': site_id} + else: + values['pcu'] = "UNKNOWN" + values['plcnode'] = {'status' : "GN_FAILED"} + + + ### GET PLC SITE ###################### + d_site = plc.getSites({'site_id': site_id}) + if d_site and len(d_site) > 0: + max_slices = d_site[0]['max_slices'] + num_slices = len(d_site[0]['slice_ids']) + num_nodes = len(d_site[0]['node_ids']) + loginbase = d_site[0]['login_base'] + values['plcsite'] = {'num_nodes' : num_nodes, + 'max_slices' : max_slices, + 'num_slices' : num_slices, + 'login_base' : loginbase, + 'status' : 'SUCCESS'} + else: + values['plcsite'] = {'status' : "GS_FAILED"} + + return (nodename, values) + +def recordPingAndSSH(request, result): + global externalState + global count + (nodename, values) = result + + global_round = externalState['round'] + externalState['nodes'][nodename]['values'] = values + externalState['nodes'][nodename]['round'] = global_round + + count += 1 + print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values']) + soltesz.dbDump(config.dbname, externalState) + +# this will be called when an exception occurs within a thread +def handle_exception(request, result): + print "Exception occured in request %s" % request.requestID + for i in result: + print "Result: %s" % i + + +def checkAndRecordState(l_nodes, cohash): + global externalState + global count + global_round = externalState['round'] + + tp = threadpool.ThreadPool(20) + + # CREATE all the work requests + for nodename in l_nodes: + if nodename not in externalState['nodes']: + externalState['nodes'][nodename] = {'round': 
0, 'values': []} + + node_round = externalState['nodes'][nodename]['round'] + if node_round < global_round: + # recreate node stats when refreshed + #print "%s" % nodename + req = threadpool.WorkRequest(collectPingAndSSH, [nodename, cohash], {}, + None, recordPingAndSSH, handle_exception) + tp.putRequest(req) + else: + # We just skip it, since it's "up to date" + count += 1 + print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values']) + pass + + # WAIT while all the work requests are processed. + while 1: + try: + time.sleep(1) + tp.poll() + except KeyboardInterrupt: + print "Interrupted!" + break + except threadpool.NoResultsPending: + print "All results collected." + break + + + +def main(): + global externalState + + externalState = soltesz.if_cached_else(1, config.dbname, lambda : externalState) + + if config.increment: + # update global round number to force refreshes across all nodes + externalState['round'] += 1 + + cotop = comon.Comon() + # lastcotop measures whether cotop is actually running. this is a better + # metric than sshstatus, or other values from CoMon + cotop_url = COMON_COTOPURL + + cohash = cotop.coget(cotop_url) + + if config.filename == "": + l_nodes = cohash.keys() + else: + l_nodes = config.getListFromFile(config.filename) + + checkAndRecordState(l_nodes, cohash) + + return 0 + + +if __name__ == '__main__': + try: + main() + except Exception, err: + print "Exception: %s" % err + print "Saving data... exitting." + soltesz.dbDump(config.dbname, externalState) + sys.exit(0)