Adding subdirectories for remote commands to control ILO and DRAC cards over
[monitor.git] / monitor.py
index 2c7f3f1..ddc3722 100644 (file)
@@ -1,9 +1,11 @@
+#!/usr/bin/python
 #
 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
 # 
 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
+# Stephen Soltesz <soltesz@cs.princeton.edu>
 #
-# $Id: $
+# $Id: monitor.py,v 1.7 2007/07/03 19:59:02 soltesz Exp $
 
 import sys
 import os
@@ -13,6 +15,9 @@ from threading import *
 import time
 import logging
 import Queue
+from sets import Set
+# Global config options
+from config import config
 # daemonize and *pid
 from util.process import * 
 
@@ -22,22 +27,12 @@ import comon
 import rt
 # Correlates input with policy to form actions
 import policy
-# Email
-import mailer
-import emailTxt
-# Defaults
-debug = False 
+import soltesz
+import plc
 
 # Log to what 
 LOG="./monitor.log"
 
-# Email defaults
-MTA="localhost"
-FROM="support@planet-lab.org"
-# API
-XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/'
-# Time between comon refresh
-COSLEEP=300 #5mins
 # Time to refresh DB and remove unused entries
 RTSLEEP=7200 #2hrs
 # Time between policy enforce/update
@@ -59,16 +54,6 @@ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 fh.setFormatter(formatter)
 logger.addHandler(fh)
 
-def usage():
-    print """
-Usage: %s [OPTIONS]...
-
-Options:
-        -d, --debug             Enable debugging (default: %s)
-        --status                Print memory usage statistics and exit
-        -h, --help              This message
-""".lstrip() % (sys.argv[0], debug)
-
 
 """
 Launches threads and adds them to the runningthreads global list.
@@ -101,9 +86,11 @@ class ThreadWatcher(Thread):
                # Iterate through treads, compare with last running.
                for thread in runningthreads.keys():
                        # If thread found dead, remove from queue
+                       #print "found %s" % thread
                        if not runningthreads[thread].isAlive():
-                               logger.error("Thread Died: %s" %(thread))
+                               logger.error("***********Thread died: %s**********" %(thread))
                                del runningthreads[thread]
+               return len(runningthreads.keys())
 
 
 class Dummy(Thread):
@@ -113,87 +100,120 @@ class Dummy(Thread):
        def run(self):
                time.sleep(5)
 
+def dict_from_nodelist(nl):
+       d = {}
+       for host in nl:
+               h = host['hostname']
+               d[h] = host
+       return d
 
 """
 Start threads, do some housekeeping, then daemonize.
 """
 def main():
        # Defaults
-       global debug, status, logger
-
-       try:
-               longopts = ["debug", "status", "help"]
-               (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:ph", longopts)
-       except getopt.GetoptError, err:
-               print "Error: " + err.msg
-               usage()
-               sys.exit(1)
-
-       for (opt, optval) in opts:
-               if opt == "-d" or opt == "--debug":
-                       debug = True
-               elif opt == "--status":
-                       #print summary(names)
-                       sys.exit(0)
-               else:
-                       usage()
-                       sys.exit(0)
+       global status, logger
+       global config
 
        #if not debug:
         #      daemonize()
         #      writepid("monitor")
 
-       # Init stuff.  Watch Threads to see if they die.  Perhaps send email?
-       logger.info('Monitor Started')
-       startThread(ThreadWatcher(), "Watcher")
-       # The meat of it.
+       config = config()
+       #config.parse_args()
 
-       # Nodes to check
-        bucket = Queue.Queue()
+       logger.info('Monitor Started')
+       ##########  VARIABLES   ########################################
+       # Nodes to check. Queue of all sick nodes.
+       toCheck = Queue.Queue()
+       # Nodes that are sick w/o tickets
+       sickNoTicket = Queue.Queue()
        # Comon DB of all nodes
        cdb = {}
-       # Nodes that are down.  Use this to maintain DB;  cleanup.
-        alldown = Queue.Queue()
        # RT DB
-        tickets = {}
-
-       # Get RT Tickets.
-       # Event based.  Add to queue(bucket) and hosts are queried.
-       rt1 = rt.RT(tickets, bucket)
-       rt2 = rt.RT(tickets, bucket)
-       rt3 = rt.RT(tickets, bucket)
-       rt4 = rt.RT(tickets, bucket)
-       rt5 = rt.RT(tickets, bucket)
-       # Kind of a hack. Cleans the DB for stale entries and updates db.
-       clean = Thread(target=rt5.cleanTickets)
-       # Poll Comon.  Refreshes Comon data every COSLEEP seconds
-       cm1 = comon.Comon(cdb, bucket)
-
-       # Start Threads
+       tickets = {}
+       # Nodes we've emailed.
+       # host - > (type of email, time)
+       emailed = {}
+
+       #########  GET NODES    ########################################
+       # TODO: get authoritative node list from PLC every PLCSLEEP seconds,
+       #               feed this into Comon.
+       l_plcnodes = soltesz.if_cached_else(config.cachenodes, 
+                                                               "l_plcnodes", 
+                                                               lambda : plc.getNodes({'peer_id':None}))
+
+       s_plcnodes = Set([x['hostname'] for x in l_plcnodes])
+
+       # List of nodes from a user-provided file.
+       if config.nodelist:
+               file = config.nodelist
+               nodelist = config.getListFromFile(file)
+               l_nodelist = []
+               print "Getting node info for hosts in: %s" % file
+               for nodename in nodelist:
+                       if config.debug: print ".", ; sys.stdout.flush()
+                       l_nodelist += plc.getNodes({'hostname': nodename, 'peer_id':None})
+               if config.debug: print ""
+       
+               s_usernodes = Set(nodelist)
+               # nodes from PLC and in the user list.
+               s_safe_usernodes   = s_plcnodes & s_usernodes
+               s_unsafe_usernodes = s_usernodes - s_plcnodes
+               if len(s_unsafe_usernodes) > 0 :
+                       for node in s_unsafe_usernodes:
+                               print "WARNING: User provided: %s but not found in PLC" % node
+
+               l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes)
+       else:
+               l_nodes = l_plcnodes
+
+       # Minus blacklisted ones..
+       l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
+       l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : [])
+       l_wl_nodes  = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
+       # A handy dict of hostname-to-nodestruct mapping
+       d_allplc_nodes = dict_from_nodelist(l_wl_nodes)
+
+       #######  RT tickets    #########################################
+       t = soltesz.MyTimer()
+       ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
+       print "Getting tickets from RT took: %f sec" % t.diff() ; del t
+
+       # TODO: get input nodes from findbad database, pipe into toCheck
+       cm1 = read_findbad_db(d_allplc_nodes, toCheck)
+
+       # Search for toCheck nodes in the RT db.
+       rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist)
+       #       Kind of a hack. Cleans the DB for stale entries and updates db.
+       #   (UNTESTED)
+       #       rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket)
+       #       clean = Thread(target=rt5.cleanTickets)
+
        startThread(rt1,"rt1")
-       startThread(rt2,"rt2")
-       startThread(rt3,"rt3")
-       startThread(rt4,"rt4")
-       startThread(rt5,"rt5")
-       startThread(clean,"cleanrt5")
-       startThread(cm1,"rt5")
-       time.sleep(10)
+       #       startThread(rt5,"rt5")
+       #       startThread(clean,"cleanrt5")
 
        # Actually digest the info and do something with it.
-       pol = policy.Policy(cm1, tickets)
+       pol = policy.Policy(cm1, sickNoTicket, emailed)
+       # Start Sending Emails
+       startThread(pol, "policy")
 
-       while bucket.empty() == False:
-               time.sleep(3)
 
-       startThread(pol, "policy")
-       time.sleep(3600)        
-       
-       #print runningthreads["RT"].ssh 
+       tw = ThreadWatcher()
+       while True:
+               if tw.checkThreads() == 0:
+                       break
+               time.sleep(WATCHSLEEP)
 
-       logger.info('Monitor Exitted')
+       logger.info('Monitor Exitting')
        #if not debug:
        #       removepid("monitor")
-       os._exit(0)
+
+       # Store state of emails
+       #pol.emailedStore("WRITE")
+       soltesz.dbDump("ad_dbTickets")
+       sys.exit(0)
        
 if __name__ == '__main__':
        try:
@@ -201,4 +221,5 @@ if __name__ == '__main__':
        except KeyboardInterrupt:
                print "Killed.  Exitting."
                logger.info('Monitor Killed')
-               os._exit(0)
+               #soltesz.dbDump("ad_dbTickets")
+               sys.exit(0)