10 from threading import *
12 from monitor import config
13 from monitor import database
15 # TODO: merge the RT mailer from mailer.py into this file.
18 logger = logging.getLogger("monitor")
20 # seconds between ticket update
23 def stripQuotes( str ):
27 if str[len(str)-1] in quotes:
32 def readConstantsFile( file_path ):
34 read a file consisting of lines of
38 and return a dictionary of the values.
40 blank lines, and lines starting with # (comments) are skipped
46 input_file= file(file_path,"r")
50 for line in input_file:
53 line= string.strip(line)
57 parts= string.split(line,"=",)
61 contents[parts[0]]= stripQuotes(parts[1])
69 # read plc database passwords and connect
70 #rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
71 #if rt_db_constants is None:
72 # print "Unable to read database access constants from %s" % \
73 # RT_DB_CONSTANTS_PATH
77 rt_db = MySQLdb.connect(host=config.RT_DB_HOST,
78 user=config.RT_DB_USER,
79 passwd=config.RT_DB_PASSWORD,
81 except Exception, err:
82 print "Failed to connect to RT database: %s" %err
89 def fetch_from_db(db, sql):
91 # create a 'cursor' (required by MySQLdb)
94 except Exception, err:
95 print "Could not execute RT query %s" %err
98 # fetch all rows (list of lists)
107 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
109 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
110 # JOIN Attachments AS At ON Tr.id=At.TransactionID
111 # WHERE (At.Content LIKE '%%%s%%' OR
112 # At.Subject LIKE '%%%s%%') AND
113 # (Tk.Status = 'new' OR Tk.Status = 'open') AND
114 # Tk.Queue = 3 OR Tk.Queue = 19
115 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
116 # % (hostname,hostname)
117 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
119 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
120 # JOIN Attachments AS At ON Tr.id=At.TransactionID
121 # WHERE (At.Content LIKE '%%%s%%' OR
122 # At.Subject LIKE '%%%s%%') AND
123 # (Tk.Status = 'new' OR Tk.Status = 'open')
124 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
125 # % (hostname,hostname)
127 # Queue == 10 is the spam Queue in RT.
128 # SELECT Tk.* FROM Tickets AS Tk, Attachments AS At JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE Tk.Queue != 10 AND Tk.id > 10000 AND Tr.id=At.TransactionID AND Tk.Status = 'open' ;
131 sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
132 FROM Tickets AS Tk, Attachments AS At
133 JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
134 WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
135 Tr.id=At.TransactionID AND Tk.Status = 'open'"""
136 #Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
137 #sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
138 #FROM Tickets AS Tk, Attachments AS At
139 #JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
140 #WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
141 #Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR
142 #Tk.Status = 'new') """
143 sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content, Us.EmailAddress, Tk.LastUpdated, Q.Name, Tk.Owner FROM Tickets AS Tk, Attachments AS At, Queues as Q, Users as Us JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE (Tk.Queue=3 OR Tk.Queue=22) AND Tk.id > 10000 AND Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR Tk.Status = 'new') AND Us.id=Tk.LastUpdatedBy AND Q.id=Tk.Queue """
146 raw = fetch_from_db(db, sql)
149 tickets = map(lambda x: {"ticket_id":str(x[0]),
152 "content":str(x[3])},
155 raw = fetch_from_db(db,sqlall)
158 tickets_all = map(lambda x: {"ticket_id":str(x[0]),
163 "lastupdated":str(x[5]),
172 for t in tickets_all:
173 idTickets[t['ticket_id']] = t
174 database.dbDump("idTickets", idTickets)
178 def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
179 # ad_rt_tickets is an array of dicts, defined above.
180 if len(ad_rt_tickets) == 0:
183 d_ticket = ad_rt_tickets[0]
184 if not ('ticket_id' in d_ticket and 'status' in d_ticket and
185 'subj' in d_ticket and 'content' in d_ticket):
186 logger.debug("RT_tickets array has wrong fields!!!")
189 #logger.debug("Searching all tickets for %s" % host)
190 def search_tickets(host, ad_rt_tickets):
191 # compile once for more efficiency
192 re_host = re.compile(host)
193 for x in ad_rt_tickets:
194 if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \
195 re_host.search(x['content'], re.MULTILINE|re.IGNORECASE):
196 logger.debug("\t ticket %s has %s" % (x['ticket_id'], host))
197 print "\t ticket %s has %s" % (x['ticket_id'], host)
198 if x['ticket_id'] in ticket_blacklist:
202 print "\t noticket -- has %s" % host
203 #logger.debug("\t noticket -- has %s" % host)
206 # This search, while O(tickets), takes less than a millisecond, 05-25-07
207 #t = commands.MyTimer()
208 ret = search_tickets(host, ad_rt_tickets)
215 Finds tickets associated with hostnames.
216 The idea is if you give it an array of host names,
217 presumeably from comon's list of bad nodes, it starts
218 a few threads to query RT. RT takes a while to return.
220 This is turning into a reinvention of DB design, which I dont believe in.
221 In an effort to keep things minimal, here's the basic algo:
223 Give list of hostnames to RT()
224 Finds tickets associate with new hostnames (not in dict(tickets)).
225 Remove nodes that have come backup. Don't care of ticket is closed after first query.
226 Another thread refresh tickets of nodes already in dict and remove nodes that have come up.
229 def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None):
230 # Time of last update of ticket DB
231 self.dbTickets = dbTickets
233 self.l_ticket_blacklist = l_ticket_blacklist
235 self.q_fromRT = q_fromRT
237 Thread.__init__(self,target = self.getTickets)
239 # Takes node from q_toRT, gets tickets.
240 # Thread that actually gets the tickets.
241 def getTickets(self):
244 diag_node = self.q_toRT.get(block = True)
245 if diag_node != None:
246 host = diag_node['nodename']
247 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \
248 self.l_ticket_blacklist, \
250 diag_node['found_rt_ticket'] = None
252 logger.debug("RT: found tickets for %s" %host)
253 diag_node['found_rt_ticket'] = r_ticket['ticket_id']
256 if r_ticket is not None:
257 print "Ignoring ticket %s" % r_ticket['ticket_id']
258 # TODO: why do i return the ticket id for a
259 # blacklisted ticket id?
260 #diag_node['found_rt_ticket'] = r_ticket['ticket_id']
261 self.count = self.count + 1
263 self.q_fromRT.put(diag_node)
265 print "RT processed %d nodes with noticket" % self.count
266 logger.debug("RT filtered %d noticket nodes" % self.count)
267 self.q_fromRT.put(None)
271 # Removes hosts that are no longer down.
272 def remTickets(self):
273 logger.debug("Removing stale entries from DB.")
274 prevdown = self.tickets.keys()
277 ##BEGIN HACK. This should be outside of this file. passed to class.
278 #cmn = comon.Comon(None, None)
280 #for bucket in cmn.comonbkts.keys():
281 # for host in getattr(cmn,bucket):
282 # if host not in currdown: currdown.append(host)
285 # Actually do the comparison
286 #for host in prevdown:
287 # if host not in currdown:
288 # del self.tickets[host]
289 # logger.info("RT: %s no longer down." % host)
292 def updateTickets(self):
293 logger.info("Refreshing DB.")
294 for host in self.tickets.keys():
295 # Put back in Q to refresh
296 self.q_toRT.put(host)
298 def cleanTickets(self):
305 logger.setLevel(logging.DEBUG)
306 ch = logging.StreamHandler()
307 ch.setLevel(logging.DEBUG)
308 formatter = logging.Formatter('%(message)s')
309 ch.setFormatter(formatter)
310 logger.addHandler(ch)
312 tickets = rt_tickets()
313 database.dbDump("ad_dbTickets", tickets)
316 if __name__ == '__main__':