3 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
8 # $Id: monitor.py,v 1.5 2007/05/16 01:53:46 faiyaza Exp $
14 from threading import *
18 # Global config options
19 from config import config
22 from util.process import *
28 # Correlates input with policy to form actions
# Mail-related constants.  NOTE(review): the code that consumes them is not
# visible here; FROM is presumably the sender address for outgoing mail.
41 FROM="support@planet-lab.org"
# Address templates with a single %s placeholder -- presumably filled with a
# site identifier (TODO confirm against the code that formats them).
42 TECHEMAIL="tech-%s@sites.planet-lab.org"
43 PIEMAIL="pi-%s@sites.planet-lab.org"
# URL of the PLCAPI endpoint (an XML-RPC server, going by the constant's name).
46 XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/'
# NOTE(review): the comments below describe settings whose assignments are
# not among the visible lines.
48 # Time between comon refresh
50 # Time to refresh DB and remove unused entries
52 # Time between policy enforce/update
53 #POLSLEEP=43200 #12hrs
56 # Global list of all running threads. Any threads added to
57 # list will be monitored.
59 # Seconds between checking threads
# Module-wide logger named "monitor", set to DEBUG, plus a file handler that
# appends (mode 'a') to LOG and formats records as "time level message".
# NOTE(review): LOG is assigned on a line not visible here, and no
# logger.addHandler(fh) call appears in the visible lines -- confirm that the
# handler is actually attached to the logger.
63 logger = logging.getLogger("monitor")
64 logger.setLevel(logging.DEBUG)
65 fh = logging.FileHandler(LOG, mode = 'a')
66 fh.setLevel(logging.DEBUG)
67 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
68 fh.setFormatter(formatter)
def startThread(fnct, name):
    """Register a thread object under *name* and start it.

    The object is stored in the module-level ``runningthreads`` mapping and
    given *name* as its thread name before it is started, so it stays
    registered even when starting it fails.

    fnct -- thread object to launch; must provide setName() and start().
    name -- string used both as the key in ``runningthreads`` and as the
            thread name.

    Returns None.  An exception raised while starting the thread is logged
    and not re-raised; an exception from setName() propagates to the caller.
    """
    runningthreads[name] = fnct
    runningthreads[name].setName(name)
    try:
        logger.info("Starting thread " + name)
        runningthreads[name].start()
    except Exception:
        # logger.exception() logs at ERROR level and appends the traceback of
        # the exception being handled, so the failure is recorded without
        # having to format an exception object by hand.
        logger.exception("Thread: " + name)
87 Watches threads and catches exceptions. Each launched thread is
88 watched and state is logged.
90 class ThreadWatcher(Thread):
97 time.sleep(WATCHSLEEP)
def checkThreads(self):
    """Drop dead threads from ``runningthreads`` and count the survivors.

    Every registered thread is asked whether it is still alive.  A thread
    that is not gets an error line in the log and is removed from the
    module-level ``runningthreads`` mapping.

    Returns the number of entries left in ``runningthreads``.
    """
    # Walk a snapshot of the names: entries are deleted during the walk.
    for name in list(runningthreads):
        worker = runningthreads[name]
        if worker.isAlive():
            continue
        logger.error("***********Thread died: %s**********" %(name))
        del runningthreads[name]
    return len(runningthreads)
112 Thread.__init__(self)
118 def dict_from_nodelist(nl):
126 Start threads, do some housekeeping, then daemonize.
# Body of the monitor's main routine: builds the work queues, loads the node
# list and the RT tickets, starts the comon, rt and policy threads through
# startThread(), checks on the watched threads, and finally dumps cached state.
# NOTE(review): the enclosing `def` line is not among the visible lines, and
# neither are the assignments of `cdb`, `tickets`, `emailed`, `tw` or the
# initial value of `l_nodes`, all of which are used below -- confirm against
# the full file.
130 global status, logger
134 # writepid("monitor")
136 logger.info('Monitor Started')
138 ########## VARIABLES ########################################
139 # Nodes to check. Queue of all sick nodes.
140 toCheck = Queue.Queue()
141 # Nodes that are sick w/o tickets
142 sickNoTicket = Queue.Queue()
143 # Comon DB of all nodes
147 # Nodes we've emailed.
148 # host - > (type of email, time)
151 ######### GET NODES ########################################
152 # TODO: get authoritative node list from PLC every PLCSLEEP seconds,
153 # feed this into Comon.
155 # List of nodes from a user-provided file.
# NOTE(review): `file` shadows the Python 2 builtin of the same name.
157 file = config.userlist
158 nodelist = config.getListFromFile(file)
160 print "Getting node info for hosts in: %s" % file
# One plc.getNodes() call per hostname; each result is appended to l_nodes.
161 for nodename in nodelist:
162 l_nodes += plc.getNodes({'hostname': nodename})
# NOTE(review): as visible here this assignment would overwrite the list built
# above; presumably the two node sources sit in separate if/else branches
# whose lines are not visible -- confirm.
164 # Authoritative list of nodes from PLC
165 l_nodes = soltesz.if_cached_else(config.cachenodes, "l_nodes", plc.getNodes)
167 # Minus blacklisted ones..
# The lambda supplies an empty list; presumably it is the fallback used when
# nothing is cached (if_cached_else is defined outside this file -- confirm).
168 l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
# Keep only the nodes whose 'hostname' is not in the blacklist.
169 l_wl_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
170 # A handy dict of hostname-to-nodestruct mapping
171 d_allplc_nodes = dict_from_nodelist(l_wl_nodes)
173 ####### RT tickets #########################################
# The timer is created before the ticket fetch and read right after it, so
# the printed figure covers the fetch; the timer is then discarded.
174 t = soltesz.MyTimer()
175 ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
176 print "Getting tickets from RT took: %f sec" % t.diff() ; del t
178 # TODO: Refreshes Comon data every COSLEEP seconds
# The Comon object is handed to startThread(), which registers it as "comon"
# and starts it.
179 cm1 = comon.Comon(cdb, d_allplc_nodes, toCheck)
180 startThread(cm1,"comon")
182 # TODO: make queues event based, not node based.
183 # From the RT db, add hosts to q(toCheck) for filtering the comon nodes.
184 rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket)
185 # Kind of a hack. Cleans the DB for stale entries and updates db.
187 # rt5 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket)
188 # clean = Thread(target=rt5.cleanTickets)
190 startThread(rt1,"rt1")
191 # startThread(rt5,"rt5")
192 # startThread(clean,"cleanrt5")
194 # Actually digest the info and do something with it.
# The policy object is built from the Comon object, the sickNoTicket queue
# and `emailed`.
195 pol = policy.Policy(cm1, sickNoTicket, emailed)
196 # Start Sending Emails
197 startThread(pol, "policy")
# checkThreads() returns the number of threads still registered, so the test
# below is true once none are left.
# NOTE(review): the loop header and the statement guarded by this `if` are
# not visible; presumably this polls every WATCHSLEEP seconds and leaves the
# loop when no threads remain -- confirm.
202 if tw.checkThreads() == 0:
204 time.sleep(WATCHSLEEP)
206 logger.info('Monitor Exitting')
208 # removepid("monitor")
210 # Store state of emails
211 #pol.emailedStore("WRITE")
# Hand the names "l_blacklist" and "ad_dbTickets" to soltesz.dbDump();
# presumably this persists the cached values for the next run -- confirm.
212 soltesz.dbDump("l_blacklist")
213 soltesz.dbDump("ad_dbTickets")
# Script entry point: the guarded code runs only when the file is executed
# directly rather than imported.
# NOTE(review): the `try:` line and the call it wraps are not among the
# visible lines -- presumably the main routine is invoked there; confirm.
216 if __name__ == '__main__':
# On Ctrl-C, print a notice and record the interruption in the log.
219 except KeyboardInterrupt:
220 print "Killed. Exitting."
221 logger.info('Monitor Killed')
222 #soltesz.dbDump("l_blacklist")
223 #soltesz.dbDump("ad_dbTickets")