fix problem with spec file error. didn't reference an installed file in
[monitor.git] / monitor.py
index 2c7f3f1..48fa514 100644 (file)
+#!/usr/bin/python
 #
 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
 # 
-# Faiyaz Ahmed <faiyaza@cs.princeton.edu>
+# Stephen Soltesz <soltesz@cs.princeton.edu>
 #
-# $Id: $
+# $Id: monitor.py,v 1.7 2007/07/03 19:59:02 soltesz Exp $
 
-import sys
-import os
-import getopt 
-import thread
-from threading import *
-import time
-import logging
-import Queue
-# daemonize and *pid
-from util.process import * 
-
-# Comon DB
-import comon
-# RT tickets
+import database
+
+from monitor_policy import *
 import rt
-# Correlates input with policy to form actions
-import policy
-# Email
-import mailer
-import emailTxt
-# Defaults
-debug = False 
-
-# Log to what 
-LOG="./monitor.log"
-
-# Email defaults
-MTA="localhost"
-FROM="support@planet-lab.org"
-# API
-XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/'
-# Time between comon refresh
-COSLEEP=300 #5mins
-# Time to refresh DB and remove unused entries
-RTSLEEP=7200 #2hrs
-# Time between policy enforce/update
-#POLSLEEP=43200 #12hrs
-POLSLEEP=10
-
-# Global list of all running threads.  Any threads added to 
-# list will be monitored.
-runningthreads = {}
-# Seconds between checking threads
-WATCHSLEEP = 10
-# Set up Logging
-logger = logging.getLogger("monitor")
-logger.setLevel(logging.DEBUG)
-fh = logging.FileHandler(LOG, mode = 'a')
-fh.setLevel(logging.DEBUG)
-formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
-fh.setFormatter(formatter)
-logger.addHandler(fh)
-
-def usage():
-    print """
-Usage: %s [OPTIONS]...
-
-Options:
-        -d, --debug             Enable debugging (default: %s)
-        --status                Print memory usage statistics and exit
-        -h, --help              This message
-""".lstrip() % (sys.argv[0], debug)
-
-
-"""
-Launches threads and adds them to the runningthreads global list.
-Assigns name for thread, starts.
-"""
-def startThread(fnct, name):
-               runningthreads[name] = fnct
-               runningthreads[name].setName(name)
-               try:
-                       logger.info("Starting thread " + name)
-                       runningthreads[name].start()
-               except Exception, err:
-                       logger.error("Thread: " + name + " " + error)
-
-
-"""
-Watches threads and catches exceptions.  Each launched thread is
-watched and state is logged.
-"""
-class ThreadWatcher(Thread):
-       def __init__(self):
-               Thread.__init__(self)
-
-       def run(self):
-               while 1:
-                       self.checkThreads()
-                       time.sleep(WATCHSLEEP)
-
-       def checkThreads(self):
-               # Iterate through treads, compare with last running.
-               for thread in runningthreads.keys():
-                       # If thread found dead, remove from queue
-                       if not runningthreads[thread].isAlive():
-                               logger.error("Thread Died: %s" %(thread))
-                               del runningthreads[thread]
-
-
-class Dummy(Thread):
-       def __init__(self):
-                Thread.__init__(self)
-
-       def run(self):
-               time.sleep(5)
-
-
-"""
-Start threads, do some housekeeping, then daemonize.
-"""
-def main():
-       # Defaults
-       global debug, status, logger
-
-       try:
-               longopts = ["debug", "status", "help"]
-               (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:ph", longopts)
-       except getopt.GetoptError, err:
-               print "Error: " + err.msg
-               usage()
-               sys.exit(1)
-
-       for (opt, optval) in opts:
-               if opt == "-d" or opt == "--debug":
-                       debug = True
-               elif opt == "--status":
-                       #print summary(names)
-                       sys.exit(0)
-               else:
-                       usage()
-                       sys.exit(0)
-
-       #if not debug:
-        #      daemonize()
-        #      writepid("monitor")
-
-       # Init stuff.  Watch Threads to see if they die.  Perhaps send email?
-       logger.info('Monitor Started')
-       startThread(ThreadWatcher(), "Watcher")
-       # The meat of it.
-
-       # Nodes to check
-        bucket = Queue.Queue()
-       # Comon DB of all nodes
-       cdb = {}
-       # Nodes that are down.  Use this to maintain DB;  cleanup.
-        alldown = Queue.Queue()
-       # RT DB
-        tickets = {}
-
-       # Get RT Tickets.
-       # Event based.  Add to queue(bucket) and hosts are queried.
-       rt1 = rt.RT(tickets, bucket)
-       rt2 = rt.RT(tickets, bucket)
-       rt3 = rt.RT(tickets, bucket)
-       rt4 = rt.RT(tickets, bucket)
-       rt5 = rt.RT(tickets, bucket)
-       # Kind of a hack. Cleans the DB for stale entries and updates db.
-       clean = Thread(target=rt5.cleanTickets)
-       # Poll Comon.  Refreshes Comon data every COSLEEP seconds
-       cm1 = comon.Comon(cdb, bucket)
-
-       # Start Threads
-       startThread(rt1,"rt1")
-       startThread(rt2,"rt2")
-       startThread(rt3,"rt3")
-       startThread(rt4,"rt4")
-       startThread(rt5,"rt5")
-       startThread(clean,"cleanrt5")
-       startThread(cm1,"rt5")
-       time.sleep(10)
-
-       # Actually digest the info and do something with it.
-       pol = policy.Policy(cm1, tickets)
-
-       while bucket.empty() == False:
-               time.sleep(3)
-
-       startThread(pol, "policy")
-       time.sleep(3600)        
+import sys
+
+import plc
+api = plc.getAuthAPI()
+
+from clean_policy import *
+
+def reboot(hostname):
+       print "calling reboot!!! %s " % hostname
+
+       l_nodes = api.GetNodes(hostname)
+       if len(l_nodes) == 0:
+               raise Exception("No such host: %s" % hostname)
+       
+       l_blacklist = database.if_cached_else(1, "l_blacklist", lambda : [])
+       l_ticket_blacklist = database.if_cached_else(1,"l_ticket_blacklist",lambda : [])
+
+       l_nodes  = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
+       if len(l_nodes) == 0:
+               raise Exception("Host removed via blacklist: %s" % hostname)
+
+       ad_dbTickets = database.if_cached_else_refresh(True, False, "ad_dbTickets", lambda : [])
+       if ad_dbTickets == None:
+               raise Exception("Could not find cached dbTickets")
+
+       print "starting new thing"
+       mon = MonitorMergeDiagnoseSendEscellate(hostname, True)
+       mon.run()
+
+       #print "merge"
+       #merge = Merge( [node['hostname'] for node in l_nodes])
+       #record_list = merge.run()
+       ##print "rt"
+       #rt = RT(record_list, ad_dbTickets, l_ticket_blacklist)
+       #record_list = rt.run()
+       ##print "diagnose"
+       #diag = Diagnose(record_list)
+       #diagnose_out = diag.run()
+       #print diagnose_out
+       #print "action"
+       #action = Action(diagnose_out)
+       #action.run()
+
+       return True
+
+def reboot2(hostname):
+       l_nodes = api.GetNodes(hostname)
+       if len(l_nodes) == 0:
+               raise Exception("No such host: %s" % hostname)
        
-       #print runningthreads["RT"].ssh 
+       l_blacklist = database.if_cached_else(1, "l_blacklist", lambda : [])
+       l_ticket_blacklist = database.if_cached_else(1,"l_ticket_blacklist",lambda : [])
 
-       logger.info('Monitor Exitted')
-       #if not debug:
-       #       removepid("monitor")
-       os._exit(0)
+       l_nodes  = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
+       if len(l_nodes) == 0:
+               raise Exception("Host removed via blacklist: %s" % hostname)
+
+       ad_dbTickets = database.if_cached_else_refresh(True, False, "ad_dbTickets", lambda : None)
+       if ad_dbTickets == None:
+               raise Exception("Could not find cached dbTickets")
+
+
+       args = {}
+       args['hostname'] = "%s" % hostname
+       args['hostname_list'] = "%s" % hostname
+       args['loginbase'] = plc.siteId(hostname)
+
+       m = PersistMessage(hostname, "Please Update Boot Image for %s" % hostname,
+                                                       mailtxt.newalphacd_one[1] % args, True, db='bootcd_persistmessages')
        
+       #print "merge"
+       merge = Merge( [node['hostname'] for node in l_nodes])
+       record_list = merge.run()
+       #print "rt"
+       rt = RT(record_list, ad_dbTickets, l_ticket_blacklist)
+       record_list = rt.run()
+       #print "diagnose"
+       diag = Diagnose(record_list)
+       diagnose_out = diag.run()
+       #print diagnose_out
+       #print "action"
+       action = Action(diagnose_out)
+       action.run()
+
+       return True
+
+
+def main():
+       for host in sys.argv[1:]:
+               reboot(host)
+
 if __name__ == '__main__':
-       try:
-               main()
-       except KeyboardInterrupt:
-               print "Killed.  Exitting."
-               logger.info('Monitor Killed')
-               os._exit(0)
+       print "calling main"
+       main()