+#!/usr/bin/python
#
# Copyright (c) 2004 The Trustees of Princeton University (Trustees).
#
# Faiyaz Ahmed <faiyaza@cs.princeton.edu>
+# Stephen Soltesz <soltesz@cs.princeton.edu>
#
-# $Id: $
+# $Id: monitor.py,v 1.7 2007/07/03 19:59:02 soltesz Exp $
import sys
import os
import time
import logging
import Queue
+from sets import Set
+# Global config options
+from config import config
# daemonize and *pid
from util.process import *
import rt
# Correlates input with policy to form actions
import policy
-# Email
-import mailer
-import emailTxt
-# Defaults
-debug = False
+import soltesz
+import plc
# Log to what
LOG="./monitor.log"
-# Email defaults
-MTA="localhost"
-FROM="support@planet-lab.org"
-# API
-XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/'
-# Time between comon refresh
-COSLEEP=300 #5mins
# Time to refresh DB and remove unused entries
RTSLEEP=7200 #2hrs
# Time between policy enforce/update
fh.setFormatter(formatter)
logger.addHandler(fh)
-def usage():
- print """
-Usage: %s [OPTIONS]...
-
-Options:
- -d, --debug Enable debugging (default: %s)
- --status Print memory usage statistics and exit
- -h, --help This message
-""".lstrip() % (sys.argv[0], debug)
-
"""
Launches threads and adds them to the runningthreads global list.
# Iterate through treads, compare with last running.
for thread in runningthreads.keys():
# If thread found dead, remove from queue
+ #print "found %s" % thread
if not runningthreads[thread].isAlive():
- logger.error("Thread Died: %s" %(thread))
+ logger.error("***********Thread died: %s**********" %(thread))
del runningthreads[thread]
+ return len(runningthreads.keys())
class Dummy(Thread):
def run(self):
time.sleep(5)
+def dict_from_nodelist(nl):
+ d = {}
+ for host in nl:
+ h = host['hostname']
+ d[h] = host
+ return d
"""
Start threads, do some housekeeping, then daemonize.
"""
def main():
# Defaults
- global debug, status, logger
-
- try:
- longopts = ["debug", "status", "help"]
- (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:ph", longopts)
- except getopt.GetoptError, err:
- print "Error: " + err.msg
- usage()
- sys.exit(1)
-
- for (opt, optval) in opts:
- if opt == "-d" or opt == "--debug":
- debug = True
- elif opt == "--status":
- #print summary(names)
- sys.exit(0)
- else:
- usage()
- sys.exit(0)
+ global status, logger
+ global config
#if not debug:
# daemonize()
# writepid("monitor")
- # Init stuff. Watch Threads to see if they die. Perhaps send email?
- logger.info('Monitor Started')
- startThread(ThreadWatcher(), "Watcher")
- # The meat of it.
+ config = config()
+ #config.parse_args()
- # Nodes to check
- bucket = Queue.Queue()
+ logger.info('Monitor Started')
+ ########## VARIABLES ########################################
+ # Nodes to check. Queue of all sick nodes.
+ toCheck = Queue.Queue()
+ # Nodes that are sick w/o tickets
+ sickNoTicket = Queue.Queue()
# Comon DB of all nodes
cdb = {}
- # Nodes that are down. Use this to maintain DB; cleanup.
- alldown = Queue.Queue()
# RT DB
- tickets = {}
-
- # Get RT Tickets.
- # Event based. Add to queue(bucket) and hosts are queried.
- rt1 = rt.RT(tickets, bucket)
- rt2 = rt.RT(tickets, bucket)
- rt3 = rt.RT(tickets, bucket)
- rt4 = rt.RT(tickets, bucket)
- rt5 = rt.RT(tickets, bucket)
- # Kind of a hack. Cleans the DB for stale entries and updates db.
- clean = Thread(target=rt5.cleanTickets)
- # Poll Comon. Refreshes Comon data every COSLEEP seconds
- cm1 = comon.Comon(cdb, bucket)
-
- # Start Threads
+ tickets = {}
+ # Nodes we've emailed.
+ # host - > (type of email, time)
+ emailed = {}
+
+ ######### GET NODES ########################################
+ # TODO: get authoritative node list from PLC every PLCSLEEP seconds,
+ # feed this into Comon.
+ l_plcnodes = soltesz.if_cached_else(config.cachenodes,
+ "l_plcnodes",
+ lambda : plc.getNodes({'peer_id':None}))
+
+ s_plcnodes = Set([x['hostname'] for x in l_plcnodes])
+
+ # List of nodes from a user-provided file.
+ if config.nodelist:
+ file = config.nodelist
+ nodelist = config.getListFromFile(file)
+ l_nodelist = []
+ print "Getting node info for hosts in: %s" % file
+ for nodename in nodelist:
+ if config.debug: print ".", ; sys.stdout.flush()
+ l_nodelist += plc.getNodes({'hostname': nodename, 'peer_id':None})
+ if config.debug: print ""
+
+ s_usernodes = Set(nodelist)
+ # nodes from PLC and in the user list.
+ s_safe_usernodes = s_plcnodes & s_usernodes
+ s_unsafe_usernodes = s_usernodes - s_plcnodes
+ if len(s_unsafe_usernodes) > 0 :
+ for node in s_unsafe_usernodes:
+ print "WARNING: User provided: %s but not found in PLC" % node
+
+ l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes)
+ else:
+ l_nodes = l_plcnodes
+
+ # Minus blacklisted ones..
+ l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
+ l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : [])
+ l_wl_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
+ # A handy dict of hostname-to-nodestruct mapping
+ d_allplc_nodes = dict_from_nodelist(l_wl_nodes)
+
+ ####### RT tickets #########################################
+ t = soltesz.MyTimer()
+ ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
+ print "Getting tickets from RT took: %f sec" % t.diff() ; del t
+
+ # TODO: get input nodes from findbad database, pipe into toCheck
+ cm1 = read_findbad_db(d_allplc_nodes, toCheck)
+
+ # Search for toCheck nodes in the RT db.
+ rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist)
+ # Kind of a hack. Cleans the DB for stale entries and updates db.
+ # (UNTESTED)
+ # rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket)
+ # clean = Thread(target=rt5.cleanTickets)
+
startThread(rt1,"rt1")
- startThread(rt2,"rt2")
- startThread(rt3,"rt3")
- startThread(rt4,"rt4")
- startThread(rt5,"rt5")
- startThread(clean,"cleanrt5")
- startThread(cm1,"rt5")
- time.sleep(10)
+ # startThread(rt5,"rt5")
+ # startThread(clean,"cleanrt5")
# Actually digest the info and do something with it.
- pol = policy.Policy(cm1, tickets)
+ pol = policy.Policy(cm1, sickNoTicket, emailed)
+ # Start Sending Emails
+ startThread(pol, "policy")
- while bucket.empty() == False:
- time.sleep(3)
- startThread(pol, "policy")
- time.sleep(3600)
-
- #print runningthreads["RT"].ssh
+ tw = ThreadWatcher()
+ while True:
+ if tw.checkThreads() == 0:
+ break
+ time.sleep(WATCHSLEEP)
- logger.info('Monitor Exitted')
+ logger.info('Monitor Exitting')
#if not debug:
# removepid("monitor")
- os._exit(0)
+
+ # Store state of emails
+ #pol.emailedStore("WRITE")
+ soltesz.dbDump("ad_dbTickets")
+ sys.exit(0)
if __name__ == '__main__':
try:
except KeyboardInterrupt:
print "Killed. Exitting."
logger.info('Monitor Killed')
- os._exit(0)
+ #soltesz.dbDump("ad_dbTickets")
+ sys.exit(0)