X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=monitor.py;h=ddc3722a2d6d55eaf34957caa9cc0bc8255337c6;hb=582fe9dcdb6e7057e5e8e1f1aa729772fe25a321;hp=4b80d9a07094fed9c2a296448616b78357df7276;hpb=e31f3074815c6980b8ea7f63ec996f1d9b52a776;p=monitor.git diff --git a/monitor.py b/monitor.py index 4b80d9a..ddc3722 100644 --- a/monitor.py +++ b/monitor.py @@ -3,8 +3,9 @@ # Copyright (c) 2004 The Trustees of Princeton University (Trustees). # # Faiyaz Ahmed +# Stephen Soltesz # -# $Id: $ +# $Id: monitor.py,v 1.7 2007/07/03 19:59:02 soltesz Exp $ import sys import os @@ -14,6 +15,9 @@ from threading import * import time import logging import Queue +from sets import Set +# Global config options +from config import config # daemonize and *pid from util.process import * @@ -23,29 +27,12 @@ import comon import rt # Correlates input with policy to form actions import policy -# Email -import mailer -import emailTxt -# Defaults -debug = False +import soltesz +import plc # Log to what LOG="./monitor.log" -# DAT -DAT="./monitor.dat" - -# Email defaults -MTA="localhost" -FROM="support@planet-lab.org" -TECHEMAIL="tech-%s@sites.planet-lab.org" -PIEMAIL="pi-%s@sites.planet-lab.org" - -# API -XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/' - -# Time between comon refresh -COSLEEP=300 #5mins # Time to refresh DB and remove unused entries RTSLEEP=7200 #2hrs # Time between policy enforce/update @@ -67,16 +54,6 @@ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') fh.setFormatter(formatter) logger.addHandler(fh) -def usage(): - print """ -Usage: %s [OPTIONS]... - -Options: - -d, --debug Enable debugging (default: %s) - --status Print memory usage statistics and exit - -h, --help This message -""".lstrip() % (sys.argv[0], debug) - """ Launches threads and adds them to the runningthreads global list. @@ -109,9 +86,11 @@ class ThreadWatcher(Thread): # Iterate through treads, compare with last running. for thread in runningthreads.keys(): # If thread found dead, remove from queue + #print "found %s" % thread if not runningthreads[thread].isAlive(): - logger.error("Thread Died: %s" %(thread)) + logger.error("***********Thread died: %s**********" %(thread)) del runningthreads[thread] + return len(runningthreads.keys()) class Dummy(Thread): @@ -121,109 +100,120 @@ class Dummy(Thread): def run(self): time.sleep(5) +def dict_from_nodelist(nl): + d = {} + for host in nl: + h = host['hostname'] + d[h] = host + return d """ Start threads, do some housekeeping, then daemonize. """ def main(): # Defaults - global debug, status, logger - - try: - longopts = ["debug", "status", "help"] - (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:ph", longopts) - except getopt.GetoptError, err: - print "Error: " + err.msg - usage() - sys.exit(1) - - for (opt, optval) in opts: - if opt == "-d" or opt == "--debug": - debug = True - elif opt == "--status": - #print summary(names) - sys.exit(0) - else: - usage() - sys.exit(0) + global status, logger + global config #if not debug: # daemonize() # writepid("monitor") - # Init stuff. Watch Threads to see if they die. Perhaps send email? - logger.info('Monitor Started') - startThread(ThreadWatcher(), "Watcher") - # The meat of it. + config = config() + #config.parse_args() + logger.info('Monitor Started') + ########## VARIABLES ######################################## # Nodes to check. Queue of all sick nodes. - toCheck = Queue.Queue() + toCheck = Queue.Queue() # Nodes that are sick w/o tickets sickNoTicket = Queue.Queue() # Comon DB of all nodes cdb = {} - # Nodes that are down. Use this to maintain DB; cleanup. - #alldown = Queue.Queue() # RT DB - tickets = {} + tickets = {} # Nodes we've emailed. # host - > (type of email, time) emailed = {} + ######### GET NODES ######################################## + # TODO: get authoritative node list from PLC every PLCSLEEP seconds, + # feed this into Comon. + l_plcnodes = soltesz.if_cached_else(config.cachenodes, + "l_plcnodes", + lambda : plc.getNodes({'peer_id':None})) + + s_plcnodes = Set([x['hostname'] for x in l_plcnodes]) + + # List of nodes from a user-provided file. + if config.nodelist: + file = config.nodelist + nodelist = config.getListFromFile(file) + l_nodelist = [] + print "Getting node info for hosts in: %s" % file + for nodename in nodelist: + if config.debug: print ".", ; sys.stdout.flush() + l_nodelist += plc.getNodes({'hostname': nodename, 'peer_id':None}) + if config.debug: print "" + + s_usernodes = Set(nodelist) + # nodes from PLC and in the user list. + s_safe_usernodes = s_plcnodes & s_usernodes + s_unsafe_usernodes = s_usernodes - s_plcnodes + if len(s_unsafe_usernodes) > 0 : + for node in s_unsafe_usernodes: + print "WARNING: User provided: %s but not found in PLC" % node + + l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes) + else: + l_nodes = l_plcnodes + + # Minus blacklisted ones.. + l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : []) + l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : []) + l_wl_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes) + # A handy dict of hostname-to-nodestruct mapping + d_allplc_nodes = dict_from_nodelist(l_wl_nodes) + + ####### RT tickets ######################################### + t = soltesz.MyTimer() + ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets) + print "Getting tickets from RT took: %f sec" % t.diff() ; del t + + # TODO: get input nodes from findbad database, pipe into toCheck + cm1 = read_findbad_db(d_allplc_nodes, toCheck) + + # Search for toCheck nodes in the RT db. + rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist) + # Kind of a hack. Cleans the DB for stale entries and updates db. + # (UNTESTED) + # rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket) + # clean = Thread(target=rt5.cleanTickets) - # Get RT Tickets. - # Event based. Add to queue(toCheck) and hosts are queried. - rt1 = rt.RT(tickets, toCheck, sickNoTicket) - rt2 = rt.RT(tickets, toCheck, sickNoTicket) - rt3 = rt.RT(tickets, toCheck, sickNoTicket) - rt4 = rt.RT(tickets, toCheck, sickNoTicket) - rt5 = rt.RT(tickets, toCheck, sickNoTicket) - # Kind of a hack. Cleans the DB for stale entries and updates db. - clean = Thread(target=rt5.cleanTickets) - # Poll Comon. Refreshes Comon data every COSLEEP seconds - cm1 = comon.Comon(cdb, toCheck) + startThread(rt1,"rt1") + # startThread(rt5,"rt5") + # startThread(clean,"cleanrt5") # Actually digest the info and do something with it. pol = policy.Policy(cm1, sickNoTicket, emailed) - - # Load emailed sites from last run. - pol.emailedStore("LOAD") - - # Start Threads - startThread(rt1,"rt1") - startThread(rt2,"rt2") - startThread(rt3,"rt3") - startThread(rt4,"rt4") - startThread(rt5,"rt5") - startThread(clean,"cleanrt5") - - # Start Comon Thread - startThread(cm1,"comon") - - # Wait for threads to init. Probably should join, but work on that later. - time.sleep(10) - # Start Sending Emails startThread(pol, "policy") - # Wait to finish - while (sickNoTicket.empty() == False) or (toCheck.empty() == False): - time.sleep(15) - + tw = ThreadWatcher() + while True: + if tw.checkThreads() == 0: + break + time.sleep(WATCHSLEEP) - pol.status() - - # Store state of emails - pol.emailedStore("WRITE") - - # Email what we did. - pol.status() - - logger.info('Monitor Exitted') + logger.info('Monitor Exitting') #if not debug: # removepid("monitor") - os._exit(0) + + # Store state of emails + #pol.emailedStore("WRITE") + soltesz.dbDump("ad_dbTickets") + sys.exit(0) if __name__ == '__main__': try: @@ -231,4 +221,5 @@ if __name__ == '__main__': except KeyboardInterrupt: print "Killed. Exitting." logger.info('Monitor Killed') - os._exit(0) + #soltesz.dbDump("ad_dbTickets") + sys.exit(0)