12 from threading import *
# TODO: merge the RT mailer from mailer.py into this file.

# RT database access constants file: KEY=value lines parsed by
# readConstantsFile() below (expects RT_DB_HOST, RT_DB_USER,
# RT_DB_PASSWORD and RT_DB_NAME).
RT_DB_CONSTANTS_PATH='rt_db'

# Shared module logger; handlers are attached in the harness at the
# bottom of the file.
logger = logging.getLogger("monitor")

# seconds between ticket update
def stripQuotes(value):
    """Return *value* with one pair of surrounding quote characters removed.

    Both single and double quotes are handled; a string that is not
    quoted (or is empty) is returned unchanged.  Used when parsing the
    NAME='value' lines of the constants file.
    """
    quotes = "'\""
    # Guard against the empty string before indexing (the original
    # indexed value[len(value)-1] unconditionally).
    if value and value[0] in quotes:
        value = value[1:]
    if value and value[-1] in quotes:
        value = value[:-1]
    return value
def readConstantsFile(file_path):
    """
    Read a file consisting of lines of

        NAME=value
        NAME='value'

    and return a dictionary of the values.

    Blank lines, and lines starting with # (comments) are skipped.
    Returns None if the file cannot be opened, which callers test for.
    """
    contents = {}
    try:
        # 'with' guarantees the handle is closed; the original used the
        # long-gone file() builtin and leaked the handle.
        with open(file_path, "r") as input_file:
            for line in input_file:
                line = line.strip()
                # Skip blanks and comments.
                if not line or line.startswith("#"):
                    continue
                # Split on the first '=' only so values may contain '='.
                parts = line.split("=", 1)
                if len(parts) != 2:
                    continue
                contents[parts[0]] = stripQuotes(parts[1])
    except IOError:
        return None
    return contents
# Read RT database access constants and open the shared connection.
# Both happen at import time; the module is unusable without them.
rt_db_constants = readConstantsFile(RT_DB_CONSTANTS_PATH)
if rt_db_constants is None:
    print("Unable to read database access constants from %s" %
          RT_DB_CONSTANTS_PATH)
    raise SystemExit(1)

try:
    rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
                            user=rt_db_constants['RT_DB_USER'],
                            passwd=rt_db_constants['RT_DB_PASSWORD'],
                            db=rt_db_constants['RT_DB_NAME'])
except Exception as err:
    # Without a DB handle every later query fails anyway; stop now
    # instead of letting a NameError on rt_db surface later.
    print("Failed to connect to RT database: %s" % err)
    raise SystemExit(1)
# NOTE(review): two earlier per-host variants of this query (filtering
# Attachments with LIKE '%<hostname>%') were removed as dead code; we
# now fetch every open, non-spam ticket once and match hostnames in
# Python (see is_host_in_rt_tickets below).
# Queue == 10 is the spam Queue in RT.
sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
    FROM Tickets AS Tk, Attachments AS At
    JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
    WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
        Tr.id=At.TransactionID AND Tk.Status = 'open'"""
#Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
# Create a 'cursor' (required by MySQLdb) and run the ticket query once
# at import time; the result seeds the module-level ticket cache.
try:
    cursor = rt_db.cursor()
    cursor.execute(sql)
except Exception as err:
    print("Could not execute RT query %s" % err)
    raise SystemExit(1)

# fetch all rows (list of tuples)
raw = cursor.fetchall()

# Map the raw rows to a list of dicts.  When ints get pulled from SQL
# into python they are converted to LONG to prevent overflow; render
# every field through str() so the cache holds plain strings
# (ticket_id included -- blacklist entries are compared as strings).
tickets = [{"ticket_id": str(row[0]),
            "status": str(row[1]),
            "subj": str(row[2]),
            "content": str(row[3])} for row in raw]
def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
    """Search the cached RT tickets for *host*.

    host             -- hostname to look for (used as a regex pattern)
    ticket_blacklist -- ticket ids that operators asked to ignore
    ad_rt_tickets    -- list of ticket dicts with keys 'ticket_id',
                        'status', 'subj' and 'content'

    Returns (True, ticket) when an open ticket mentions the host,
    (False, ticket) when the only match is blacklisted, and
    (False, None) when nothing matches.
    """
    log = logging.getLogger("monitor")
    if len(ad_rt_tickets) == 0:
        return (False, None)

    # Sanity-check the record shape using the first entry.
    d_ticket = ad_rt_tickets[0]
    if not ('ticket_id' in d_ticket and 'status' in d_ticket and
            'subj' in d_ticket and 'content' in d_ticket):
        log.debug("RT_tickets array has wrong fields!!!")
        return (False, None)

    # BUG FIX: the flags used to be passed as the second (pos) argument
    # of Pattern.search(), where they were silently treated as a string
    # offset.  Flags belong in re.compile().  Compile once for all
    # tickets.
    re_host = re.compile(host, re.MULTILINE | re.IGNORECASE)

    # This search, while O(tickets), takes less than a millisecond, 05-25-07
    for ticket in ad_rt_tickets:
        if re_host.search(ticket['subj']) or re_host.search(ticket['content']):
            log.debug("\t ticket %s has %s" % (ticket['ticket_id'], host))
            print("\t ticket %s has %s" % (ticket['ticket_id'], host))
            if ticket['ticket_id'] in ticket_blacklist:
                # Matched, but operators asked to ignore this ticket;
                # return it anyway so the caller can report the skip.
                return (False, ticket)
            return (True, ticket)

    print("\t noticket -- has %s" % host)
    return (False, None)
    Finds tickets associated with hostnames.
    The idea is that if you give it an array of host names,
    presumably from comon's list of bad nodes, it starts
    a few threads to query RT.  RT takes a while to return.
    This is turning into a reinvention of DB design, which I don't believe in.
    In an effort to keep things minimal, here's the basic algorithm:
    Give a list of hostnames to RT().
    It finds tickets associated with new hostnames (not in dict(tickets)).
    Remove nodes that have come back up.  We don't care if the ticket is closed after the first query.
    Another thread refreshes tickets of nodes already in the dict and removes nodes that have come back up.
199 def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None):
200 # Time of last update of ticket DB
201 self.dbTickets = dbTickets
203 self.l_ticket_blacklist = l_ticket_blacklist
205 self.q_fromRT = q_fromRT
207 Thread.__init__(self,target = self.getTickets)
209 # Takes node from q_toRT, gets tickets.
210 # Thread that actually gets the tickets.
211 def getTickets(self):
214 diag_node = self.q_toRT.get(block = True)
215 if diag_node != None:
216 host = diag_node['nodename']
217 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \
218 self.l_ticket_blacklist, \
220 diag_node['found_rt_ticket'] = None
222 logger.debug("RT: found tickets for %s" %host)
223 diag_node['found_rt_ticket'] = r_ticket['ticket_id']
226 if r_ticket is not None:
227 print "Ignoring ticket %s" % r_ticket['ticket_id']
228 # TODO: why do i return the ticket id for a
229 # blacklisted ticket id?
230 #diag_node['found_rt_ticket'] = r_ticket['ticket_id']
231 self.count = self.count + 1
233 self.q_fromRT.put(diag_node)
235 print "RT processed %d nodes with noticket" % self.count
236 logger.debug("RT filtered %d noticket nodes" % self.count)
237 self.q_fromRT.put(None)
    # Removes hosts that are no longer down.
    def remTickets(self):
        """Drop cached tickets for hosts that have come back up.

        The comparison against a current-down list is commented out
        below, so as written this only logs and snapshots the keys.
        """
        logger.debug("Removing stale entries from DB.")
        # NOTE(review): __init__ stores the ticket cache as
        # self.dbTickets, but this reads self.tickets -- confirm which
        # attribute name is the live one.
        prevdown = self.tickets.keys()

        ## BEGIN HACK. This should be outside of this file. passed to class.
        #cmn = comon.Comon(None, None)
        #for bucket in cmn.comonbkts.keys():
        #    for host in getattr(cmn,bucket):
        #        if host not in currdown: currdown.append(host)

        # Actually do the comparison
        #for host in prevdown:
        #    if host not in currdown:
        #        del self.tickets[host]
        #        logger.info("RT: %s no longer down." % host)
262 def updateTickets(self):
263 logger.info("Refreshing DB.")
264 for host in self.tickets.keys():
265 # Put back in Q to refresh
266 self.q_toRT.put(host)
    def cleanTickets(self):
        """Thread body used by the harness below; implementation not shown in this excerpt."""
# --- ad-hoc test harness setup ---
# Send the module logger's debug output to stderr, message text only.
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)

# Shared work queue feeding the RT worker threads.
bucket = Queue.Queue()
# NOTE(review): RT.__init__ takes (dbTickets, q_toRT, q_fromRT,
# l_ticket_blacklist, ...); these two-argument calls look stale --
# confirm before running this harness.
a = RT(tickets, bucket)
b = RT(tickets, bucket)
c = RT(tickets, bucket)
d = RT(tickets, bucket)
e = RT(tickets, bucket)

# Sample hostnames to push through the pipeline.
tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu')

#et = Thread(target=e.pushHosts)
#cmn = comon.Comon(cdb, bucket)
#for bucket in cmn.comonbkts.keys():
#    for host in getattr(cmn,bucket):
at = Thread(target=a.cleanTickets)
316 if __name__ == '__main__':