3 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
8 # $Id: monitor.py,v 1.6 2007/06/29 12:42:22 soltesz Exp $
14 from threading import *
18 # Global config options
19 from config import config
22 from util.process import *
28 # Correlates input with policy to form actions
# E-mail address constants; the tech/PI templates each carry one %s
# placeholder.
FROM = "support@planet-lab.org"
TECHEMAIL = "tech-%s@sites.planet-lab.org"
PIEMAIL = "pi-%s@sites.planet-lab.org"

# URL of the PLC API endpoint.
XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/'
48 # Time between comon refresh
50 # Time to refresh DB and remove unused entries
52 # Time between policy enforce/update
53 #POLSLEEP=43200 #12hrs
56 # Global registry of all running threads, keyed by thread name. Any
57 # thread added to it will be monitored.
59 # Seconds between checking threads
# Record layout: "<timestamp> <level name> <message>".
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

# File handler opened on LOG in append mode; accepts everything from DEBUG up.
fh = logging.FileHandler(LOG, mode='a')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# The "monitor" logger itself also lets everything from DEBUG up through.
logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)
73 Launches threads and adds them to the runningthreads global list.
74 Assigns name for thread, starts.
def startThread(fnct, name):
    """
    Register a thread under ``name`` in the global ``runningthreads``
    mapping, give it that name, and start it.

    fnct -- thread object to launch; it must provide setName() and start().
    name -- string used both as the registry key and as the thread's name.

    Nothing is returned.  An exception raised by start() is logged through
    the module logger and swallowed, so a thread that cannot be launched
    does not abort the caller; its entry stays in ``runningthreads``.
    """
    runningthreads[name] = fnct
    fnct.setName(name)
    try:
        logger.info("Starting thread " + name)
        fnct.start()
    except Exception as err:
        # The handler previously referenced an undefined name (``error``)
        # and concatenated the exception object to a str; either one raised
        # inside the handler and hid the real start-up failure.
        logger.error("Thread: " + name + " " + str(err))
87 Watches threads and catches exceptions. Each launched thread is
88 watched and state is logged.
90 class ThreadWatcher(Thread):
97 time.sleep(WATCHSLEEP)
def checkThreads(self):
    """
    Drop dead threads from the global ``runningthreads`` mapping.

    Every registered thread is asked isAlive(); one that is no longer
    alive is reported through the module logger at error level and removed
    from the mapping.

    Returns the number of threads still registered afterwards.
    """
    # Iterate through threads using a snapshot of the names: entries are
    # deleted inside the loop, and that is only safe when the loop is not
    # walking the live dict.
    for name in list(runningthreads):
        # If thread found dead, remove it from the registry.
        if not runningthreads[name].isAlive():
            logger.error("***********Thread died: %s**********" % (name))
            del runningthreads[name]
    return len(runningthreads)
112 Thread.__init__(self)
def preComon(l_nodes, toCheck):
    """
    Queue a debug-bucket diagnostic record for every host in ``l_nodes``.

    l_nodes -- iterable of host names.
    toCheck -- queue-like object; each record is handed to its put().

    Each record is a new dict with 'nodename' set to the host, 'bucket' set
    to ["dbg"], 'stage' set to "", 'time' set to the current time.time(),
    and 'message', 'args' and 'info' set to None.  Nothing is returned.
    """
    for host in l_nodes:
        # Build a fresh dict (and a fresh bucket list) per host so queued
        # records never share mutable state.
        diag_node = {
            'nodename': host,
            'message': None,
            'bucket': ["dbg"],
            'stage': "",
            'args': None,
            'info': None,
            'time': time.time(),
        }
        toCheck.put(diag_node)
130 def dict_from_nodelist(nl):
138 Start threads, do some housekeeping, then daemonize.
142 global status, logger
146 # writepid("monitor")
148 logger.info('Monitor Started')
150 ########## VARIABLES ########################################
151 # Nodes to check. Queue of all sick nodes.
152 toCheck = Queue.Queue()
153 # Nodes that are sick w/o tickets
154 sickNoTicket = Queue.Queue()
155 # Comon DB of all nodes
159 # Nodes we've emailed.
160 # host - > (type of email, time)
163 ######### GET NODES ########################################
164 # TODO: get authoritative node list from PLC every PLCSLEEP seconds,
165 # feed this into Comon.
167 # List of nodes from a user-provided file.
169 file = config.userlist
170 nodelist = config.getListFromFile(file)
172 print "Getting node info for hosts in: %s" % file
173 for nodename in nodelist:
174 if config.debug: print ".", ; sys.stdout.flush()
175 l_nodes += plc.getNodes({'hostname': nodename})
178 # Authoritative list of nodes from PLC
179 l_nodes = soltesz.if_cached_else(config.cachenodes, "l_nodes", plc.getNodes)
181 # Minus blacklisted ones..
182 l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
183 l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : [])
184 l_wl_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
185 # A handy dict of hostname-to-nodestruct mapping
186 d_allplc_nodes = dict_from_nodelist(l_wl_nodes)
188 ####### RT tickets #########################################
189 t = soltesz.MyTimer()
190 ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
191 print "Getting tickets from RT took: %f sec" % t.diff() ; del t
193 if os.path.isfile("precomon.txt"):
194 nodelist = config.getListFromFile("precomon.txt")
195 print "PreComon node info"
196 preComon(nodelist, toCheck)
197 for nodename in nodelist:
198 # TODO: temporary hack.
199 if nodename not in d_allplc_nodes:
200 d_allplc_nodes[nodename] = {}
202 # TODO: Refreshes Comon data every COSLEEP seconds
203 cm1 = comon.Comon(cdb, d_allplc_nodes, toCheck)
204 startThread(cm1,"comon")
206 # TODO: make queues event based, not node based.
207 # From the RT db, add hosts to q(toCheck) for filtering the comon nodes.
208 rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist)
209 # Kind of a hack. Cleans the DB for stale entries and updates db.
211 # rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket)
212 # clean = Thread(target=rt5.cleanTickets)
214 startThread(rt1,"rt1")
215 # startThread(rt5,"rt5")
216 # startThread(clean,"cleanrt5")
218 # Actually digest the info and do something with it.
219 pol = policy.Policy(cm1, sickNoTicket, emailed)
220 # Start Sending Emails
221 startThread(pol, "policy")
226 if tw.checkThreads() == 0:
228 time.sleep(WATCHSLEEP)
230 logger.info('Monitor Exitting')
232 # removepid("monitor")
234 # Store state of emails
235 #pol.emailedStore("WRITE")
236 soltesz.dbDump("l_blacklist")
237 soltesz.dbDump("ad_dbTickets")
240 if __name__ == '__main__':
243 except KeyboardInterrupt:
244 print "Killed. Exitting."
245 logger.info('Monitor Killed')
246 #soltesz.dbDump("l_blacklist")
247 #soltesz.dbDump("ad_dbTickets")