+ findbad.py: this actively probes all machines in the PLC db, using ping,
[monitor.git] / diagnose.py
1 #!/usr/bin/python
2 #
3 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
4
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
7 #
8 # $Id$
9
10 import sys
11 from threading import *
12 import time
13 import logging
14 import Queue
15 from sets import Set
16
17 # Global config options
18 from config import config
19 from optparse import OptionParser
# Command-line interface for diagnose.py.  The parser is handed to the
# project's config wrapper, which is then asked to parse the arguments.
parser = OptionParser()

# Values used when an option is not given on the command line.
parser.set_defaults(nodelist=None,
                    ticketlist=None,
                    blacklist=None,
                    cachenodes=False,
                    cachert=False)

# Only long option names are registered (no short aliases).
parser.add_option("--nodelist", dest="nodelist", metavar="filename",
                  help="Read nodes to act on from specified file")
parser.add_option("--cachert", dest="cachert", action="store_true",
                  help="Cache the RT database query")
parser.add_option("--cachenodes", dest="cachenodes", action="store_true",
                  help="Cache node lookup from PLC")
parser.add_option("--ticketlist", dest="ticketlist",
                  help="Whitelist all RT tickets in this file")
parser.add_option("--blacklist", dest="blacklist",
                  help="Blacklist all nodes in this file")

# NOTE: this rebinds the imported name `config` from the class to the
# instance built around the parser; the rest of the file uses the instance.
config = config(parser)
print("bcalling parse_args")
config.parse_args()
42
43 # daemonize and *pid
44 #from util.process import * 
45
46 # RT tickets
47 import rt
48 # Correlates input with policy to form actions
49 import policy
50 import soltesz
51 import plc
52 import syncplcdb
53
# File that receives the log output (opened in append mode below).
LOG = "./monitor.log"

# Time to refresh DB and remove unused entries, in seconds (2 hours).
RTSLEEP = 2 * 60 * 60
# Time between policy enforce/update, in seconds.  A 12-hour value
# (43200) was previously used here; 10 is the value currently in effect.
POLSLEEP = 10

# Seconds between checking threads.
WATCHSLEEP = 10
# Global registry of all running threads, keyed by thread name.  Any
# thread added to it will be monitored.
runningthreads = {}

# Logging: everything from DEBUG upwards is appended to LOG as
# "<time> <level> <message>".
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh = logging.FileHandler(LOG, 'a')
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
logger = logging.getLogger("monitor")
logger.addHandler(fh)
logger.setLevel(logging.DEBUG)
77
78
def startThread(fnct, name):
    """
    Register a thread in the global ``runningthreads`` dict and start it.

    fnct -- the thread object to launch (despite the parameter name it
            must provide setName() and start()).
    name -- key used in ``runningthreads``; also assigned as the thread's
            name.

    A failure while starting is logged (with traceback) rather than
    propagated.  The entry is left in ``runningthreads`` in that case,
    exactly as before.
    """
    runningthreads[name] = fnct
    fnct.setName(name)
    try:
        logger.info("Starting thread " + name)
        fnct.start()
    except Exception:
        # The previous handler referenced an undefined name (`error`) and
        # concatenated a str with an exception object, so any start-up
        # failure raised a NameError from inside the handler instead of
        # being logged.  logger.exception records the message together
        # with the active exception's traceback.
        logger.exception("Thread: " + name)
91
92
class ThreadWatcher(Thread):
    """
    Watches the threads registered in the global ``runningthreads`` dict
    and logs each one that is found dead.
    """

    def __init__(self):
        super(ThreadWatcher, self).__init__()

    def run(self):
        """Check the registered threads every WATCHSLEEP seconds, forever."""
        while True:
            self.checkThreads()
            time.sleep(WATCHSLEEP)

    def checkThreads(self):
        """
        Drop dead threads from ``runningthreads``, logging an error for
        each, and return the number of threads still registered.
        """
        # Work from a snapshot of the names so that entries can be
        # deleted from the dict while looping.
        for name in list(runningthreads.keys()):
            if runningthreads[name].isAlive():
                continue
            logger.error("***********Thread died: %s**********" % name)
            del runningthreads[name]
        return len(runningthreads)
115
116
class Dummy(Thread):
    """Thread whose run() only sleeps for five seconds and then returns."""

    def __init__(self):
        super(Dummy, self).__init__()

    def run(self):
        pause_seconds = 5
        time.sleep(pause_seconds)
122                 time.sleep(5)
123
def dict_from_nodelist(nl):
    """
    Index a sequence of node records by hostname.

    nl -- iterable of mappings, each with a 'hostname' key.

    Returns a dict mapping each hostname to its record.  When a hostname
    occurs more than once the last record wins; a record without a
    'hostname' key raises KeyError.
    """
    return dict([(node['hostname'], node) for node in nl])
130
131
132
def main():
    """
    Run one diagnose pass: select the nodes, fetch the RT tickets, start
    the Merge, RT and Diagnose threads, then wait for them to finish.

    Reads the module-level ``config`` and ``logger``.  Does not return
    normally: it ends with sys.exit(0) once no registered thread is left
    alive.
    """
    logger.info('Diagnose Started')
    print('Diagnose Started')

    # Queues connecting the stages: Merge -> RT -> Diagnose.
    toRT = Queue.Queue()
    fromRT = Queue.Queue()

    # ---- Nodes known to PLC (optionally served from the cache) ----
    logger.info('Get Nodes from PLC')
    print("getnode from plc")
    plc_nodes = soltesz.if_cached_else(config.cachenodes, "l_plcnodes",
                                       lambda : syncplcdb.create_plcdb())
    plc_hostnames = Set([rec['hostname'] for rec in plc_nodes])

    # ---- Optional restriction to a user-provided node file ----
    if not config.nodelist:
        nodes = plc_nodes
    else:
        user_hostnames = Set(config.getListFromFile(config.nodelist))
        # SAFE: named by the user and present in PLC.
        safe_hostnames = plc_hostnames & user_hostnames
        # UNSAFE: named by the user but unknown to PLC; never monitored.
        for unknown in user_hostnames - plc_hostnames:
            print("WARNING: User provided: %s but not found in PLC" % unknown)
        nodes = [rec for rec in plc_nodes
                 if rec['hostname'] in safe_hostnames]

    print("len of l_nodes: %d" % len(nodes))

    # ---- Remove blacklisted nodes ----
    node_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
    ticket_blacklist = soltesz.if_cached_else(1, "l_ticket_blacklist",
                                              lambda : [])
    nodes = [rec for rec in nodes if rec['hostname'] not in node_blacklist]

    # ---- RT tickets (optionally served from the cache), timed ----
    logger.info('Get Tickets from RT')
    timer = soltesz.MyTimer()
    tickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets",
                                     rt.rt_tickets)
    print("Getting tickets from RT took: %f sec" % timer.diff())
    del timer

    # ---- Worker threads ----
    logger.info('Start Merge/RT/Diagnose threads')
    # Merge only looks at the hostnames selected above.
    merge = policy.Merge([rec['hostname'] for rec in nodes], toRT)
    startThread(merge, "merge")
    rt1 = rt.RT(tickets, toRT, fromRT, ticket_blacklist)
    startThread(rt1, "rt1")
    diagnose = policy.Diagnose(fromRT)
    startThread(diagnose, "diagnose")

    # The watcher is never start()ed; it is polled from this thread until
    # it reports that no registered thread remains.
    watcher = ThreadWatcher()
    while watcher.checkThreads() != 0:
        print("waiting... %s" % time.time())
        time.sleep(WATCHSLEEP)

    logger.info('Diagnose Exitting')
    sys.exit(0)
211         
if __name__ == '__main__':
    # Script entry point.  Ctrl-C is reported, logged and turned into a
    # normal (status 0) exit.
    try:
        main()
    except KeyboardInterrupt:
        print("Killed.  Exitting.")
        logger.info('Diagnose Killed')
        sys.exit(0)