12 from threading import *
15 # RT database access constants file
16 RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
19 logger = logging.getLogger("monitor")
21 # seconds between ticket update
24 def stripQuotes( str ):
28 if str[len(str)-1] in quotes:
33 def readConstantsFile( file_path ):
35 read a file consisting of lines of
39 and return a dictionary of the values.
41 blank lines, and lines starting with # (comments) are skipped
47 input_file= file(file_path,"r")
51 for line in input_file:
54 line= string.strip(line)
58 parts= string.split(line,"=",)
62 contents[parts[0]]= stripQuotes(parts[1])
70 # read plc database passwords and connect
71 rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
72 if rt_db_constants is None:
73 print "Unable to read database access constants from %s" % \
78 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
79 user=rt_db_constants['RT_DB_USER'],
80 passwd=rt_db_constants['RT_DB_PASSWORD'],
81 db=rt_db_constants['RT_DB_NAME'])
82 except Exception, err:
83 print "Failed to connect to RT database: %s" %err
93 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
95 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
96 # JOIN Attachments AS At ON Tr.id=At.TransactionID
97 # WHERE (At.Content LIKE '%%%s%%' OR
98 # At.Subject LIKE '%%%s%%') AND
99 # (Tk.Status = 'new' OR Tk.Status = 'open') AND
100 # Tk.Queue = 3 OR Tk.Queue = 19
101 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
102 # % (hostname,hostname)
103 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
105 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
106 # JOIN Attachments AS At ON Tr.id=At.TransactionID
107 # WHERE (At.Content LIKE '%%%s%%' OR
108 # At.Subject LIKE '%%%s%%') AND
109 # (Tk.Status = 'new' OR Tk.Status = 'open')
110 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
111 # % (hostname,hostname)
113 # Queue == 10 is the spam Queue in RT.
114 sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
115 FROM Tickets AS Tk, Attachments AS At
116 JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
117 WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
118 Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
121 # create a 'cursor' (required by MySQLdb)
124 except Exception, err:
125 print "Could not execute RT query %s" %err
128 # fetch all rows (list of lists)
131 # map list of lists (raw) to list of dicts (tickets)
132 # when ints are pulled from SQL into Python they are converted to long to
133 # prevent overflow; convert them back
134 tickets = map(lambda x: {"ticket_id":int(x[0]),
137 "content":str(x[3])},
143 def is_host_in_rt_tickets(host, ad_rt_tickets):
144 # ad_rt_tickets is an array of dicts, defined above.
145 if len(ad_rt_tickets) == 0:
148 d_ticket = ad_rt_tickets[0]
149 if not ('ticket_id' in d_ticket and 'status' in d_ticket and
150 'subj' in d_ticket and 'content' in d_ticket):
151 logger.debug("RT_tickets array has wrong fields!!!")
154 #logger.debug("Searching all tickets for %s" % host)
155 def search_tickets(host, ad_rt_tickets):
156 # compile once for more efficiency
157 re_host = re.compile(host)
158 for x in ad_rt_tickets:
159 if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \
160 re_host.search(x['content'], re.MULTILINE|re.IGNORECASE):
161 logger.debug("\t ticket %d has %s" % (x['ticket_id'], host))
163 logger.debug("\t noticket -- has %s" % host)
166 # This search, while O(tickets), takes less than a millisecond, 05-25-07
167 #t = soltesz.MyTimer()
168 ret = search_tickets(host, ad_rt_tickets)
175 Finds tickets associated with hostnames.
176 The idea is if you give it an array of host names,
177 presumably from comon's list of bad nodes, it starts
178 a few threads to query RT. RT takes a while to return.
180 This is turning into a reinvention of DB design, which I don't believe in.
181 In an effort to keep things minimal, here's the basic algo:
183 Give list of hostnames to RT()
184 Finds tickets associated with new hostnames (not in dict(tickets)).
185 Remove nodes that have come back up. Don't care if the ticket is closed after the first query.
186 Another thread refreshes tickets of nodes already in dict and removes nodes that have come up.
189 def __init__(self, dbTickets, tickets, qin_toCheck, qout_sickNoTicket, target = None):
190 # Snapshot of RT tickets (list of dicts) searched for each host
191 self.dbTickets = dbTickets
193 # Check host in queue. Queue populated from comon data of sick.
194 self.qin_toCheck = qin_toCheck
195 # Result of rt db query. Nodes without tickets that are sick.
196 self.qout_sickNoTicket = qout_sickNoTicket
197 #DB of tickets. Name -> ticket
198 self.tickets = tickets
199 Thread.__init__(self,target = self.getTickets)
201 # Takes node from qin_toCheck, gets tickets.
202 # Thread that actually gets the tickets.
203 def getTickets(self):
206 diag_node = self.qin_toCheck.get(block = True)
207 if diag_node == "None":
208 print "RT processed %d nodes with noticket" % self.count
209 logger.debug("RT filtered %d noticket nodes" % self.count)
210 self.qout_sickNoTicket.put("None")
213 host = diag_node['nodename']
214 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, self.dbTickets)
216 logger.debug("RT: found tickets for %s" %host)
217 diag_node['stage'] = 'stage_rt_working'
218 diag_node['ticket_id'] = r_ticket['ticket_id']
219 self.tickets[host] = r_ticket
221 #logger.debug("RT: no tix for %s" %host)
222 #print "no tix for %s" % host
223 self.count = self.count + 1
225 # process diag_node for either case
226 self.qout_sickNoTicket.put(diag_node)
228 # Removes hosts that are no longer down.
229 def remTickets(self):
230 logger.debug("Removing stale entries from DB.")
231 prevdown = self.tickets.keys()
234 #BEGIN HACK. This should be outside of this file. passed to class.
235 cmn = comon.Comon(None, None)
237 for bucket in cmn.comonbkts.keys():
238 for host in getattr(cmn,bucket):
239 if host not in currdown: currdown.append(host)
242 # Actually do the comparison
243 for host in prevdown:
244 if host not in currdown:
245 del self.tickets[host]
246 logger.info("RT: %s no longer down." % host)
249 def updateTickets(self):
250 logger.info("Refreshing DB.")
251 for host in self.tickets.keys():
252 # Put back in Q to refresh
253 self.qin_toCheck.put(host)
255 def cleanTickets(self):
262 logger.setLevel(logging.DEBUG)
263 ch = logging.StreamHandler()
264 ch.setLevel(logging.DEBUG)
265 formatter = logging.Formatter('%(message)s')
266 ch.setFormatter(formatter)
267 logger.addHandler(ch)
269 bucket = Queue.Queue()
271 a = RT(tickets, bucket)
272 b = RT(tickets, bucket)
273 c = RT(tickets, bucket)
274 d = RT(tickets, bucket)
275 e = RT(tickets, bucket)
280 tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu')
283 #et = Thread(target=e.pushHosts)
291 #cmn = comon.Comon(cdb, bucket)
293 #for bucket in cmn.comonbkts.keys():
294 # for host in getattr(cmn,bucket):
297 at = Thread(target=a.cleanTickets)
303 if __name__ == '__main__':