3 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
8 # $Id: monitor.py,v 1.7 2007/07/03 19:59:02 soltesz Exp $
14 from threading import *
19 # Global config options
20 from config import config
22 from util.process import *
28 # Correlates input with policy to form actions
36 # Time to refresh DB and remove unused entries
38 # Time between policy enforce/update
39 #POLSLEEP=43200 #12hrs
# Global registry (thread name -> thread object) of all running threads.
# Any thread added to it will be monitored.
45 # Seconds between checking threads
# Logging for the whole daemon: one record layout, one file handler, one
# named logger. Records are appended to the file named by LOG, so earlier
# runs are preserved.
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

fh = logging.FileHandler(LOG, mode='a')
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)

# Both the logger and the handler pass everything from DEBUG upwards.
logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)
# NOTE(review): no logger.addHandler(fh) call is visible in this excerpt;
# without one the handler never receives records -- confirm it is attached.
59 Launches threads and adds them to the runningthreads global list.
60 Assigns name for thread, starts.
def startThread(fnct, name):
    """Register a thread under ``name`` and start it.

    The thread object is stored in the module-level ``runningthreads``
    registry, given ``name`` as its thread name, and started. A failure
    raised while starting is logged and swallowed so the caller carries on;
    the entry stays in the registry either way.

    fnct -- thread object to launch; must provide setName() and start().
    name -- string used as registry key, thread name and log label.
    """
    import sys  # local import: only the error path below needs it

    runningthreads[name] = fnct
    fnct.setName(name)
    try:
        logger.info("Starting thread " + name)
        fnct.start()
    except Exception:
        # sys.exc_info() rather than an "except ..., err" / "as err" binding
        # keeps this handler valid syntax on both Python 2 and Python 3.
        err = sys.exc_info()[1]
        # str() is required: concatenating the exception object itself to a
        # string would raise TypeError and hide the real failure.
        logger.error("Thread: " + name + " " + str(err))
73 Watches threads and catches exceptions. Each launched thread is
74 watched and state is logged.
76 class ThreadWatcher(Thread):
83 time.sleep(WATCHSLEEP)
def checkThreads(self):
    """Drop dead threads from the registry and count the survivors.

    Every entry of the module-level ``runningthreads`` registry whose
    thread no longer reports isAlive() is logged as an error and removed.

    Returns the number of threads still registered afterwards.
    """
    # Walk a snapshot of the names, since entries are deleted on the way.
    for name in list(runningthreads.keys()):
        worker = runningthreads[name]
        if worker.isAlive():
            continue
        logger.error("***********Thread died: %s**********" % (name))
        del runningthreads[name]
    return len(runningthreads)
103 def dict_from_nodelist(nl):
111 Start threads, do some housekeeping, then daemonize.
115 global status, logger
120 # writepid("monitor")
125 logger.info('Monitor Started')
126 ########## VARIABLES ########################################
127 # Nodes to check. Queue of all sick nodes.
128 toCheck = Queue.Queue()
129 # Nodes that are sick w/o tickets
130 sickNoTicket = Queue.Queue()
131 # Comon DB of all nodes
135 # Nodes we've emailed.
136 # host - > (type of email, time)
139 ######### GET NODES ########################################
140 # TODO: get authoritative node list from PLC every PLCSLEEP seconds,
141 # feed this into Comon.
142 l_plcnodes = soltesz.if_cached_else(config.cachenodes,
144 lambda : plc.getNodes({'peer_id':None}))
146 s_plcnodes = Set([x['hostname'] for x in l_plcnodes])
148 # List of nodes from a user-provided file.
150 file = config.nodelist
151 nodelist = config.getListFromFile(file)
153 print "Getting node info for hosts in: %s" % file
154 for nodename in nodelist:
155 if config.debug: print ".", ; sys.stdout.flush()
156 l_nodelist += plc.getNodes({'hostname': nodename, 'peer_id':None})
157 if config.debug: print ""
159 s_usernodes = Set(nodelist)
160 # nodes from PLC and in the user list.
161 s_safe_usernodes = s_plcnodes & s_usernodes
162 s_unsafe_usernodes = s_usernodes - s_plcnodes
163 if len(s_unsafe_usernodes) > 0 :
164 for node in s_unsafe_usernodes:
165 print "WARNING: User provided: %s but not found in PLC" % node
167 l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes)
171 # Minus blacklisted ones..
172 l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
173 l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : [])
174 l_wl_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
175 # A handy dict of hostname-to-nodestruct mapping
176 d_allplc_nodes = dict_from_nodelist(l_wl_nodes)
178 ####### RT tickets #########################################
179 t = soltesz.MyTimer()
180 ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
181 print "Getting tickets from RT took: %f sec" % t.diff() ; del t
183 # TODO: get input nodes from findbad database, pipe into toCheck
184 cm1 = read_findbad_db(d_allplc_nodes, toCheck)
186 # Search for toCheck nodes in the RT db.
187 rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist)
188 # Kind of a hack. Cleans the DB for stale entries and updates db.
190 # rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket)
191 # clean = Thread(target=rt5.cleanTickets)
193 startThread(rt1,"rt1")
194 # startThread(rt5,"rt5")
195 # startThread(clean,"cleanrt5")
197 # Actually digest the info and do something with it.
198 pol = policy.Policy(cm1, sickNoTicket, emailed)
199 # Start Sending Emails
200 startThread(pol, "policy")
205 if tw.checkThreads() == 0:
207 time.sleep(WATCHSLEEP)
209 logger.info('Monitor Exitting')
211 # removepid("monitor")
213 # Store state of emails
214 #pol.emailedStore("WRITE")
215 soltesz.dbDump("ad_dbTickets")
218 if __name__ == '__main__':
221 except KeyboardInterrupt:
222 print "Killed. Exitting."
223 logger.info('Monitor Killed')
224 #soltesz.dbDump("ad_dbTickets")