X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=rt.py;h=0ea0a5502ebc37f3d5548a77477a7f1640289e5e;hb=6d46ab9b534b60675a3dcb11fcb664589a3691f8;hp=4e56869e63c6572ca46971c318732d575540ed75;hpb=55ec8f3eb860029ecf552d8c0a3cc6dbd2f66b68;p=monitor.git diff --git a/rt.py b/rt.py index 4e56869..0ea0a55 100644 --- a/rt.py +++ b/rt.py @@ -6,11 +6,12 @@ import string import logging import Queue import time -import comon +import re +import database from threading import * +import monitorconfig -# RT database access constants file -RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db' +# TODO: merge the RT mailer from mailer.py into this file. #Logging logger = logging.getLogger("monitor") @@ -65,39 +66,26 @@ def readConstantsFile( file_path ): def open_rt_db(): # read plc database passwords and connect - rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH) - if rt_db_constants is None: - print "Unable to read database access constants from %s" % \ - RT_DB_CONSTANTS_PATH - return -1 + #rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH) + #if rt_db_constants is None: + # print "Unable to read database access constants from %s" % \ + # RT_DB_CONSTANTS_PATH + # return -1 try: - rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'], - user=rt_db_constants['RT_DB_USER'], - passwd=rt_db_constants['RT_DB_PASSWORD'], - db=rt_db_constants['RT_DB_NAME']) - except Error: - print "Failed to connect to RT database" + rt_db = MySQLdb.connect(host=monitorconfig.RT_DB_HOST, + user=monitorconfig.RT_DB_USER, + passwd=monitorconfig.RT_DB_PASSWORD, + db=monitorconfig.RT_DB_NAME) + except Exception, err: + print "Failed to connect to RT database: %s" %err return -1 return rt_db - -def rt_tickets(hostname): - db = open_rt_db() - sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject - FROM Tickets AS Tk - JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId - JOIN Attachments AS At ON Tr.id=At.TransactionID - WHERE (At.Content LIKE '%%%s%%' OR - At.Subject LIKE '%%%s%%') AND - (Tk.Status = 'new' OR Tk.Status = 'open') AND - Tk.Queue = 3 - ORDER BY Tk.Status, Tk.LastUpdated DESC""" \ - % (hostname,hostname) - +def fetch_from_db(db, sql): try: # create a 'cursor' (required by MySQLdb) c = db.cursor() @@ -108,18 +96,119 @@ def rt_tickets(hostname): # fetch all rows (list of lists) raw = c.fetchall() + return raw + - # map list of lists (raw) to list of dicts (tickets) - # when int gets pulls from SQL into python ints are converted to LONG to - # prevent overflow .. convert back - tickets = map(lambda x: {"ticket_id":int(x[0]), +def rt_tickets(): + db = open_rt_db() + if db == -1: + return "" +# sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject +# FROM Tickets AS Tk +# JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId +# JOIN Attachments AS At ON Tr.id=At.TransactionID +# WHERE (At.Content LIKE '%%%s%%' OR +# At.Subject LIKE '%%%s%%') AND +# (Tk.Status = 'new' OR Tk.Status = 'open') AND +# Tk.Queue = 3 OR Tk.Queue = 19 +# ORDER BY Tk.Status, Tk.LastUpdated DESC""" \ +# % (hostname,hostname) +# sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject +# FROM Tickets AS Tk +# JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId +# JOIN Attachments AS At ON Tr.id=At.TransactionID +# WHERE (At.Content LIKE '%%%s%%' OR +# At.Subject LIKE '%%%s%%') AND +# (Tk.Status = 'new' OR Tk.Status = 'open') +# ORDER BY Tk.Status, Tk.LastUpdated DESC""" \ +# % (hostname,hostname) + + # Queue == 10 is the spam Queue in RT. +# SELECT Tk.* FROM Tickets AS Tk, Attachments AS At JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE Tk.Queue != 10 AND Tk.id > 10000 AND Tr.id=At.TransactionID AND Tk.Status = 'open' ; +# + + sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content + FROM Tickets AS Tk, Attachments AS At + JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId + WHERE Tk.Queue != 10 AND Tk.id > 10000 AND + Tr.id=At.TransactionID AND Tk.Status = 'open'""" + #Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')""" + #sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content +#FROM Tickets AS Tk, Attachments AS At +#JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId +#WHERE Tk.Queue != 10 AND Tk.id > 10000 AND +#Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR +#Tk.Status = 'new') """ + sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content, Us.EmailAddress, Tk.LastUpdated, Q.Name, Tk.Owner FROM Tickets AS Tk, Attachments AS At, Queues as Q, Users as Us JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE (Tk.Queue=3 OR Tk.Queue=22) AND Tk.id > 10000 AND Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR Tk.Status = 'new') AND Us.id=Tk.LastUpdatedBy AND Q.id=Tk.Queue """ + + + raw = fetch_from_db(db, sql) + if raw == -1: + return raw + tickets = map(lambda x: {"ticket_id":str(x[0]), "status":x[1], - "subj":x[2]}, + "subj":str(x[2]), + "content":str(x[3])}, raw) + + raw = fetch_from_db(db,sqlall) + if raw == -1: + return raw + tickets_all = map(lambda x: {"ticket_id":str(x[0]), + "status":x[1], + "subj":str(x[2]), + "content":str(x[3]), + "email":str(x[4]), + "lastupdated":str(x[5]), + "queue":str(x[6]), + "owner":str(x[7]), + }, + raw) + db.close() + idTickets = {} + for t in tickets_all: + idTickets[t['ticket_id']] = t + database.dbDump("idTickets", idTickets) + return tickets +def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets): + # ad_rt_tickets is an array of dicts, defined above. + if len(ad_rt_tickets) == 0: + return (False, None) + + d_ticket = ad_rt_tickets[0] + if not ('ticket_id' in d_ticket and 'status' in d_ticket and + 'subj' in d_ticket and 'content' in d_ticket): + logger.debug("RT_tickets array has wrong fields!!!") + return (False, None) + + #logger.debug("Searching all tickets for %s" % host) + def search_tickets(host, ad_rt_tickets): + # compile once for more efficiency + re_host = re.compile(host) + for x in ad_rt_tickets: + if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \ + re_host.search(x['content'], re.MULTILINE|re.IGNORECASE): + logger.debug("\t ticket %s has %s" % (x['ticket_id'], host)) + print "\t ticket %s has %s" % (x['ticket_id'], host) + if x['ticket_id'] in ticket_blacklist: + return (False, x) + else: + return (True, x) + print "\t noticket -- has %s" % host + #logger.debug("\t noticket -- has %s" % host) + return (False, None) + + # This search, while O(tickets), takes less than a millisecond, 05-25-07 + #t = commands.MyTimer() + ret = search_tickets(host, ad_rt_tickets) + #del t + + return ret + ''' Finds tickets associated with hostnames. @@ -136,27 +225,47 @@ Remove nodes that have come backup. Don't care of ticket is closed after first q Another thread refresh tickets of nodes already in dict and remove nodes that have come up. ''' class RT(Thread): - def __init__(self, tickets, bucket, target = None): + def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None): # Time of last update of ticket DB + self.dbTickets = dbTickets self.lastupdated = 0 - # Queue() is MP/MC self locking - self.bucket = bucket - #DB of tickets. Name -> ticket - self.tickets = tickets + self.l_ticket_blacklist = l_ticket_blacklist + self.q_toRT = q_toRT + self.q_fromRT = q_fromRT + self.tickets = {} Thread.__init__(self,target = self.getTickets) - # Takes node from alldownq, gets tickets. + # Takes node from q_toRT, gets tickets. # Thread that actually gets the tickets. def getTickets(self): + self.count = 0 while 1: - host = self.bucket.get(block = True) - if host == "None": break - #if self.tickets.has_key(host) == False: - logger.debug("Popping from q - %s" %host) - tmp = rt_tickets(host) - if tmp: - logger.debug("Found tickets for %s" %host) - self.tickets[host] = tmp + diag_node = self.q_toRT.get(block = True) + if diag_node != None: + host = diag_node['nodename'] + (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \ + self.l_ticket_blacklist, \ + self.dbTickets) + diag_node['found_rt_ticket'] = None + if b_host_inticket: + logger.debug("RT: found tickets for %s" %host) + diag_node['found_rt_ticket'] = r_ticket['ticket_id'] + + else: + if r_ticket is not None: + print "Ignoring ticket %s" % r_ticket['ticket_id'] + # TODO: why do i return the ticket id for a + # blacklisted ticket id? + #diag_node['found_rt_ticket'] = r_ticket['ticket_id'] + self.count = self.count + 1 + + self.q_fromRT.put(diag_node) + else: + print "RT processed %d nodes with noticket" % self.count + logger.debug("RT filtered %d noticket nodes" % self.count) + self.q_fromRT.put(None) + + break # Removes hosts that are no longer down. def remTickets(self): @@ -164,26 +273,26 @@ class RT(Thread): prevdown = self.tickets.keys() currdown = [] - #BEGIN HACK. This should be outside of this file. passed to class. - cmn = comon.Comon(None, None) - cmn.updatebkts() - for bucket in cmn.comonbkts.keys(): - for host in getattr(cmn,bucket): - if host not in currdown: currdown.append(host) - #END HACK + ##BEGIN HACK. This should be outside of this file. passed to class. + #cmn = comon.Comon(None, None) + # cmn.updatebkts() + #for bucket in cmn.comonbkts.keys(): + # for host in getattr(cmn,bucket): + # if host not in currdown: currdown.append(host) + ##END HACK # Actually do the comparison - for host in prevdown: - if host not in currdown: - del self.tickets[host] - logger.info("%s no longer down" % host) + #for host in prevdown: + # if host not in currdown: + # del self.tickets[host] + # logger.info("RT: %s no longer down." % host) # Update Tickets def updateTickets(self): logger.info("Refreshing DB.") for host in self.tickets.keys(): # Put back in Q to refresh - self.bucket.put(host) + self.q_toRT.put(host) def cleanTickets(self): while 1: @@ -199,39 +308,9 @@ def main(): ch.setFormatter(formatter) logger.addHandler(ch) - bucket = Queue.Queue() - tickets = {} - a = RT(tickets, bucket) - b = RT(tickets, bucket) - c = RT(tickets, bucket) - d = RT(tickets, bucket) - e = RT(tickets, bucket) - a.start() - b.start() - c.start() - d.start() - tmp = ('planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu') - for host in tmp: - bucket.put(host) - #et = Thread(target=e.pushHosts) - #et.start() - time.sleep(15) - print tickets.keys() - time.sleep(15) - print tickets.keys() - time.sleep(15) - print tickets.keys() - #cmn = comon.Comon(cdb, bucket) - #cmn.updatebkts() - #for bucket in cmn.comonbkts.keys(): -# for host in getattr(cmn,bucket): -# alldown.put(host) -# - at = Thread(target=a.cleanTickets) - at.start() - time.sleep(15) - print tickets.keys() - os._exit(0) + tickets = rt_tickets() + database.dbDump("ad_dbTickets", tickets) + if __name__ == '__main__': main()