12 from threading import *
# Module-level configuration for the RT (ticket tracker) lookup code.
# NOTE(review): this listing has gaps; the imports for `logging`, `string`,
#   `re`, `MySQLdb`, `Queue` and `comon`, which later lines use, are not
#   visible here.
15 # RT database access constants file
16 RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
# Logger named "monitor"; the functions and methods below log through it.
19 logger = logging.getLogger("monitor")
# NOTE(review): the value the next comment describes is on a line that is
#   not visible in this listing.
21 # seconds between ticket update
# stripQuotes(str): the visible test checks whether the LAST character of
# `str` is a member of `quotes`.
# NOTE(review): `quotes` is assigned on a line not shown here, and the
#   statements that act on this test (and the return) are not shown either.
# NOTE(review): `str[len(str)-1]` raises IndexError when `str` is empty, so
#   an emptiness check is needed before indexing. The parameter name also
#   shadows the builtin `str`.
24 def stripQuotes( str ):
28 if str[len(str)-1] in quotes:
# readConstantsFile(file_path): parse the file at `file_path` line by line
# and store stripQuotes(value) under its name in the `contents` dict.
# The caller compares the result with None, so failure is presumably
# signalled by returning None -- the return statements are not visible here.
33 def readConstantsFile( file_path ):
35 read a file consisting of lines of
39 and return a dictionary of the values.
41 blank lines, and lines starting with # (comments) are skipped
# NOTE(review): `file()` is a Python 2-only builtin, and no close() of
#   `input_file` is visible in this listing -- confirm the handle is released.
47 input_file= file(file_path,"r")
51 for line in input_file:
54 line= string.strip(line)
# NOTE(review): no maxsplit is given, so a value that itself contains "="
#   is split into more than two parts and parts[1] holds only the text up
#   to the second "=". string.split(line, "=", 1) would keep the whole value.
58 parts= string.split(line,"=",)
62 contents[parts[0]]= stripQuotes(parts[1])
# Load the RT database settings from RT_DB_CONSTANTS_PATH and open a
# MySQLdb connection with them.
# NOTE(review): the enclosing `def`, the `try:` line and the failure-path
#   statements are not visible in this listing.
70 # read plc database passwords and connect
71 rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
# A None result means the constants file could not be read.
# The connection below looks up RT_DB_HOST, RT_DB_USER, RT_DB_PASSWORD and
# RT_DB_NAME; a missing key raises KeyError.
# NOTE(review): failures are reported with `print` (stdout) rather than
#   through `logger`.
72 if rt_db_constants is None:
73 print "Unable to read database access constants from %s" % \
78 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
79 user=rt_db_constants['RT_DB_USER'],
80 passwd=rt_db_constants['RT_DB_PASSWORD'],
81 db=rt_db_constants['RT_DB_NAME'])
# `except Exception, err` is Python 2-only syntax; it catches every
# Exception subclass raised by the guarded statements.
82 except Exception, err:
83 print "Failed to connect to RT database: %s" %err
# Fetch every 'new' or 'open' ticket outside queue 10 with id > 10000,
# together with its attachment content, and map each row to a dict.
# NOTE(review): the enclosing `def`, the cursor/execute/fetch statements and
#   the remaining dict keys are on lines not visible in this listing.
# The two commented-out queries below are earlier versions that filtered by
# hostname inside SQL (LIKE '%hostname%'). The active query has no hostname
# parameter; is_host_in_rt_tickets() does the hostname matching in Python.
93 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
95 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
96 # JOIN Attachments AS At ON Tr.id=At.TransactionID
97 # WHERE (At.Content LIKE '%%%s%%' OR
98 # At.Subject LIKE '%%%s%%') AND
99 # (Tk.Status = 'new' OR Tk.Status = 'open') AND
100 # Tk.Queue = 3 OR Tk.Queue = 19
101 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
102 # % (hostname,hostname)
103 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
105 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
106 # JOIN Attachments AS At ON Tr.id=At.TransactionID
107 # WHERE (At.Content LIKE '%%%s%%' OR
108 # At.Subject LIKE '%%%s%%') AND
109 # (Tk.Status = 'new' OR Tk.Status = 'open')
110 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
111 # % (hostname,hostname)
# NOTE(review): the FROM clause mixes a comma join (Tickets, Attachments)
#   with an explicit JOIN. MySQL 5.0.12+ gives JOIN higher precedence than
#   the comma operator, so `Tk.id` in the ON clause may be rejected as an
#   unknown column -- confirm against the server version in use, or write
#   both joins explicitly.
113 # Queue == 10 is the spam Queue in RT.
114 sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
115 FROM Tickets AS Tk, Attachments AS At
116 JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
117 WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
118 Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
121 # create a 'cursor' (required by MySQLdb)
124 except Exception, err:
125 print "Could not execute RT query %s" %err
128 # fetch all rows (list of lists)
# Column 0 (Tk.id) is stored as a string under "ticket_id" and column 3
# (At.Content) as a string under "content"; the int() variant is commented
# out just above the active line.
131 # map list of lists (raw) to list of dicts (tickets)
132 # when int gets pulls from SQL into python ints are converted to LONG to
133 # prevent overflow .. convert back
134 #tickets = map(lambda x: {"ticket_id":int(x[0]),
135 tickets = map(lambda x: {"ticket_id":str(x[0]),
138 "content":str(x[3])},
# is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
# search the 'subj' and 'content' fields of each ticket dict for `host`.
# getTickets() unpacks the result as (found_flag, ticket); the return
# statements themselves are on lines not visible in this listing.
144 def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
145 # ad_rt_tickets is an array of dicts, defined above.
146 if len(ad_rt_tickets) == 0:
# Only the FIRST ticket is checked for the four expected keys.
149 d_ticket = ad_rt_tickets[0]
150 if not ('ticket_id' in d_ticket and 'status' in d_ticket and
151 'subj' in d_ticket and 'content' in d_ticket):
152 logger.debug("RT_tickets array has wrong fields!!!")
155 #logger.debug("Searching all tickets for %s" % host)
# Nested helper; it reads `ticket_blacklist` from the enclosing function's
# scope rather than taking it as a parameter.
156 def search_tickets(host, ad_rt_tickets):
157 # compile once for more efficiency
# NOTE(review): `host` is compiled as a regular expression without
#   re.escape(), so each "." in a hostname matches any character.
158 re_host = re.compile(host)
159 for x in ad_rt_tickets:
# NOTE(review): on a compiled pattern the second positional argument of
#   search() is `pos`, not flags. re.MULTILINE|re.IGNORECASE evaluates to
#   10, so both searches start at index 10 and stay case-sensitive.
#   Flags belong in re.compile(..., re.MULTILINE|re.IGNORECASE).
160 if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \
161 re_host.search(x['content'], re.MULTILINE|re.IGNORECASE):
162 logger.debug("\t ticket %s has %s" % (x['ticket_id'], host))
# NOTE(review): this prints the whole blacklist to stdout for every match;
#   it looks like leftover debugging output.
164 print ticket_blacklist
165 if x['ticket_id'] in ticket_blacklist:
169 logger.debug("\t noticket -- has %s" % host)
172 # This search, while O(tickets), takes less than a millisecond, 05-25-07
173 #t = soltesz.MyTimer()
174 ret = search_tickets(host, ad_rt_tickets)
181 Finds tickets associated with hostnames.
182 The idea is if you give it an array of host names,
183 presumably from comon's list of bad nodes, it starts
184 a few threads to query RT. RT takes a while to return.
186 This is turning into a reinvention of DB design, which I don't believe in.
187 In an effort to keep things minimal, here's the basic algo:
189 Give list of hostnames to RT()
190 Finds tickets associated with new hostnames (not in dict(tickets)).
191 Removes nodes that have come back up. Don't care if ticket is closed after first query.
192 Another thread refreshes tickets of nodes already in dict and removes nodes that have come up.
# Store the shared ticket structures and queues on the instance, then
# initialise the Thread base class with getTickets as the thread target.
# NOTE(review): the `target` parameter is not used on any visible line;
#   Thread.__init__ is always given self.getTickets.
# NOTE(review): self.count, which getTickets reads and increments, is not
#   assigned on any visible line here -- confirm it is initialised on a
#   line this listing omits.
195 def __init__(self, dbTickets, tickets, qin_toCheck, qout_sickNoTicket, l_ticket_blacklist, target = None):
# NOTE(review): the next comment mentions a "time of last update", but the
#   assignment under it stores dbTickets -- the comment may belong to a
#   line not shown.
196 # Time of last update of ticket DB
197 self.dbTickets = dbTickets
199 self.l_ticket_blacklist = l_ticket_blacklist
200 # Check host in queue. Queue populated from comon data of sick.
201 self.qin_toCheck = qin_toCheck
202 # Result of rt db query. Nodes without tickets that are sick.
203 self.qout_sickNoTicket = qout_sickNoTicket
204 #DB of tickets. Name -> ticket
205 self.tickets = tickets
206 Thread.__init__(self,target = self.getTickets)
# Thread body: take diag_node items from qin_toCheck, look each host up in
# the RT tickets, and forward the item to qout_sickNoTicket.
# NOTE(review): the loop header and the if/else lines that separate the
#   "ticket found" and "no ticket" paths are not visible in this listing.
208 # Takes node from qin_toCheck, gets tickets.
209 # Thread that actually gets the tickets.
210 def getTickets(self):
# Blocks until an item is available on the input queue.
213 diag_node = self.qin_toCheck.get(block = True)
# The end-of-input sentinel is the STRING "None", not the None object.
# On the sentinel the count is printed and logged, and the sentinel is
# passed on to the output queue.
214 if diag_node == "None":
215 print "RT processed %d nodes with noticket" % self.count
216 logger.debug("RT filtered %d noticket nodes" % self.count)
217 self.qout_sickNoTicket.put("None")
# Every other item must be indexable by 'nodename'.
220 host = diag_node['nodename']
221 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \
222 self.l_ticket_blacklist, \
# With a ticket found, the node is tagged with stage 'stage_rt_working' and
# the ticket id, and the ticket is stored in self.tickets under the host.
225 logger.debug("RT: found tickets for %s" %host)
226 diag_node['stage'] = 'stage_rt_working'
227 diag_node['ticket_id'] = r_ticket['ticket_id']
228 self.tickets[host] = r_ticket
230 #logger.debug("RT: no tix for %s" %host)
231 #print "no tix for %s" % host
# A non-None r_ticket here is reported as "Ignoring ticket ...".
# presumably the blacklisted-ticket case -- TODO confirm against the
# hidden branch lines.
232 if r_ticket is not None:
233 print "Ignoring ticket %s" % r_ticket['ticket_id']
234 self.count = self.count + 1
236 # process diag_node for either case
237 self.qout_sickNoTicket.put(diag_node)
# Drop entries from self.tickets for hosts that no longer appear in any
# comon bucket.
# NOTE(review): the initialisation of `currdown` is on a line not visible
#   in this listing.
239 # Removes hosts that are no longer down.
240 def remTickets(self):
241 logger.debug("Removing stale entries from DB.")
# In Python 2, keys() returns a list copy, so deleting from self.tickets
# while looping over prevdown below is safe.
242 prevdown = self.tickets.keys()
245 #BEGIN HACK. This should be outside of this file. passed to class.
246 cmn = comon.Comon(None, None)
# Collect every host from every comon bucket, without duplicates.
# NOTE(review): `host not in currdown` is a linear scan if currdown is a
#   list (append() suggests it is); a set would avoid the quadratic cost.
248 for bucket in cmn.comonbkts.keys():
249 for host in getattr(cmn,bucket):
250 if host not in currdown: currdown.append(host)
253 # Actually do the comparison
254 for host in prevdown:
255 if host not in currdown:
256 del self.tickets[host]
257 logger.info("RT: %s no longer down." % host)
# Re-queue every host currently in self.tickets on qin_toCheck so that its
# ticket gets looked up again.
# NOTE(review): the items queued here are the dict keys (hostnames), while
#   getTickets() indexes each queue item with ['nodename']. A plain string
#   would raise TypeError there -- confirm which item shape is intended.
260 def updateTickets(self):
261 logger.info("Refreshing DB.")
262 for host in self.tickets.keys():
263 # Put back in Q to refresh
264 self.qin_toCheck.put(host)
266 def cleanTickets(self):
# Manual test driver: configure debug logging to a stream handler, create a
# queue and several RT instances, and start a thread on cleanTickets.
# NOTE(review): the enclosing `def` line, the assignment of `tickets` and
#   the thread start calls are on lines not visible in this listing.
273 logger.setLevel(logging.DEBUG)
274 ch = logging.StreamHandler()
275 ch.setLevel(logging.DEBUG)
276 formatter = logging.Formatter('%(message)s')
277 ch.setFormatter(formatter)
278 logger.addHandler(ch)
280 bucket = Queue.Queue()
# NOTE(review): RT.__init__ takes five required arguments (dbTickets,
#   tickets, qin_toCheck, qout_sickNoTicket, l_ticket_blacklist), but these
#   calls pass only two, so as written they raise TypeError. The driver
#   appears to predate the current constructor signature.
282 a = RT(tickets, bucket)
283 b = RT(tickets, bucket)
284 c = RT(tickets, bucket)
285 d = RT(tickets, bucket)
286 e = RT(tickets, bucket)
# Hard-coded sample hostnames; how `tmp` is consumed is not visible here.
291 tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu')
294 #et = Thread(target=e.pushHosts)
302 #cmn = comon.Comon(cdb, bucket)
304 #for bucket in cmn.comonbkts.keys():
305 # for host in getattr(cmn,bucket):
# Wraps a.cleanTickets in a separate Thread object instead of using the RT
# instance's own thread.
308 at = Thread(target=a.cleanTickets)
314 if __name__ == '__main__':