X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=rt.py;h=4a9c3fd9c5f837dd7118c5abbac01b50d5da8acb;hb=d5b0651a666c9a6b2deb4103ed15c3c06075395d;hp=850920087da001ac224d480fa0ef6149cd695bd4;hpb=34a4c8387ad2e397f46a03d9476096f1cd5abfc6;p=monitor.git diff --git a/rt.py b/rt.py index 8509200..4a9c3fd 100644 --- a/rt.py +++ b/rt.py @@ -10,10 +10,11 @@ import re import comon import soltesz from threading import * -import config + +# TODO: merge the RT mailer from mailer.py into this file. # RT database access constants file -RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db' +RT_DB_CONSTANTS_PATH='rt_db' #Logging logger = logging.getLogger("monitor") @@ -87,9 +88,24 @@ def open_rt_db(): +def fetch_from_db(db, sql): + try: + # create a 'cursor' (required by MySQLdb) + c = db.cursor() + c.execute(sql) + except Exception, err: + print "Could not execute RT query %s" %err + return -1 + + # fetch all rows (list of lists) + raw = c.fetchall() + return raw + def rt_tickets(): db = open_rt_db() + if db == -1: + return "" # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject # FROM Tickets AS Tk # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId @@ -111,36 +127,57 @@ def rt_tickets(): # % (hostname,hostname) # Queue == 10 is the spam Queue in RT. +# SELECT Tk.* FROM Tickets AS Tk, Attachments AS At JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE Tk.Queue != 10 AND Tk.id > 10000 AND Tr.id=At.TransactionID AND Tk.Status = 'open' ; +# + sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content FROM Tickets AS Tk, Attachments AS At JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE Tk.Queue != 10 AND Tk.id > 10000 AND - Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')""" - - try: - # create a 'cursor' (required by MySQLdb) - c = db.cursor() - c.execute(sql) - except Exception, err: - print "Could not execute RT query %s" %err - return -1 - - # fetch all rows (list of lists) - raw = c.fetchall() - - # map list of lists (raw) to list of dicts (tickets) - # when int gets pulls from SQL into python ints are converted to LONG to - # prevent overflow .. convert back - tickets = map(lambda x: {"ticket_id":int(x[0]), + Tr.id=At.TransactionID AND Tk.Status = 'open'""" + #Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')""" + #sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content +#FROM Tickets AS Tk, Attachments AS At +#JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId +#WHERE Tk.Queue != 10 AND Tk.id > 10000 AND +#Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR +#Tk.Status = 'new') """ + sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content, Us.EmailAddress, Tk.LastUpdated, Q.Name, Tk.Owner FROM Tickets AS Tk, Attachments AS At, Queues as Q, Users as Us JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE (Tk.Queue=3 OR Tk.Queue=22) AND Tk.id > 10000 AND Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR Tk.Status = 'new') AND Us.id=Tk.LastUpdatedBy AND Q.id=Tk.Queue """ + + + raw = fetch_from_db(db, sql) + if raw == -1: + return raw + tickets = map(lambda x: {"ticket_id":str(x[0]), "status":x[1], "subj":str(x[2]), "content":str(x[3])}, raw) + + raw = fetch_from_db(db,sqlall) + if raw == -1: + return raw + tickets_all = map(lambda x: {"ticket_id":str(x[0]), + "status":x[1], + "subj":str(x[2]), + "content":str(x[3]), + "email":str(x[4]), + "lastupdated":str(x[5]), + "queue":str(x[6]), + "owner":str(x[7]), + }, + raw) + db.close() + idTickets = {} + for t in tickets_all: + idTickets[t['ticket_id']] = t + soltesz.dbDump("idTickets", idTickets) + return tickets -def is_host_in_rt_tickets(host, ad_rt_tickets): +def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets): # ad_rt_tickets is an array of dicts, defined above. if len(ad_rt_tickets) == 0: return (False, None) @@ -158,9 +195,14 @@ def is_host_in_rt_tickets(host, ad_rt_tickets): for x in ad_rt_tickets: if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \ re_host.search(x['content'], re.MULTILINE|re.IGNORECASE): - logger.debug("\t ticket %d has %s" % (x['ticket_id'], host)) - return (True, x) - logger.debug("\t noticket -- has %s" % host) + logger.debug("\t ticket %s has %s" % (x['ticket_id'], host)) + print "\t ticket %s has %s" % (x['ticket_id'], host) + if x['ticket_id'] in ticket_blacklist: + return (False, x) + else: + return (True, x) + print "\t noticket -- has %s" % host + #logger.debug("\t noticket -- has %s" % host) return (False, None) # This search, while O(tickets), takes less than a millisecond, 05-25-07 @@ -186,44 +228,47 @@ Remove nodes that have come backup. Don't care of ticket is closed after first q Another thread refresh tickets of nodes already in dict and remove nodes that have come up. ''' class RT(Thread): - def __init__(self, dbTickets, tickets, qin_toCheck, qout_sickNoTicket, target = None): + def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None): # Time of last update of ticket DB self.dbTickets = dbTickets self.lastupdated = 0 - # Check host in queue. Queue populated from comon data of sick. - self.qin_toCheck = qin_toCheck - # Result of rt db query. Nodes without tickets that are sick. - self.qout_sickNoTicket = qout_sickNoTicket - #DB of tickets. Name -> ticket - self.tickets = tickets + self.l_ticket_blacklist = l_ticket_blacklist + self.q_toRT = q_toRT + self.q_fromRT = q_fromRT + self.tickets = {} Thread.__init__(self,target = self.getTickets) - # Takes node from qin_toCheck, gets tickets. + # Takes node from q_toRT, gets tickets. # Thread that actually gets the tickets. def getTickets(self): self.count = 0 while 1: - diag_node = self.qin_toCheck.get(block = True) - if diag_node == "None": - print "RT processed %d nodes with noticket" % self.count - logger.debug("RT filtered %d noticket nodes" % self.count) - self.qout_sickNoTicket.put("None") - break - else: + diag_node = self.q_toRT.get(block = True) + if diag_node != None: host = diag_node['nodename'] - (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, self.dbTickets) + (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \ + self.l_ticket_blacklist, \ + self.dbTickets) + diag_node['found_rt_ticket'] = None if b_host_inticket: logger.debug("RT: found tickets for %s" %host) - diag_node['stage'] = 'stage_rt_working' - diag_node['ticket_id'] = r_ticket['ticket_id'] - self.tickets[host] = r_ticket + diag_node['found_rt_ticket'] = r_ticket['ticket_id'] + else: - #logger.debug("RT: no tix for %s" %host) - #print "no tix for %s" % host + if r_ticket is not None: + print "Ignoring ticket %s" % r_ticket['ticket_id'] + # TODO: why do i return the ticket id for a + # blacklisted ticket id? + #diag_node['found_rt_ticket'] = r_ticket['ticket_id'] self.count = self.count + 1 - # process diag_node for either case - self.qout_sickNoTicket.put(diag_node) + self.q_fromRT.put(diag_node) + else: + print "RT processed %d nodes with noticket" % self.count + logger.debug("RT filtered %d noticket nodes" % self.count) + self.q_fromRT.put(None) + + break # Removes hosts that are no longer down. def remTickets(self): @@ -231,26 +276,26 @@ class RT(Thread): prevdown = self.tickets.keys() currdown = [] - #BEGIN HACK. This should be outside of this file. passed to class. - cmn = comon.Comon(None, None) - cmn.updatebkts() - for bucket in cmn.comonbkts.keys(): - for host in getattr(cmn,bucket): - if host not in currdown: currdown.append(host) - #END HACK + ##BEGIN HACK. This should be outside of this file. passed to class. + #cmn = comon.Comon(None, None) + # cmn.updatebkts() + #for bucket in cmn.comonbkts.keys(): + # for host in getattr(cmn,bucket): + # if host not in currdown: currdown.append(host) + ##END HACK # Actually do the comparison - for host in prevdown: - if host not in currdown: - del self.tickets[host] - logger.info("RT: %s no longer down." % host) + #for host in prevdown: + # if host not in currdown: + # del self.tickets[host] + # logger.info("RT: %s no longer down." % host) # Update Tickets def updateTickets(self): logger.info("Refreshing DB.") for host in self.tickets.keys(): # Put back in Q to refresh - self.qin_toCheck.put(host) + self.q_toRT.put(host) def cleanTickets(self): while 1: @@ -266,39 +311,9 @@ def main(): ch.setFormatter(formatter) logger.addHandler(ch) - bucket = Queue.Queue() - tickets = {} - a = RT(tickets, bucket) - b = RT(tickets, bucket) - c = RT(tickets, bucket) - d = RT(tickets, bucket) - e = RT(tickets, bucket) - a.start() - b.start() - c.start() - d.start() - tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu') - for host in tmp: - bucket.put(host) - #et = Thread(target=e.pushHosts) - #et.start() - time.sleep(15) - print tickets.keys() - time.sleep(15) - print tickets.keys() - time.sleep(15) - print tickets.keys() - #cmn = comon.Comon(cdb, bucket) - #cmn.updatebkts() - #for bucket in cmn.comonbkts.keys(): -# for host in getattr(cmn,bucket): -# alldown.put(host) -# - at = Thread(target=a.cleanTickets) - at.start() - time.sleep(15) - print tickets.keys() - os._exit(0) + tickets = rt_tickets() + soltesz.dbDump("ad_dbTickets", tickets) + if __name__ == '__main__': main()