X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=action.py;fp=action.py;h=269007e6273c8df813998def1b33d2ed1b250389;hb=8217a88623189c3034d38fdc4eae8fe575ae8624;hp=0000000000000000000000000000000000000000;hpb=b9334d048a326519351238d197ca44ecb6a96dfe;p=monitor.git diff --git a/action.py b/action.py new file mode 100755 index 0000000..269007e --- /dev/null +++ b/action.py @@ -0,0 +1,203 @@ +#!/usr/bin/python +# +# Copyright (c) 2004 The Trustees of Princeton University (Trustees). +# +# Faiyaz Ahmed +# Stephen Soltesz +# +# $Id$ + +import sys +from threading import * +import time +import logging +import Queue +from sets import Set + +# Global config options +from config import config +from optparse import OptionParser +parser = OptionParser() + +parser.set_defaults(nodelist=None, + cachert=False, + cachenodes=False, + blacklist=None, + ticketlist=None) + +parser.add_option("", "--nodelist", dest="nodelist", + help="Read nodes to act on from specified file") +parser.add_option("", "--cachert", action="store_true", + help="Cache the RT database query") +parser.add_option("", "--cachenodes", action="store_true", + help="Cache node lookup from PLC") +parser.add_option("", "--ticketlist", dest="ticketlist", + help="Whitelist all RT tickets in this file") +parser.add_option("", "--blacklist", dest="blacklist", + help="Blacklist all nodes in this file") + +config = config(parser) +config.parse_args() + +# daemonize and *pid +#from util.process import * + +# RT tickets +import rt +# Correlates input with policy to form actions +import policy +import soltesz +import plc + +# Log to what +LOG="./monitor.log" + +# Time to refresh DB and remove unused entries +RTSLEEP=7200 #2hrs +# Time between policy enforce/update +#POLSLEEP=43200 #12hrs +POLSLEEP=10 + +# Global list of all running threads. Any threads added to +# list will be monitored. +runningthreads = {} +# Seconds between checking threads +WATCHSLEEP = 10 + +# Set up Logging +logger = logging.getLogger("monitor") +logger.setLevel(logging.DEBUG) +fh = logging.FileHandler(LOG, mode = 'a') +fh.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') +fh.setFormatter(formatter) +logger.addHandler(fh) + + +""" +Launches threads and adds them to the runningthreads global list. +Assigns name for thread, starts. +""" +def startThread(fnct, name): + runningthreads[name] = fnct + runningthreads[name].setName(name) + try: + logger.info("Starting thread " + name) + runningthreads[name].start() + except Exception, err: + logger.error("Thread: " + name + " " + error) + + +""" +Watches threads and catches exceptions. Each launched thread is +watched and state is logged. +""" +class ThreadWatcher(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + while 1: + self.checkThreads() + time.sleep(WATCHSLEEP) + + def checkThreads(self): + # Iterate through treads, compare with last running. + for thread in runningthreads.keys(): + # If thread found dead, remove from queue + #print "found %s" % thread + if not runningthreads[thread].isAlive(): + logger.error("***********Thread died: %s**********" %(thread)) + del runningthreads[thread] + return len(runningthreads.keys()) + + +class Dummy(Thread): + def __init__(self): + Thread.__init__(self) + + def run(self): + time.sleep(5) + +def dict_from_nodelist(nl): + d = {} + for host in nl: + h = host['hostname'] + d[h] = host + return d + +""" +Start threads, do some housekeeping, then daemonize. +""" +def main(): + # Defaults + global status, logger + global config + + logger.info('Action Started') + print 'Action Started' + + ######### GET NODES ######################################## + logger.info('Get Nodes from PLC') + print "getnode from plc" + l_plcnodes = soltesz.if_cached_else(True, + "l_plcnodes", + lambda : plc.getNodes({'peer_id':None})) + + s_plcnodenames = Set([x['hostname'] for x in l_plcnodes]) + + # List of nodes from a user-provided file. + if config.nodelist: + file = config.nodelist + nodelist = config.getListFromFile(file) + #for node in nodelist: + # print "%s" % node + + s_usernodes = Set(nodelist) + # SAFE nodes are in PLC and the list + s_safe_usernodes = s_plcnodenames & s_usernodes + # UNSAFE nodes are in list but not in PLC. i.e. ignore them. + s_unsafe_usernodes = s_usernodes - s_plcnodenames + if len(s_unsafe_usernodes) > 0 : + for node in s_unsafe_usernodes: + print "WARNING: User provided: %s but not found in PLC" % node + + l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes) + else: + l_nodes = l_plcnodes + + print "len of l_nodes: %d" % len(l_nodes) + # Minus blacklisted ones.. + l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : []) + + l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : []) + l_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes) + + ####### Get RT tickets ######################################### + #logger.info('Get Tickets from RT') + #t = soltesz.MyTimer() + #ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets) + #print "Getting tickets from RT took: %f sec" % t.diff() ; del t + + logger.info('Start Action thread') + ####### Action + action = policy.Action( [node['hostname'] for node in l_nodes] ) + startThread(action,"action") + + + tw = ThreadWatcher() + while True: + if tw.checkThreads() == 0: + break + time.sleep(WATCHSLEEP) + + logger.info('Action Exitting') + sys.exit(0) + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print "Killed. Exitting." + logger.info('Action Killed') + sys.exit(0)