mass commit. updates for the new db schema in findbad, findbadpcu, nodequery,
[monitor.git] / diagnose.py
1 #!/usr/bin/python
2 #
3 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
4
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
7 #
8 # $Id: diagnose.py,v 1.1 2007/08/08 13:36:46 soltesz Exp $
9
10 import sys
11 from threading import *
12 import time
13 import logging
14 import Queue
15 from sets import Set
16
17 # Global config options
18 import parser as parsermodule
parser = parsermodule.getParser()

# Default value for every option registered below.
parser.set_defaults(nodelist=None,
                    refresh=False,
                    cachert=False,
                    cachenodes=False,
                    blacklist=None,
                    ticketlist=None)

# Options specific to this script, as (long flag, add_option keywords).
# They are registered in list order, same as the original explicit calls.
# NOTE(review): config.blacklist / config.ticketlist are never read in this
# file; main() takes its blacklists from database.if_cached_else(...) instead.
_diagnose_options = [
    ("--nodelist", dict(dest="nodelist", metavar="filename",
                        help="Read nodes to act on from specified file")),
    ("--refresh", dict(action="store_true", dest="refresh",
                       help="Refresh the cached values")),
    ("--cachert", dict(action="store_true", dest="cachert",
                       help="Cache the RT database query")),
    ("--cachenodes", dict(action="store_true", dest="cachenodes",
                          help="Cache node lookup from PLC")),
    ("--ticketlist", dict(dest="ticketlist",
                          help="Whitelist all RT tickets in this file")),
    ("--blacklist", dict(dest="blacklist",
                         help="Blacklist all nodes in this file")),
]
for _flag, _kwargs in _diagnose_options:
    # The leading "" is passed through exactly as the original calls did.
    parser.add_option("", _flag, **_kwargs)
# Leave the module namespace as before: only `parser` and `config` remain.
del _diagnose_options, _flag, _kwargs

config = parsermodule.parse_args(parser)
42
43 # daemonize and *pid
44 #from util.process import * 
45
46 # RT tickets
47 import rt
48 # Correlates input with policy to form actions
49 import policy
50 import moncommands
51 import database 
52 import plc
53 import syncplcdb
54
# File the "monitor" logger appends to (relative to the working directory).
LOG = "./monitor.log"

# Time to refresh DB and remove unused entries, in seconds (2 hours).
# Not referenced elsewhere in this file.
RTSLEEP = 2 * 60 * 60
# Time between policy enforce/update, in seconds.  The previously used
# value was 43200 (12 hours); it is currently set to 10 seconds.
POLSLEEP = 10

# Global registry of running threads, keyed by thread name.  Any thread
# added here is checked (and dropped once dead) by ThreadWatcher.
runningthreads = {}
# Seconds between checks of the thread registry.
WATCHSLEEP = 5

# Logging: the "monitor" logger and its file handler both pass DEBUG and
# above, appending timestamped records to LOG.
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh = logging.FileHandler(LOG, mode='a')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)
logger.addHandler(fh)
78
79
def startThread(fnct, name):
    """Register *fnct* in the global ``runningthreads`` under *name* and start it.

    The thread is named *name* and stored in the registry before ``start()``
    is attempted.  A failure to start is logged rather than raised.  The
    entry stays registered; because an unstarted thread is not alive,
    ``ThreadWatcher.checkThreads`` will later report and remove it.

    :param fnct: an unstarted ``threading.Thread`` (or subclass) instance.
    :param name: registry key and thread name.
    """
    runningthreads[name] = fnct
    # `.name` replaces the deprecated setName(); same effect on 2.6+ and 3.x.
    fnct.name = name
    try:
        logger.info("Starting thread " + name)
        fnct.start()
    except Exception as err:
        # The old code referenced an undefined `error` (a NameError) and
        # concatenated an exception to a str; log the real error text.
        logger.error("Thread: " + name + " " + str(err))
92
93
class ThreadWatcher(Thread):
    """Watch the threads registered in the global ``runningthreads``.

    Each check logs an error for every registered thread that is no longer
    alive and removes it from the registry.
    """

    def __init__(self):
        Thread.__init__(self)

    def run(self):
        """Check the registry forever, sleeping WATCHSLEEP seconds between checks."""
        while True:
            self.checkThreads()
            time.sleep(WATCHSLEEP)

    def checkThreads(self):
        """Drop dead threads from ``runningthreads``.

        :returns: the number of threads still registered after the check.
        """
        # Iterate over a snapshot of the names: entries are deleted inside
        # the loop, which raises RuntimeError on a live dict view (Python 3).
        for thread in list(runningthreads):
            # is_alive() exists on 2.6+ and 3.x; isAlive() was removed in 3.9.
            if not runningthreads[thread].is_alive():
                logger.error("***********Thread died: %s**********" % (thread))
                del runningthreads[thread]
        return len(runningthreads)
116
117
class Dummy(Thread):
    """Placeholder thread whose run() only sleeps for five seconds.

    Construction is inherited unchanged from ``Thread``.
    """

    def run(self):
        # No work to do; just stay alive for a short while.
        time.sleep(5)
124
def dict_from_nodelist(nl):
    """Index node records by their ``'hostname'`` value.

    :param nl: iterable of mappings, each with a ``'hostname'`` key.
    :returns: dict mapping hostname -> record; if a hostname occurs more
        than once, the last record wins.
    """
    return dict((record['hostname'], record) for record in nl)
131
132
133
def _restrict_to_user_nodes(l_plcnodes, usernodes):
    """Return the PLC node records whose hostname is listed in *usernodes*.

    Listed hostnames that PLC does not know about are reported on stdout
    (in sorted order) and are not monitored.

    :param l_plcnodes: node records (mappings with a ``'hostname'`` key).
    :param usernodes: iterable of hostnames read from the user's file.
    :returns: list of records from *l_plcnodes*, in their original order.
    """
    s_plcnodenames = set(node['hostname'] for node in l_plcnodes)
    s_usernodes = set(usernodes)
    # UNSAFE nodes are in the list but not in PLC, i.e. do not monitor.
    for hostname in sorted(s_usernodes - s_plcnodenames):
        print("WARNING: User provided: %s but not found in PLC" % hostname)
    # SAFE nodes are in PLC and the list; every record here already comes
    # from PLC, so membership in the user set is sufficient.
    return [node for node in l_plcnodes if node['hostname'] in s_usernodes]


def main():
    """Start the Merge, RT and Diagnose threads and wait for them to finish.

    Nodes come from PLC, optionally restricted to ``--nodelist`` and minus the
    cached blacklist.  Merge feeds RT through one queue and RT feeds Diagnose
    through another.  Exits via ``sys.exit``: status 1 if the RT ticket lookup
    returns ``""``, status 0 once no registered thread remains alive.
    """
    logger.info('Diagnose Started')
    print('Diagnose Started')

    ##########  VARIABLES   ########################################
    # Queue between Merge and RT
    toRT = Queue.Queue()
    # Queue between RT and Diagnose
    fromRT = Queue.Queue()

    #########  GET NODES    ########################################
    logger.info('Get Nodes from PLC')
    print("getnode from plc: %s %s %s" % (config.debug, config.cachenodes, config.refresh))
    l_plcnodes = database.if_cached_else_refresh(config.cachenodes,
                                                 config.refresh, "l_plcnodes",
                                                 lambda: syncplcdb.create_plcdb())

    # Optionally restrict to the nodes in a user-provided file.
    if config.nodelist:
        nodelist = config.getListFromFile(config.nodelist)
        l_nodes = _restrict_to_user_nodes(l_plcnodes, nodelist)
    else:
        l_nodes = l_plcnodes

    print("len of l_nodes: %d" % len(l_nodes))
    # Minus blacklisted ones.
    # NOTE(review): --blacklist / --ticketlist are not consulted here; both
    # lists come from the database cache.
    l_blacklist = database.if_cached_else(1, "l_blacklist", lambda: [])
    l_ticket_blacklist = database.if_cached_else(1, "l_ticket_blacklist", lambda: [])
    # A set gives O(1) membership instead of scanning the list per node.
    s_blacklist = set(l_blacklist)
    l_nodes = [node for node in l_nodes if node['hostname'] not in s_blacklist]

    logger.info('Get Tickets from RT')
    #######  RT tickets    #########################################
    t = moncommands.MyTimer()
    ad_dbTickets = database.if_cached_else_refresh(config.cachert, config.refresh,
                                                   "ad_dbTickets", rt.rt_tickets)
    # NOTE(review): "" appears to be the lookup's failure value — confirm.
    if ad_dbTickets == "":
        print("ad_dbTickets failed...")
        sys.exit(1)
    print("Getting tickets from RT took: %f sec" % t.diff())
    del t

    logger.info('Start Merge/RT/Diagnose threads')
    ####### Merge: only look at nodes in l_nodes
    merge = policy.Merge([node['hostname'] for node in l_nodes], toRT)
    startThread(merge, "merge")
    ####### RT
    rt1 = rt.RT(ad_dbTickets, toRT, fromRT, l_ticket_blacklist)
    startThread(rt1, "rt1")
    ####### Diagnose
    diagnose = policy.Diagnose(fromRT)
    startThread(diagnose, "diagnose")

    # Poll the thread registry until every registered thread has exited.
    tw = ThreadWatcher()
    while tw.checkThreads() != 0:
        print("waiting... %s" % time.time())
        time.sleep(WATCHSLEEP)

    logger.info('Diagnose Exitting')
    sys.exit(0)
216         
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: note it on stdout and in the log, then exit with status 0.
        print("Killed.  Exitting.")
        logger.info('Diagnose Killed')
        sys.exit(0)