+ some cleanup. some dirtying.
[monitor.git] / monitor.py
index 4b80d9a..3af44ee 100644 (file)
@@ -3,8 +3,9 @@
 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
 # 
 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
+# Stephen Soltesz <soltesz@cs.princeton.edu>
 #
-# $Id: $
+# $Id: monitor.py,v 1.6 2007/06/29 12:42:22 soltesz Exp $
 
 import sys
 import os
@@ -14,6 +15,9 @@ from threading import *
 import time
 import logging
 import Queue
+# Global config options
+from config import config
+config = config()
 # daemonize and *pid
 from util.process import * 
 
@@ -23,11 +27,8 @@ import comon
 import rt
 # Correlates input with policy to form actions
 import policy
-# Email
-import mailer
-import emailTxt
-# Defaults
-debug = False 
+import soltesz
+import plc
 
 # Log to what 
 LOG="./monitor.log"
@@ -67,16 +68,6 @@ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 fh.setFormatter(formatter)
 logger.addHandler(fh)
 
-def usage():
-    print """
-Usage: %s [OPTIONS]...
-
-Options:
-        -d, --debug             Enable debugging (default: %s)
-        --status                Print memory usage statistics and exit
-        -h, --help              This message
-""".lstrip() % (sys.argv[0], debug)
-
 
 """
 Launches threads and adds them to the runningthreads global list.
@@ -109,9 +100,11 @@ class ThreadWatcher(Thread):
                # Iterate through threads, compare with last running.
                for thread in runningthreads.keys():
                        # If thread found dead, remove from queue
+                       #print "found %s" % thread
                        if not runningthreads[thread].isAlive():
-                               logger.error("Thread Died: %s" %(thread))
+                               logger.error("***********Thread died: %s**********" %(thread))
                                del runningthreads[thread]
+               return len(runningthreads.keys())
 
 
 class Dummy(Thread):
@@ -121,109 +114,128 @@ class Dummy(Thread):
        def run(self):
                time.sleep(5)
 
+def preComon(l_nodes, toCheck):
+       # Build one diagnostic record per host in l_nodes and hand it to
+       # toCheck.put().  Each record holds the host under 'nodename', the
+       # bucket list ["dbg"], an empty 'stage', the current time under
+       # 'time', and None for 'message', 'args' and 'info'.
+       for host in l_nodes:
+               diag_node = {}
+               diag_node['nodename'] = host
+               diag_node['message'] = None
+               diag_node['bucket'] = ["dbg"]
+               diag_node['stage'] = ""
+               diag_node['args'] = None
+               diag_node['info'] = None
+               diag_node['time'] = time.time()
+               toCheck.put(diag_node)
+       return 
+
+def dict_from_nodelist(nl):
+       # Return a dict mapping each entry's 'hostname' value to the entry
+       # itself.  If two entries in nl share a hostname, the later one
+       # overwrites the earlier one.
+       d = {}
+       for host in nl:
+               h = host['hostname']
+               d[h] = host
+       return d
 
 """
 Start threads, do some housekeeping, then daemonize.
 """
 def main():
+       # Overview of the new flow: build the node list (from config.userlist
+       # or plc.getNodes), filter out blacklisted hostnames, load the RT
+       # tickets, start the "comon", "rt1" and "policy" threads, then poll
+       # ThreadWatcher.checkThreads() until it returns 0 and dump state.
        # Defaults
-       global debug, status, logger
-
-       try:
-               longopts = ["debug", "status", "help"]
-               (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:ph", longopts)
-       except getopt.GetoptError, err:
-               print "Error: " + err.msg
-               usage()
-               sys.exit(1)
-
-       for (opt, optval) in opts:
-               if opt == "-d" or opt == "--debug":
-                       debug = True
-               elif opt == "--status":
-                       #print summary(names)
-                       sys.exit(0)
-               else:
-                       usage()
-                       sys.exit(0)
+       global status, logger
 
        #if not debug:
         #      daemonize()
         #      writepid("monitor")
 
-       # Init stuff.  Watch Threads to see if they die.  Perhaps send email?
        logger.info('Monitor Started')
-       startThread(ThreadWatcher(), "Watcher")
-       # The meat of it.
 
+       ##########  VARIABLES   ########################################
        # Nodes to check. Queue of all sick nodes.
-        toCheck = Queue.Queue()
+       toCheck = Queue.Queue()
        # Nodes that are sick w/o tickets
        sickNoTicket = Queue.Queue()
        # Comon DB of all nodes
        cdb = {}
-       # Nodes that are down.  Use this to maintain DB;  cleanup.
-        #alldown = Queue.Queue()
        # RT DB
-        tickets = {}
+       tickets = {}
        # Nodes we've emailed.
        # host - > (type of email, time)
        emailed = {}
 
+       #########  GET NODES    ########################################
+       # TODO: get authoritative node list from PLC every PLCSLEEP seconds,
+       #               feed this into Comon.
+
+       # List of nodes from a user-provided file.
+       if config.userlist:
+               file = config.userlist
+               nodelist = config.getListFromFile(file)
+               l_nodes = []
+               print "Getting node info for hosts in: %s" % file
+               for nodename in nodelist:
+                       if config.debug: print ".", ; sys.stdout.flush()
+                       l_nodes += plc.getNodes({'hostname': nodename})
+               print ""
+       else:
+               # Authoritative list of nodes from PLC
+               l_nodes = soltesz.if_cached_else(config.cachenodes, "l_nodes", plc.getNodes)
+
+       # Minus blacklisted ones..
+       # NOTE(review): the literal 1 is passed where the other calls pass
+       # config.cachenodes / config.cachert -- presumably "always use the
+       # cached copy"; confirm against soltesz.if_cached_else.
+       l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
+       l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : [])
+       l_wl_nodes  = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
+       # A handy dict of hostname-to-nodestruct mapping
+       d_allplc_nodes = dict_from_nodelist(l_wl_nodes)
+
+       #######  RT tickets    #########################################
+       t = soltesz.MyTimer()
+       ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
+       print "Getting tickets from RT took: %f sec" % t.diff() ; del t
+
+       # Optional: if a file named precomon.txt exists, queue a "dbg" record
+       # for each host listed in it (via preComon) before Comon is started.
+       if os.path.isfile("precomon.txt"): 
+               nodelist = config.getListFromFile("precomon.txt")
+               print "PreComon node info"
+               preComon(nodelist, toCheck)
+               for nodename in nodelist:
+                       # TODO: temporary hack.
+                       if nodename not in d_allplc_nodes:
+                               d_allplc_nodes[nodename] = {}
+
+       # TODO: Refreshes Comon data every COSLEEP seconds
+       cm1 = comon.Comon(cdb, d_allplc_nodes, toCheck)
+       startThread(cm1,"comon")
 
-       # Get RT Tickets.
-       # Event based.  Add to queue(toCheck) and hosts are queried.
-       rt1 = rt.RT(tickets, toCheck, sickNoTicket)
-       rt2 = rt.RT(tickets, toCheck, sickNoTicket)
-       rt3 = rt.RT(tickets, toCheck, sickNoTicket)
-       rt4 = rt.RT(tickets, toCheck, sickNoTicket)
-       rt5 = rt.RT(tickets, toCheck, sickNoTicket)
-       # Kind of a hack. Cleans the DB for stale entries and updates db.
-       clean = Thread(target=rt5.cleanTickets)
-       # Poll Comon.  Refreshes Comon data every COSLEEP seconds
-       cm1 = comon.Comon(cdb, toCheck)
-
-       # Actually digest the info and do something with it.
-       pol = policy.Policy(cm1, sickNoTicket, emailed)
-
-       # Load emailed sites from last run.
-       pol.emailedStore("LOAD")
+       # TODO: make queues event based, not node based. 
+       # From the RT db, add hosts to q(toCheck) for filtering the comon nodes.
+       rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist)
+       #       Kind of a hack. Cleans the DB for stale entries and updates db.
+       #   (UNTESTED)
+       #       rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket)
+       #       clean = Thread(target=rt5.cleanTickets)
 
-       # Start Threads
        startThread(rt1,"rt1")
-       startThread(rt2,"rt2")
-       startThread(rt3,"rt3")
-       startThread(rt4,"rt4")
-       startThread(rt5,"rt5")
-       startThread(clean,"cleanrt5")
-
-       # Start Comon Thread    
-       startThread(cm1,"comon")
-
-       # Wait for threads to init.  Probably should join, but work on that later.
-       time.sleep(10)
+       #       startThread(rt5,"rt5")
+       #       startThread(clean,"cleanrt5")
 
+       # Actually digest the info and do something with it.
+       pol = policy.Policy(cm1, sickNoTicket, emailed)
        # Start Sending Emails
        startThread(pol, "policy")
 
-       # Wait to finish
-       while (sickNoTicket.empty() == False) or (toCheck.empty() == False):
-               time.sleep(15)
-
-
 
-       pol.status()
+       # Poll in the main thread until tw.checkThreads() returns 0.
+       # NOTE(review): WATCHSLEEP is not defined in the hunks shown here --
+       # presumably a module-level constant; confirm.
+       tw = ThreadWatcher()
+       while True:
+               if tw.checkThreads() == 0:
+                       break
+               time.sleep(WATCHSLEEP)
 
-       # Store state of emails
-       pol.emailedStore("WRITE")
-
-       # Email what we did.
-       pol.status()
-
-       logger.info('Monitor Exitted')
+       logger.info('Monitor Exitting')
        #if not debug:
        #       removepid("monitor")
-       os._exit(0)
+
+       # Store state of emails
+       #pol.emailedStore("WRITE")
+       # NOTE(review): l_ticket_blacklist is loaded above but is not dumped
+       # here alongside l_blacklist and ad_dbTickets -- confirm intended.
+       soltesz.dbDump("l_blacklist")
+       soltesz.dbDump("ad_dbTickets")
+       sys.exit(0)
        
 if __name__ == '__main__':
        try:
@@ -231,4 +243,6 @@ if __name__ == '__main__':
        except KeyboardInterrupt:
                print "Killed.  Exitting."
                logger.info('Monitor Killed')
-               os._exit(0)
+               #soltesz.dbDump("l_blacklist")
+               #soltesz.dbDump("ad_dbTickets")
+               sys.exit(0)