X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=monitor.py;fp=monitor.py;h=d876dc393ffd5ea59d6dbb493d040f9dd86b080b;hb=8424072ea9faa9afaee496c039e3f626b5b36e41;hp=ddc3722a2d6d55eaf34957caa9cc0bc8255337c6;hpb=00613fa5e158819b417bd8fa016e73b965594f0e;p=monitor.git diff --git a/monitor.py b/monitor.py index ddc3722..d876dc3 100644 --- a/monitor.py +++ b/monitor.py @@ -2,224 +2,53 @@ # # Copyright (c) 2004 The Trustees of Princeton University (Trustees). # -# Faiyaz Ahmed # Stephen Soltesz # # $Id: monitor.py,v 1.7 2007/07/03 19:59:02 soltesz Exp $ -import sys -import os -import getopt -import thread -from threading import * -import time -import logging -import Queue -from sets import Set -# Global config options -from config import config -# daemonize and *pid -from util.process import * - -# Comon DB -import comon -# RT tickets -import rt -# Correlates input with policy to form actions -import policy import soltesz -import plc - -# Log to what -LOG="./monitor.log" - -# Time to refresh DB and remove unused entries -RTSLEEP=7200 #2hrs -# Time between policy enforce/update -#POLSLEEP=43200 #12hrs -POLSLEEP=10 - -# Global list of all running threads. Any threads added to -# list will be monitored. -runningthreads = {} -# Seconds between checking threads -WATCHSLEEP = 10 - -# Set up Logging -logger = logging.getLogger("monitor") -logger.setLevel(logging.DEBUG) -fh = logging.FileHandler(LOG, mode = 'a') -fh.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') -fh.setFormatter(formatter) -logger.addHandler(fh) - - -""" -Launches threads and adds them to the runningthreads global list. -Assigns name for thread, starts. -""" -def startThread(fnct, name): - runningthreads[name] = fnct - runningthreads[name].setName(name) - try: - logger.info("Starting thread " + name) - runningthreads[name].start() - except Exception, err: - logger.error("Thread: " + name + " " + error) - - -""" -Watches threads and catches exceptions. Each launched thread is -watched and state is logged. -""" -class ThreadWatcher(Thread): - def __init__(self): - Thread.__init__(self) - - def run(self): - while 1: - self.checkThreads() - time.sleep(WATCHSLEEP) - - def checkThreads(self): - # Iterate through treads, compare with last running. - for thread in runningthreads.keys(): - # If thread found dead, remove from queue - #print "found %s" % thread - if not runningthreads[thread].isAlive(): - logger.error("***********Thread died: %s**********" %(thread)) - del runningthreads[thread] - return len(runningthreads.keys()) - - -class Dummy(Thread): - def __init__(self): - Thread.__init__(self) - - def run(self): - time.sleep(5) - -def dict_from_nodelist(nl): - d = {} - for host in nl: - h = host['hostname'] - d[h] = host - return d -""" -Start threads, do some housekeeping, then daemonize. -""" -def main(): - # Defaults - global status, logger - global config - - #if not debug: - # daemonize() - # writepid("monitor") - - config = config() - #config.parse_args() - - logger.info('Monitor Started') - ########## VARIABLES ######################################## - # Nodes to check. Queue of all sick nodes. - toCheck = Queue.Queue() - # Nodes that are sick w/o tickets - sickNoTicket = Queue.Queue() - # Comon DB of all nodes - cdb = {} - # RT DB - tickets = {} - # Nodes we've emailed. - # host - > (type of email, time) - emailed = {} +from monitor_policy import * - ######### GET NODES ######################################## - # TODO: get authoritative node list from PLC every PLCSLEEP seconds, - # feed this into Comon. - l_plcnodes = soltesz.if_cached_else(config.cachenodes, - "l_plcnodes", - lambda : plc.getNodes({'peer_id':None})) +import plc +import auth +api = plc.PLC(auth.auth, auth.plc) - s_plcnodes = Set([x['hostname'] for x in l_plcnodes]) +def reboot(hostname): - # List of nodes from a user-provided file. - if config.nodelist: - file = config.nodelist - nodelist = config.getListFromFile(file) - l_nodelist = [] - print "Getting node info for hosts in: %s" % file - for nodename in nodelist: - if config.debug: print ".", ; sys.stdout.flush() - l_nodelist += plc.getNodes({'hostname': nodename, 'peer_id':None}) - if config.debug: print "" + l_nodes = api.GetNodes(hostname) + if len(l_nodes) == 0: + raise Exception("No such host: %s" % hostname) - s_usernodes = Set(nodelist) - # nodes from PLC and in the user list. - s_safe_usernodes = s_plcnodes & s_usernodes - s_unsafe_usernodes = s_usernodes - s_plcnodes - if len(s_unsafe_usernodes) > 0 : - for node in s_unsafe_usernodes: - print "WARNING: User provided: %s but not found in PLC" % node - - l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes) - else: - l_nodes = l_plcnodes - - # Minus blacklisted ones.. l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : []) l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : []) - l_wl_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes) - # A handy dict of hostname-to-nodestruct mapping - d_allplc_nodes = dict_from_nodelist(l_wl_nodes) - - ####### RT tickets ######################################### - t = soltesz.MyTimer() - ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets) - print "Getting tickets from RT took: %f sec" % t.diff() ; del t - # TODO: get input nodes from findbad database, pipe into toCheck - cm1 = read_findbad_db(d_allplc_nodes, toCheck) + l_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes) + if len(l_nodes) == 0: + raise Exception("Host removed via blacklist: %s" % hostname) + + ad_dbTickets = soltesz.if_cached_else_refresh(True, False, "ad_dbTickets", lambda : None) + if ad_dbTickets == None: + raise Exception("Could not find cached dbTickets") + + #print "merge" + merge = Merge( [node['hostname'] for node in l_nodes]) + record_list = merge.run() + #print "rt" + rt = RT(record_list, ad_dbTickets, l_ticket_blacklist) + record_list = rt.run() + #print "diagnose" + diag = Diagnose(record_list) + diagnose_out = diag.run() + #print diagnose_out + #print "action" + action = Action(diagnose_out) + action.run() + + return True - # Search for toCheck nodes in the RT db. - rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist) - # Kind of a hack. Cleans the DB for stale entries and updates db. - # (UNTESTED) - # rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket) - # clean = Thread(target=rt5.cleanTickets) - - startThread(rt1,"rt1") - # startThread(rt5,"rt5") - # startThread(clean,"cleanrt5") - - # Actually digest the info and do something with it. - pol = policy.Policy(cm1, sickNoTicket, emailed) - # Start Sending Emails - startThread(pol, "policy") - - - tw = ThreadWatcher() - while True: - if tw.checkThreads() == 0: - break - time.sleep(WATCHSLEEP) - - logger.info('Monitor Exitting') - #if not debug: - # removepid("monitor") +def main(): + pass - # Store state of emails - #pol.emailedStore("WRITE") - soltesz.dbDump("ad_dbTickets") - sys.exit(0) - if __name__ == '__main__': - try: - main() - except KeyboardInterrupt: - print "Killed. Exitting." - logger.info('Monitor Killed') - #soltesz.dbDump("ad_dbTickets") - sys.exit(0) + main()