11 from threading import *
14 # TODO: merge the RT mailer from mailer.py into this file.
17 logger = logging.getLogger("monitor")
19 # seconds between ticket updates
22 def stripQuotes( str ):
26 if str[len(str)-1] in quotes:
31 def readConstantsFile( file_path ):
33 read a file consisting of lines of
37 and return a dictionary of the values.
39 blank lines, and lines starting with # (comments) are skipped
45 input_file= file(file_path,"r")
49 for line in input_file:
52 line= string.strip(line)
56 parts= string.split(line,"=",)
60 contents[parts[0]]= stripQuotes(parts[1])
68 # read plc database passwords and connect
69 #rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
70 #if rt_db_constants is None:
71 # print "Unable to read database access constants from %s" % \
72 # RT_DB_CONSTANTS_PATH
76 rt_db = MySQLdb.connect(host=config.RT_DB_HOST,
77 user=config.RT_DB_USER,
78 passwd=config.RT_DB_PASSWORD,
80 except Exception, err:
81 print "Failed to connect to RT database: %s" %err
88 def fetch_from_db(db, sql):
90 # create a 'cursor' (required by MySQLdb)
93 except Exception, err:
94 print "Could not execute RT query %s" %err
97 # fetch all rows (list of lists)
106 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
108 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
109 # JOIN Attachments AS At ON Tr.id=At.TransactionID
110 # WHERE (At.Content LIKE '%%%s%%' OR
111 # At.Subject LIKE '%%%s%%') AND
112 # (Tk.Status = 'new' OR Tk.Status = 'open') AND
113 # Tk.Queue = 3 OR Tk.Queue = 19
114 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
115 # % (hostname,hostname)
116 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
118 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
119 # JOIN Attachments AS At ON Tr.id=At.TransactionID
120 # WHERE (At.Content LIKE '%%%s%%' OR
121 # At.Subject LIKE '%%%s%%') AND
122 # (Tk.Status = 'new' OR Tk.Status = 'open')
123 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
124 # % (hostname,hostname)
126 # Queue == 10 is the spam Queue in RT.
127 # SELECT Tk.* FROM Tickets AS Tk, Attachments AS At JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE Tk.Queue != 10 AND Tk.id > 10000 AND Tr.id=At.TransactionID AND Tk.Status = 'open' ;
130 sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
131 FROM Tickets AS Tk, Attachments AS At
132 JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
133 WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
134 Tr.id=At.TransactionID AND Tk.Status = 'open'"""
135 #Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
136 #sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
137 #FROM Tickets AS Tk, Attachments AS At
138 #JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
139 #WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
140 #Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR
141 #Tk.Status = 'new') """
142 sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content, Us.EmailAddress, Tk.LastUpdated, Q.Name, Tk.Owner FROM Tickets AS Tk, Attachments AS At, Queues as Q, Users as Us JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE (Tk.Queue=3 OR Tk.Queue=22) AND Tk.id > 10000 AND Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR Tk.Status = 'new') AND Us.id=Tk.LastUpdatedBy AND Q.id=Tk.Queue """
145 raw = fetch_from_db(db, sql)
148 tickets = map(lambda x: {"ticket_id":str(x[0]),
151 "content":str(x[3])},
154 raw = fetch_from_db(db,sqlall)
157 tickets_all = map(lambda x: {"ticket_id":str(x[0]),
162 "lastupdated":str(x[5]),
171 for t in tickets_all:
172 idTickets[t['ticket_id']] = t
173 database.dbDump("idTickets", idTickets)
177 def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
178 # ad_rt_tickets is an array of dicts, defined above.
179 if len(ad_rt_tickets) == 0:
182 d_ticket = ad_rt_tickets[0]
183 if not ('ticket_id' in d_ticket and 'status' in d_ticket and
184 'subj' in d_ticket and 'content' in d_ticket):
185 logger.debug("RT_tickets array has wrong fields!!!")
188 #logger.debug("Searching all tickets for %s" % host)
189 def search_tickets(host, ad_rt_tickets):
190 # compile once for more efficiency
191 re_host = re.compile(host)
192 for x in ad_rt_tickets:
193 if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \
194 re_host.search(x['content'], re.MULTILINE|re.IGNORECASE):
195 logger.debug("\t ticket %s has %s" % (x['ticket_id'], host))
196 print "\t ticket %s has %s" % (x['ticket_id'], host)
197 if x['ticket_id'] in ticket_blacklist:
201 print "\t noticket -- has %s" % host
202 #logger.debug("\t noticket -- has %s" % host)
205 # This search, while O(tickets), takes less than a millisecond, 05-25-07
206 #t = commands.MyTimer()
207 ret = search_tickets(host, ad_rt_tickets)
214 Finds tickets associated with hostnames.
215 The idea is if you give it an array of host names,
216 presumably from comon's list of bad nodes, it starts
217 a few threads to query RT. RT takes a while to return.
219 This is turning into a reinvention of DB design, which I don't believe in.
220 In an effort to keep things minimal, here's the basic algo:
222 Give list of hostnames to RT()
223 Finds tickets associated with new hostnames (not in dict(tickets)).
224 Removes nodes that have come back up. Doesn't care if the ticket is closed after the first query.
225 Another thread refreshes tickets of nodes already in the dict and removes nodes that have come up.
228 def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None):
229 # Time of last update of ticket DB
230 self.dbTickets = dbTickets
232 self.l_ticket_blacklist = l_ticket_blacklist
234 self.q_fromRT = q_fromRT
236 Thread.__init__(self,target = self.getTickets)
238 # Takes node from q_toRT, gets tickets.
239 # Thread that actually gets the tickets.
240 def getTickets(self):
243 diag_node = self.q_toRT.get(block = True)
244 if diag_node != None:
245 host = diag_node['nodename']
246 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \
247 self.l_ticket_blacklist, \
249 diag_node['found_rt_ticket'] = None
251 logger.debug("RT: found tickets for %s" %host)
252 diag_node['found_rt_ticket'] = r_ticket['ticket_id']
255 if r_ticket is not None:
256 print "Ignoring ticket %s" % r_ticket['ticket_id']
257 # TODO: why do i return the ticket id for a
258 # blacklisted ticket id?
259 #diag_node['found_rt_ticket'] = r_ticket['ticket_id']
260 self.count = self.count + 1
262 self.q_fromRT.put(diag_node)
264 print "RT processed %d nodes with noticket" % self.count
265 logger.debug("RT filtered %d noticket nodes" % self.count)
266 self.q_fromRT.put(None)
270 # Removes hosts that are no longer down.
271 def remTickets(self):
272 logger.debug("Removing stale entries from DB.")
273 prevdown = self.tickets.keys()
276 ##BEGIN HACK. This should be outside of this file. passed to class.
277 #cmn = comon.Comon(None, None)
279 #for bucket in cmn.comonbkts.keys():
280 # for host in getattr(cmn,bucket):
281 # if host not in currdown: currdown.append(host)
284 # Actually do the comparison
285 #for host in prevdown:
286 # if host not in currdown:
287 # del self.tickets[host]
288 # logger.info("RT: %s no longer down." % host)
291 def updateTickets(self):
292 logger.info("Refreshing DB.")
293 for host in self.tickets.keys():
294 # Put back in Q to refresh
295 self.q_toRT.put(host)
297 def cleanTickets(self):
304 logger.setLevel(logging.DEBUG)
305 ch = logging.StreamHandler()
306 ch.setLevel(logging.DEBUG)
307 formatter = logging.Formatter('%(message)s')
308 ch.setFormatter(formatter)
309 logger.addHandler(ch)
311 tickets = rt_tickets()
312 database.dbDump("ad_dbTickets", tickets)
315 if __name__ == '__main__':