12 from threading import *
14 # TODO: merge the RT mailer from mailer.py into this file.
# Path of the file holding the RT database access constants
# (RT_DB_HOST, RT_DB_USER, RT_DB_PASSWORD, RT_DB_NAME).
RT_DB_CONSTANTS_PATH = '/etc/planetlab/rt_db'

# Module-wide logger, registered under the "monitor" name.
logger = logging.getLogger("monitor")
22 # seconds between ticket update
25 def stripQuotes( str ):
29 if str[len(str)-1] in quotes:
34 def readConstantsFile( file_path ):
36 read a file consisting of lines of
40 and return a dictionary of the values.
42 blank lines, and lines starting with # (comments) are skipped
48 input_file= file(file_path,"r")
52 for line in input_file:
55 line= string.strip(line)
59 parts= string.split(line,"=",)
63 contents[parts[0]]= stripQuotes(parts[1])
71 # read plc database passwords and connect
72 rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
73 if rt_db_constants is None:
74 print "Unable to read database access constants from %s" % \
79 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
80 user=rt_db_constants['RT_DB_USER'],
81 passwd=rt_db_constants['RT_DB_PASSWORD'],
82 db=rt_db_constants['RT_DB_NAME'])
83 except Exception, err:
84 print "Failed to connect to RT database: %s" %err
94 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
96 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
97 # JOIN Attachments AS At ON Tr.id=At.TransactionID
98 # WHERE (At.Content LIKE '%%%s%%' OR
99 # At.Subject LIKE '%%%s%%') AND
100 # (Tk.Status = 'new' OR Tk.Status = 'open') AND
101 # Tk.Queue = 3 OR Tk.Queue = 19
102 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
103 # % (hostname,hostname)
104 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
106 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
107 # JOIN Attachments AS At ON Tr.id=At.TransactionID
108 # WHERE (At.Content LIKE '%%%s%%' OR
109 # At.Subject LIKE '%%%s%%') AND
110 # (Tk.Status = 'new' OR Tk.Status = 'open')
111 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
112 # % (hostname,hostname)
114 # Queue == 10 is the spam Queue in RT.
115 sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
116 FROM Tickets AS Tk, Attachments AS At
117 JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
118 WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
119 Tr.id=At.TransactionID AND Tk.Status = 'open'"""
120 #Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
123 # create a 'cursor' (required by MySQLdb)
126 except Exception, err:
127 print "Could not execute RT query %s" %err
130 # fetch all rows (list of lists)
133 # map list of lists (raw) to list of dicts (tickets)
134 # Integer columns fetched from SQL arrive as Python longs (to prevent
135 # overflow), so convert them back (the active line below converts to str).
136 #tickets = map(lambda x: {"ticket_id":int(x[0]),
137 tickets = map(lambda x: {"ticket_id":str(x[0]),
140 "content":str(x[3])},
146 def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
147 # ad_rt_tickets is an array of dicts, defined above.
148 if len(ad_rt_tickets) == 0:
151 d_ticket = ad_rt_tickets[0]
152 if not ('ticket_id' in d_ticket and 'status' in d_ticket and
153 'subj' in d_ticket and 'content' in d_ticket):
154 logger.debug("RT_tickets array has wrong fields!!!")
157 #logger.debug("Searching all tickets for %s" % host)
158 def search_tickets(host, ad_rt_tickets):
159 # compile once for more efficiency
160 re_host = re.compile(host)
161 for x in ad_rt_tickets:
162 if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \
163 re_host.search(x['content'], re.MULTILINE|re.IGNORECASE):
164 logger.debug("\t ticket %s has %s" % (x['ticket_id'], host))
165 print "\t ticket %s has %s" % (x['ticket_id'], host)
166 if x['ticket_id'] in ticket_blacklist:
170 print "\t noticket -- has %s" % host
171 #logger.debug("\t noticket -- has %s" % host)
174 # This search, while O(tickets), takes less than a millisecond, 05-25-07
175 #t = soltesz.MyTimer()
176 ret = search_tickets(host, ad_rt_tickets)
183 Finds tickets associated with hostnames.
184 The idea is if you give it an array of host names,
185 presumeably from comon's list of bad nodes, it starts
186 a few threads to query RT. RT takes a while to return.
188 This is turning into a reinvention of DB design, which I don't believe in.
189 In an effort to keep things minimal, here's the basic algo:
191 Give list of hostnames to RT()
192 Finds tickets associated with new hostnames (not in dict(tickets)).
193 Remove nodes that have come back up. Don't care if the ticket is closed after the first query.
194 Another thread refreshes the tickets of nodes already in the dict and removes nodes that have come up.
197 def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None):
198 # Time of last update of ticket DB
199 self.dbTickets = dbTickets
201 self.l_ticket_blacklist = l_ticket_blacklist
203 self.q_fromRT = q_fromRT
205 Thread.__init__(self,target = self.getTickets)
207 # Takes node from q_toRT, gets tickets.
208 # Thread that actually gets the tickets.
209 def getTickets(self):
212 diag_node = self.q_toRT.get(block = True)
213 if diag_node != None:
214 host = diag_node['nodename']
215 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \
216 self.l_ticket_blacklist, \
218 diag_node['found_rt_ticket'] = None
220 logger.debug("RT: found tickets for %s" %host)
221 diag_node['found_rt_ticket'] = r_ticket['ticket_id']
224 if r_ticket is not None:
225 print "Ignoring ticket %s" % r_ticket['ticket_id']
226 # TODO: why do i return the ticket id for a
227 # blacklisted ticket id?
228 #diag_node['found_rt_ticket'] = r_ticket['ticket_id']
229 self.count = self.count + 1
231 self.q_fromRT.put(diag_node)
233 print "RT processed %d nodes with noticket" % self.count
234 logger.debug("RT filtered %d noticket nodes" % self.count)
235 self.q_fromRT.put(None)
239 # Removes hosts that are no longer down.
240 def remTickets(self):
241 logger.debug("Removing stale entries from DB.")
242 prevdown = self.tickets.keys()
245 ##BEGIN HACK. This should be outside of this file. passed to class.
246 #cmn = comon.Comon(None, None)
248 #for bucket in cmn.comonbkts.keys():
249 # for host in getattr(cmn,bucket):
250 # if host not in currdown: currdown.append(host)
253 # Actually do the comparison
254 #for host in prevdown:
255 # if host not in currdown:
256 # del self.tickets[host]
257 # logger.info("RT: %s no longer down." % host)
def updateTickets(self):
    """
    Re-queue every host already recorded in self.tickets so that a
    getTickets worker looks up its RT tickets again.

    Each host is sent as a {'nodename': host} dict, the item shape that
    getTickets consumes from self.q_toRT.
    """
    logger.info("Refreshing DB.")
    # Snapshot the keys in case another thread modifies self.tickets
    # while we are iterating.
    for host in list(self.tickets.keys()):
        # getTickets() does diag_node['nodename'] on every non-None item it
        # takes from q_toRT; a bare hostname string would raise TypeError
        # there, so wrap it in the expected dict form.
        self.q_toRT.put({'nodename': host})
266 def cleanTickets(self):
273 logger.setLevel(logging.DEBUG)
274 ch = logging.StreamHandler()
275 ch.setLevel(logging.DEBUG)
276 formatter = logging.Formatter('%(message)s')
277 ch.setFormatter(formatter)
278 logger.addHandler(ch)
280 bucket = Queue.Queue()
282 a = RT(tickets, bucket)
283 b = RT(tickets, bucket)
284 c = RT(tickets, bucket)
285 d = RT(tickets, bucket)
286 e = RT(tickets, bucket)
291 tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu')
294 #et = Thread(target=e.pushHosts)
302 #cmn = comon.Comon(cdb, bucket)
304 #for bucket in cmn.comonbkts.keys():
305 # for host in getattr(cmn,bucket):
308 at = Thread(target=a.cleanTickets)
314 if __name__ == '__main__':