# Copyright (c) 2004 The Trustees of Princeton University (Trustees).
#
# Faiyaz Ahmed <faiyaza@cs.princeton.edu>
# Stephen Soltesz <soltesz@cs.princeton.edu>
#
# $Id: diagnose.py,v 1.1 2007/08/08 13:36:46 soltesz Exp $
import time
import logging
import Queue
from sets import Set
from threading import *

# Local monitor modules used below (plain-import form assumed).
import database
import moncommands
import policy
import rt
import syncplcdb
# Global config options
import parser as parsermodule
parser = parsermodule.getParser()
# Defaults for the options defined below (values inferred from the option
# definitions: store_true flags default to False, file names to None).
parser.set_defaults(nodelist=None,
                    refresh=False,
                    cachert=False,
                    cachenodes=False,
                    ticketlist=None,
                    blacklist=None)
parser.add_option("", "--nodelist", dest="nodelist", metavar="filename",
                    help="Read nodes to act on from specified file")
parser.add_option("", "--refresh", action="store_true", dest="refresh",
                    help="Refresh the cached values")
parser.add_option("", "--cachert", action="store_true", dest="cachert",
                    help="Cache the RT database query")
parser.add_option("", "--cachenodes", action="store_true", dest="cachenodes",
                    help="Cache node lookup from PLC")
parser.add_option("", "--ticketlist", dest="ticketlist",
                    help="Whitelist all RT tickets in this file")
parser.add_option("", "--blacklist", dest="blacklist",
                    help="Blacklist all nodes in this file")
config = parsermodule.parse_args(parser)
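
# Example invocation (file names are hypothetical, shown only to illustrate
# how the options above combine):
#   python diagnose.py --nodelist nodes.txt --blacklist blacklist.txt --refresh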
#from util.process import *

# Correlates input with policy to form actions
# Time to refresh DB and remove unused entries

# Time between policy enforce/update
#POLSLEEP=43200 #12hrs

# Global registry of all running threads, keyed by thread name.  Any
# thread added here will be monitored.
runningthreads = {}

# Seconds between checking threads
logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(LOG, mode='a')
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
def startThread(fnct, name):
    """
    Launches a thread, registers it in the runningthreads global dict under
    the given name, and starts it.
    """
    runningthreads[name] = fnct
    runningthreads[name].setName(name)
    try:
        logger.info("Starting thread " + name)
        runningthreads[name].start()
    except Exception, err:
        logger.error("Thread: " + name + " " + str(err))
class ThreadWatcher(Thread):
    """
    Watches threads and catches exceptions.  Each launched thread is
    watched and its state is logged.
    """
    def __init__(self):
        Thread.__init__(self)

    def run(self):
        # Periodically check the health of every registered thread.
        while True:
            self.checkThreads()
            time.sleep(WATCHSLEEP)
    def checkThreads(self):
        # Iterate through threads, compare with last running.
        for thread in runningthreads.keys():
            # If a thread is found dead, remove it from the registry.
            #print "found %s" % thread
            if not runningthreads[thread].isAlive():
                logger.error("***********Thread died: %s**********" % (thread))
                del runningthreads[thread]
        return len(runningthreads.keys())
        Thread.__init__(self)
def dict_from_nodelist(nl):
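    # Minimal sketch of the body, assuming the helper simply indexes the PLC
    # node records by hostname for fast lookup; the original implementation
    # may differ.
    d = {}
    for node in nl:
        d[node['hostname']] = node
    return d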
def main():
    """
    Start threads, do some housekeeping, then daemonize.
    """
    global status, logger

    logger.info('Diagnose Started')
    print 'Diagnose Started'
    ########## VARIABLES ########################################
    # Queue between Merge and RT
    toRT = Queue.Queue()

    # Queue between RT and Diagnose
    fromRT = Queue.Queue()
    ######### GET NODES ########################################
    logger.info('Get Nodes from PLC')
    print "getnode from plc: %s %s %s" % (config.debug, config.cachenodes, config.refresh)
    l_plcnodes = database.if_cached_else_refresh(config.cachenodes,
                                                 config.refresh, "l_plcnodes",
                                                 lambda: syncplcdb.create_plcdb())
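    # Assumed semantics of the caching helper above: when the cache flag is
    # set and --refresh is not given, the value stored under "l_plcnodes" is
    # returned from the local cache; otherwise the lambda is evaluated to pull
    # a fresh node list from PLC and the result is re-cached.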
    s_plcnodenames = Set([x['hostname'] for x in l_plcnodes])
    # List of nodes from a user-provided file.
    if config.nodelist:
        file = config.nodelist
        nodelist = config.getListFromFile(file)

        s_usernodes = Set(nodelist)
        # SAFE nodes are in both PLC and the user list.
        s_safe_usernodes = s_plcnodenames & s_usernodes
        # UNSAFE nodes are in the user list but not in PLC, i.e. do not monitor.
        s_unsafe_usernodes = s_usernodes - s_plcnodenames
        if len(s_unsafe_usernodes) > 0:
            for node in s_unsafe_usernodes:
                print "WARNING: User provided: %s but not found in PLC" % node

        l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes, l_plcnodes)
    else:
        l_nodes = l_plcnodes

    print "len of l_nodes: %d" % len(l_nodes)
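    # For example, if PLC knows about {a, b, c} and the user file lists
    # {b, c, d}, the safe set is {b, c} (kept for monitoring) and the unsafe
    # set is {d} (warned about and dropped).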
    # Minus blacklisted ones..
    l_blacklist = database.if_cached_else(1, "l_blacklist", lambda: [])
    l_ticket_blacklist = database.if_cached_else(1, "l_ticket_blacklist", lambda: [])
    l_nodes = filter(lambda x: not x['hostname'] in l_blacklist, l_nodes)
    logger.info('Get Tickets from RT')
    ####### RT tickets #########################################
    t = moncommands.MyTimer()
    ad_dbTickets = database.if_cached_else_refresh(config.cachert, config.refresh, "ad_dbTickets", rt.rt_tickets)
    if ad_dbTickets == "":
        print "ad_dbTickets failed..."
    print "Getting tickets from RT took: %f sec" % t.diff() ; del t
    logger.info('Start Merge/RT/Diagnose threads')

    # Only look at nodes in l_nodes.
    merge = policy.Merge([node['hostname'] for node in l_nodes], toRT)
    startThread(merge, "merge")

    rt1 = rt.RT(ad_dbTickets, toRT, fromRT, l_ticket_blacklist)
    startThread(rt1, "rt1")

    diagnose = policy.Diagnose(fromRT)
    startThread(diagnose, "diagnose")
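
    # Rough data flow, as suggested by the queue wiring above: Merge pushes
    # per-node records onto toRT, the RT thread annotates them with any
    # matching ticket state (skipping blacklisted tickets) and forwards them
    # on fromRT, and Diagnose consumes fromRT to decide what each node needs.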
    # Watch the worker threads and exit once they have all finished.
    tw = ThreadWatcher()
    while True:
        if tw.checkThreads() == 0:
            break
        print "waiting... %s" % time.time()
        time.sleep(WATCHSLEEP)

    logger.info('Diagnose Exiting')
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print "Killed. Exiting."
        logger.info('Diagnose Killed')