+ format time record
[monitor.git] / monitor.py
1 #!/usr/bin/python
2 #
3 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
4
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
7 #
8 # $Id: monitor.py,v 1.6 2007/06/29 12:42:22 soltesz Exp $
9
10 import sys
11 import os
12 import getopt 
13 import thread
14 from threading import *
15 import time
16 import logging
17 import Queue
18 # Global config options
19 from config import config
20 config = config()
21 # daemonize and *pid
22 from util.process import * 
23
24 # Comon DB
25 import comon
26 # RT tickets
27 import rt
28 # Correlates input with policy to form actions
29 import policy
30 import soltesz
31 import plc
32
# Path of the log file.
LOG = "./monitor.log"

# Path of the DAT file.
DAT = "./monitor.dat"

# Email defaults: mail host, sender address, and the tech / PI address
# templates (each template takes a single %s substitution).
MTA = "localhost"
FROM = "support@planet-lab.org"
TECHEMAIL = "tech-%s@sites.planet-lab.org"
PIEMAIL = "pi-%s@sites.planet-lab.org"

# URL of the PLC API.
XMLRPC_SERVER = "https://www.planet-lab.org/PLCAPI/"

# Time between comon refreshes: 5 minutes.
COSLEEP = 5 * 60
# Time to refresh the DB and remove unused entries: 2 hours.
RTSLEEP = 2 * 60 * 60
# Time between policy enforce/update passes.  The 12-hour setting
# (43200) is switched off in favour of the short value below.
POLSLEEP = 10

# Global registry of all running threads, keyed by thread name.  Any
# thread added here will be monitored.
runningthreads = dict()
# Seconds between checks of the registered threads.
WATCHSLEEP = 10
61  
# Set up logging: the "monitor" logger appends every record at DEBUG
# level or above to the LOG file, formatted as
# "<timestamp> <level> <message>".
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

fh = logging.FileHandler(LOG, mode='a')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)
logger.addHandler(fh)
70
71
72 """
73 Launches threads and adds them to the runningthreads global list.
74 Assigns name for thread, starts.
75 """
def startThread(fnct, name):
    """
    Register a thread under *name* in runningthreads, name it, and start it.

    fnct -- the thread object to launch.  Despite the parameter name it is
            not a bare callable: setName() and start() are called on it.
    name -- key used in the runningthreads registry; also set as the
            thread's name.

    An exception raised while starting the thread is logged (with its
    traceback) and not propagated.  The thread stays registered in
    runningthreads whether or not the start succeeded.
    """
    runningthreads[name] = fnct
    fnct.setName(name)
    logger.info("Starting thread %s", name)
    try:
        fnct.start()
    except Exception:
        # logger.exception logs at ERROR level and appends the active
        # traceback, so the reason for the failed start ends up in the log
        # without having to format the exception object by hand.
        logger.exception("Thread: %s failed to start", name)
84
85
86 """
87 Watches threads and catches exceptions.  Each launched thread is
88 watched and state is logged.
89 """
class ThreadWatcher(Thread):
    """
    Thread that watches the threads registered in runningthreads.

    While running, it calls checkThreads() every WATCHSLEEP seconds.
    checkThreads() may also be called directly without starting the
    watcher thread.
    """

    def __init__(self):
        Thread.__init__(self)

    def run(self):
        """Check the registered threads, sleep WATCHSLEEP seconds, repeat forever."""
        while True:
            self.checkThreads()
            time.sleep(WATCHSLEEP)

    def checkThreads(self):
        """
        Remove every dead thread from runningthreads, logging each removal.

        Returns the number of threads still registered afterwards.
        """
        # Walk a snapshot of the names: entries are deleted inside the loop,
        # and deleting from a dict while iterating over it directly is an
        # error.
        for name in list(runningthreads):
            if not runningthreads[name].isAlive():
                logger.error("***********Thread died: %s**********", name)
                del runningthreads[name]
        return len(runningthreads)
108
109
class Dummy(Thread):
    """Placeholder thread whose run() sleeps for five seconds and returns."""

    def __init__(self):
        super(Dummy, self).__init__()

    def run(self):
        time.sleep(5)
116
def preComon(l_nodes, toCheck):
    """
    Put one fresh diagnostic record on toCheck for every host in l_nodes.

    l_nodes -- iterable of host names.
    toCheck -- object with a put() method that receives the records.

    Each record is a new dict with 'nodename' set to the host, 'bucket'
    set to ["dbg"], 'stage' set to "", 'time' set to the current
    time.time(), and 'message', 'args' and 'info' set to None.
    Returns None.
    """
    for hostname in l_nodes:
        toCheck.put({
            'nodename': hostname,
            'message': None,
            'bucket': ["dbg"],
            'stage': "",
            'args': None,
            'info': None,
            'time': time.time(),
        })
129
def dict_from_nodelist(nl):
    """
    Index node records by hostname.

    nl -- iterable of mappings, each with a 'hostname' key.

    Returns a dict mapping each hostname to its record.  When a hostname
    occurs more than once, the last record wins.
    """
    return dict((node['hostname'], node) for node in nl)
136
137 """
138 Start threads, do some housekeeping, then daemonize.
139 """
def main():
    """
    Build the node and ticket state, start the worker threads, wait for them.

    Steps, in order:
      1. Load the node list: from the file named by config.userlist when
         that is set, otherwise through soltesz.if_cached_else() with
         plc.getNodes.  Hosts found in the blacklist are dropped.
      2. Load the RT tickets through soltesz.if_cached_else().
      3. If precomon.txt exists, queue its hosts on toCheck and make sure
         each of them has an entry in the hostname-to-node dict.
      4. Start the "comon", "rt1" and "policy" threads, in that order.
      5. Poll every WATCHSLEEP seconds until no registered thread is left,
         then dump "l_blacklist" and "ad_dbTickets".

    Does not return normally: it ends with sys.exit(0).
    """
    # NOTE: daemonizing (daemonize() / writepid("monitor")) is currently
    # disabled here, as is the matching removepid("monitor") at exit.

    logger.info('Monitor Started')

    ##########  VARIABLES   ########################################
    # Nodes to check. Queue of all sick nodes.
    toCheck = Queue.Queue()
    # Nodes that are sick w/o tickets
    sickNoTicket = Queue.Queue()
    # Comon DB of all nodes
    cdb = {}
    # RT DB
    tickets = {}
    # Nodes we've emailed.
    # host - > (type of email, time)
    emailed = {}

    #########  GET NODES    ########################################
    # TODO: get authoritative node list from PLC every PLCSLEEP seconds,
    #       feed this into Comon.

    if config.userlist:
        # List of nodes from a user-provided file.
        userlist_path = config.userlist
        nodelist = config.getListFromFile(userlist_path)
        l_nodes = []
        print("Getting node info for hosts in: %s" % userlist_path)
        for nodename in nodelist:
            if config.debug:
                # One progress dot per host, flushed so it shows up
                # immediately.
                sys.stdout.write(". ")
                sys.stdout.flush()
            l_nodes += plc.getNodes({'hostname': nodename})
        print("")
    else:
        # Authoritative list of nodes from PLC
        l_nodes = soltesz.if_cached_else(config.cachenodes, "l_nodes", plc.getNodes)

    # Minus blacklisted ones..
    l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda: [])
    l_ticket_blacklist = soltesz.if_cached_else(1, "l_ticket_blacklist", lambda: [])
    l_wl_nodes = [node for node in l_nodes
                  if node['hostname'] not in l_blacklist]
    # A handy dict of hostname-to-nodestruct mapping
    d_allplc_nodes = dict_from_nodelist(l_wl_nodes)

    #######  RT tickets    #########################################
    t = soltesz.MyTimer()
    ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
    print("Getting tickets from RT took: %f sec" % t.diff())
    del t

    if os.path.isfile("precomon.txt"):
        nodelist = config.getListFromFile("precomon.txt")
        print("PreComon node info")
        preComon(nodelist, toCheck)
        for nodename in nodelist:
            # TODO: temporary hack.  Hosts unknown to PLC get an empty
            # node struct so later lookups by hostname do not fail.
            if nodename not in d_allplc_nodes:
                d_allplc_nodes[nodename] = {}

    # TODO: Refreshes Comon data every COSLEEP seconds
    cm1 = comon.Comon(cdb, d_allplc_nodes, toCheck)
    startThread(cm1, "comon")

    # TODO: make queues event based, not node based.
    # From the RT db, add hosts to q(toCheck) for filtering the comon nodes.
    rt1 = rt.RT(ad_dbTickets, tickets, toCheck, sickNoTicket, l_ticket_blacklist)
    # NOTE: a second RT instance running cleanTickets in its own thread
    # (to clean stale entries from the DB) was sketched here; it is
    # untested and disabled.
    startThread(rt1, "rt1")

    # Actually digest the info and do something with it.
    pol = policy.Policy(cm1, sickNoTicket, emailed)
    # Start Sending Emails
    startThread(pol, "policy")

    # The watcher is not started as a thread; it is polled from here until
    # every registered thread has died and been removed.
    tw = ThreadWatcher()
    while tw.checkThreads() != 0:
        time.sleep(WATCHSLEEP)

    logger.info('Monitor Exitting')

    # Store state.  (Storing the state of emails via
    # pol.emailedStore("WRITE") is disabled.)
    soltesz.dbDump("l_blacklist")
    soltesz.dbDump("ad_dbTickets")
    sys.exit(0)
239         
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Interrupted from the keyboard: say so on the console and in the
        # log, then exit with status 0.  State is not dumped on this path
        # (the soltesz.dbDump calls for "l_blacklist" and "ad_dbTickets"
        # are disabled here).
        print("Killed.  Exitting.")
        logger.info('Monitor Killed')
        sys.exit(0)