Changes necessary for the new operating environment. rt_tickets, returns empty strin...
[monitor.git] / diagnose.py
1 #!/usr/bin/python
2 #
3 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
4
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
7 #
8 # $Id: diagnose.py,v 1.1 2007/08/08 13:36:46 soltesz Exp $
9
10 import sys
11 from threading import *
12 import time
13 import logging
14 import Queue
15 from sets import Set
16
17 # Global config options
18 from config import config
19 from optparse import OptionParser
# Command-line parser for this script; every option below is long-form only.
parser = OptionParser()

# With no command-line arguments, no input files are named and all of the
# boolean switches are off.
parser.set_defaults(nodelist=None, refresh=False, cachert=False,
                    cachenodes=False, blacklist=None, ticketlist=None)

# (long option, add_option keyword arguments), listed in registration order.
for _opt, _kw in [
    ("--nodelist", dict(dest="nodelist", metavar="filename",
                        help="Read nodes to act on from specified file")),
    ("--refresh", dict(action="store_true", dest="refresh",
                       help="Refresh the cached values")),
    ("--cachert", dict(action="store_true", dest="cachert",
                       help="Cache the RT database query")),
    ("--cachenodes", dict(action="store_true", dest="cachenodes",
                          help="Cache node lookup from PLC")),
    ("--ticketlist", dict(dest="ticketlist",
                          help="Whitelist all RT tickets in this file")),
    ("--blacklist", dict(dest="blacklist",
                         help="Blacklist all nodes in this file")),
]:
    # The empty first option string means no short form is supplied.
    parser.add_option("", _opt, **_kw)
del _opt, _kw

# NOTE: this rebinds the imported name `config` (from `from config import
# config`) to the object built around the parser; later code reads the parsed
# options as attributes of that object (e.g. config.nodelist).
config = config(parser)
config.parse_args()
44
45 # daemonize and *pid
46 #from util.process import * 
47
48 # RT tickets
49 import rt
50 # Correlates input with policy to form actions
51 import policy
52 import soltesz
53 import plc
54 import syncplcdb
55
# File that the "monitor" logger appends to (relative to the working directory).
LOG = "./monitor.log"

# Time to refresh DB and remove unused entries, in seconds.
RTSLEEP = 2 * 60 * 60  # 2hrs
# Time between policy enforce/update, in seconds.
#POLSLEEP=43200 #12hrs
POLSLEEP = 10

# Registry of launched threads, keyed by thread name.  Anything stored
# here is checked for liveness by ThreadWatcher.checkThreads().
runningthreads = {}
# Seconds between two liveness checks.
WATCHSLEEP = 10

# Logging: a single DEBUG-level file handler on the "monitor" logger.
logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

fh = logging.FileHandler(LOG, mode='a')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

logger.addHandler(fh)
79
80
81 """
82 Launches threads and adds them to the runningthreads global list.
83 Assigns name for thread, starts.
84 """
def startThread(fnct, name):
    """Register a thread under `name` in runningthreads and start it.

    Arguments:
      fnct -- the thread object to launch.  Despite the parameter name this
              is not a bare function: setName() and start() are called on it.
      name -- string used both as the key in runningthreads and as the
              thread's name.

    A failure while starting is logged as an error and not re-raised.  The
    entry is left in runningthreads in that case.
    """
    runningthreads[name] = fnct
    fnct.setName(name)
    try:
        logger.info("Starting thread " + name)
        fnct.start()
    except Exception:
        # sys.exc_info() fetches the active exception with syntax that is
        # valid on every Python version this script may run under.
        err = sys.exc_info()[1]
        # str() is required: concatenating the exception object itself to a
        # string would raise TypeError and hide the original failure.
        logger.error("Thread: " + name + " " + str(err))
93
94
95 """
96 Watches threads and catches exceptions.  Each launched thread is
97 watched and state is logged.
98 """
class ThreadWatcher(Thread):
    """Thread that periodically prunes dead entries from runningthreads."""

    def __init__(self):
        super(ThreadWatcher, self).__init__()

    def run(self):
        """Check the registered threads every WATCHSLEEP seconds, forever."""
        while True:
            self.checkThreads()
            time.sleep(WATCHSLEEP)

    def checkThreads(self):
        """Log and unregister every registered thread that is not alive.

        Returns the number of threads still registered afterwards.
        """
        # Walk a snapshot of the registry so entries can be deleted while
        # looping.
        for name, worker in list(runningthreads.items()):
            if worker.isAlive():
                continue
            logger.error("***********Thread died: %s**********" % name)
            del runningthreads[name]
        return len(runningthreads)
117
118
class Dummy(Thread):
    """Placeholder thread whose only work is to sleep for five seconds."""

    def __init__(self):
        super(Dummy, self).__init__()

    def run(self):
        """Sleep for 5 seconds, then return (which ends the thread)."""
        time.sleep(5)
125
def dict_from_nodelist(nl):
    """Index a sequence of node records by hostname.

    nl -- iterable of mappings, each with a 'hostname' key.

    Returns a dict mapping each hostname to its record.  When a hostname
    occurs more than once, the last record with that hostname is kept.
    """
    return dict((record['hostname'], record) for record in nl)
132
133
134
135 """
136 Start threads, do some housekeeping, then daemonize.
137 """
def main():
    """Collect nodes and RT tickets, run the worker threads, then exit.

    Steps, in order:
      1. Load the PLC node list through the soltesz cache helper.  When
         config.nodelist is set, keep only the nodes whose hostname is in
         that file, and print a warning for each listed hostname that PLC
         does not know about.
      2. Drop nodes whose hostname is in the cached "l_blacklist".
      3. Load the RT tickets through the cache helper; exit with status 1
         if the result is the empty string.
      4. Start the Merge, RT and Diagnose threads, which share two queues.
      5. Poll every WATCHSLEEP seconds until no registered thread remains,
         then exit with status 0.

    This function never returns normally; it always ends in sys.exit().
    """
    logger.info('Diagnose Started')
    print('Diagnose Started')

    ##########  VARIABLES   ########################################
    # Queue handed to both Merge and RT.
    toRT = Queue.Queue()
    # Queue handed to both RT and Diagnose.
    fromRT = Queue.Queue()

    #########  GET NODES    ########################################
    logger.info('Get Nodes from PLC')
    print("getnode from plc: %s %s %s" % (config.debug,
                                          config.cachenodes,
                                          config.refresh))
    all_nodes = soltesz.if_cached_else_refresh(
        config.cachenodes, config.refresh, "l_plcnodes",
        lambda: syncplcdb.create_plcdb())

    known_hostnames = Set([entry['hostname'] for entry in all_nodes])

    if not config.nodelist:
        # No user-provided file: consider every PLC node.
        l_nodes = all_nodes
    else:
        # Hostnames from the user-provided file.
        requested = Set(config.getListFromFile(config.nodelist))
        # SAFE: named in the file and known to PLC.
        monitorable = known_hostnames & requested
        # UNSAFE: named in the file but unknown to PLC; warn and skip them.
        for stray in requested - known_hostnames:
            print("WARNING: User provided: %s but not found in PLC" % stray)

        l_nodes = [entry for entry in all_nodes
                   if entry['hostname'] in monitorable]

    print("len of l_nodes: %d" % len(l_nodes))
    # Minus blacklisted ones.  Both lists fall back to [] via the helper.
    l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda: [])
    l_ticket_blacklist = soltesz.if_cached_else(1, "l_ticket_blacklist",
                                                lambda: [])
    l_nodes = [entry for entry in l_nodes
               if entry['hostname'] not in l_blacklist]

    logger.info('Get Tickets from RT')
    #######  RT tickets    #########################################
    timer = soltesz.MyTimer()
    ad_dbTickets = soltesz.if_cached_else_refresh(
        config.cachert, config.refresh, "ad_dbTickets", rt.rt_tickets)
    # An empty string from the ticket lookup is treated as failure.
    if ad_dbTickets == "":
        print("ad_dbTickets failed...")
        sys.exit(1)
    print("Getting tickets from RT took: %f sec" % timer.diff())
    del timer

    logger.info('Start Merge/RT/Diagnose threads')
    # Each worker is built and started before the next one is built.
    ####### Merge: only look at nodes in l_nodes
    startThread(policy.Merge([entry['hostname'] for entry in l_nodes], toRT),
                "merge")
    ####### RT
    startThread(rt.RT(ad_dbTickets, toRT, fromRT, l_ticket_blacklist), "rt1")
    ####### Diagnose
    startThread(policy.Diagnose(fromRT), "diagnose")

    # The watcher is never start()ed; it is only used to poll the registry
    # from this thread until every worker has finished.
    watcher = ThreadWatcher()
    while watcher.checkThreads() != 0:
        print("waiting... %s" % time.time())
        time.sleep(WATCHSLEEP)

    logger.info('Diagnose Exitting')
    sys.exit(0)
217         
# Script entry point.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: report on stdout and in the log, then exit with status 0.
        print("Killed.  Exitting.")
        logger.info('Diagnose Killed')
        sys.exit(0)