import logging
import Queue
import time
-import comon
+import re
+import database
from threading import *
import config
-# RT database access constants file
-RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
+# TODO: merge the RT mailer from mailer.py into this file.
#Logging
logger = logging.getLogger("monitor")
def open_rt_db():
+ # Connect to the RT MySQL database using the RT_DB_* settings in config.
+ # Returns the connection object, or -1 (after printing the error) if the
+ # connection attempt raises.
+ # NOTE(review): MySQLdb is not among the imports visible at the top of this
+ # change; if it is not imported elsewhere, the resulting NameError is caught
+ # below and reported as a connection failure -- confirm.
# read plc database passwords and connect
- rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
- if rt_db_constants is None:
- print "Unable to read database access constants from %s" % \
- RT_DB_CONSTANTS_PATH
- return -1
+ #rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
+ #if rt_db_constants is None:
+ # print "Unable to read database access constants from %s" % \
+ # RT_DB_CONSTANTS_PATH
+ # return -1
try:
- rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
- user=rt_db_constants['RT_DB_USER'],
- passwd=rt_db_constants['RT_DB_PASSWORD'],
- db=rt_db_constants['RT_DB_NAME'])
- except Error:
- print "Failed to connect to RT database"
+ rt_db = MySQLdb.connect(host=config.RT_DB_HOST,
+ user=config.RT_DB_USER,
+ passwd=config.RT_DB_PASSWORD,
+ db=config.RT_DB_NAME)
+ except Exception, err:
+ print "Failed to connect to RT database: %s" %err
return -1
return rt_db
+def fetch_from_db(db, sql):
+    """Run `sql` on the open connection `db` and return every result row.
+
+    Returns whatever cursor.fetchall() yields, or -1 if creating the
+    cursor, executing the query or fetching the rows raises (the error is
+    printed, not re-raised).
+    """
+    try:
+        # create a 'cursor' (required by MySQLdb)
+        c = db.cursor()
+        try:
+            c.execute(sql)
+            # fetch all rows before the cursor is released
+            return c.fetchall()
+        finally:
+            # always release the cursor, even when the query fails
+            c.close()
+    except Exception, err:
+        print "Could not execute RT query %s" %err
+        return -1
+
-def rt_tickets(hostname):
+def rt_tickets():
+ # Fetch RT tickets and return them as a list of dicts.
+ #
+ # Two queries are run:
+ # - sql: open tickets outside queue 10 with id > 10000; these become the
+ # returned list (keys: ticket_id, status, subj, content).
+ # - sqlall: new/open tickets in queues 3 and 22 with extra columns; these
+ # are indexed by ticket_id and saved with database.dbDump("idTickets", ...).
+ #
+ # Returns "" if the database cannot be opened and -1 if either query fails.
+ # NOTE(review): both queries mix a comma join with JOIN ... ON Tk.id; MySQL
+ # 5.0.12 and later give JOIN higher precedence than the comma, so Tk may
+ # not be visible in the ON clause -- confirm against the server in use.
db = open_rt_db()
+ if db == -1:
+ return ""
# sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
# FROM Tickets AS Tk
# JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
# Tk.Queue = 3 OR Tk.Queue = 19
# ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
# % (hostname,hostname)
- sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
- FROM Tickets AS Tk
- JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
- JOIN Attachments AS At ON Tr.id=At.TransactionID
- WHERE (At.Content LIKE '%%%s%%' OR
- At.Subject LIKE '%%%s%%') AND
- (Tk.Status = 'new' OR Tk.Status = 'open')
- ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
- % (hostname,hostname)
-
- try:
- # create a 'cursor' (required by MySQLdb)
- c = db.cursor()
- c.execute(sql)
- except Exception, err:
- print "Could not execute RT query %s" %err
- return -1
+# sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
+# FROM Tickets AS Tk
+# JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
+# JOIN Attachments AS At ON Tr.id=At.TransactionID
+# WHERE (At.Content LIKE '%%%s%%' OR
+# At.Subject LIKE '%%%s%%') AND
+# (Tk.Status = 'new' OR Tk.Status = 'open')
+# ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
+# % (hostname,hostname)
- # fetch all rows (list of lists)
- raw = c.fetchall()
+ # Queue == 10 is the spam Queue in RT.
+# SELECT Tk.* FROM Tickets AS Tk, Attachments AS At JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE Tk.Queue != 10 AND Tk.id > 10000 AND Tr.id=At.TransactionID AND Tk.Status = 'open' ;
+#
+
+ sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
+ FROM Tickets AS Tk, Attachments AS At
+ JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
+ WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
+ Tr.id=At.TransactionID AND Tk.Status = 'open'"""
+ #Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
+ #sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
+#FROM Tickets AS Tk, Attachments AS At
+#JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
+#WHERE Tk.Queue != 10 AND Tk.id > 10000 AND
+#Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR
+#Tk.Status = 'new') """
+ sqlall = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content, Us.EmailAddress, Tk.LastUpdated, Q.Name, Tk.Owner FROM Tickets AS Tk, Attachments AS At, Queues as Q, Users as Us JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId WHERE (Tk.Queue=3 OR Tk.Queue=22) AND Tk.id > 10000 AND Tr.id=At.TransactionID AND ( Tk.Status = 'open' OR Tk.Status = 'new') AND Us.id=Tk.LastUpdatedBy AND Q.id=Tk.Queue """
+
+
+ raw = fetch_from_db(db, sql)
+ if raw == -1:
+ # do not leak the connection when the query fails
+ db.close()
+ return raw
+ tickets = map(lambda x: {"ticket_id":str(x[0]),
+ "status":x[1],
+ "subj":str(x[2]),
+ "content":str(x[3])},
+ raw)
- # map list of lists (raw) to list of dicts (tickets)
- # when int gets pulls from SQL into python ints are converted to LONG to
- # prevent overflow .. convert back
- tickets = map(lambda x: {"ticket_id":int(x[0]),
+ raw = fetch_from_db(db,sqlall)
+ if raw == -1:
+ # do not leak the connection when the query fails
+ db.close()
+ return raw
+ tickets_all = map(lambda x: {"ticket_id":str(x[0]),
"status":x[1],
- "subj":x[2]},
+ "subj":str(x[2]),
+ "content":str(x[3]),
+ "email":str(x[4]),
+ "lastupdated":str(x[5]),
+ "queue":str(x[6]),
+ "owner":str(x[7]),
+ },
raw)
+
db.close()
+ # index the wider result set by ticket id and persist it
+ idTickets = {}
+ for t in tickets_all:
+ idTickets[t['ticket_id']] = t
+ database.dbDump("idTickets", idTickets)
+
return tickets
+def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
+    """Look for the first RT ticket whose subject or content mentions `host`.
+
+    ad_rt_tickets is a list of dicts carrying at least the keys
+    'ticket_id', 'status', 'subj' and 'content'; only the first entry is
+    checked for those keys.
+
+    Returns a (found, ticket) tuple:
+      (True, ticket)   -- a matching ticket that is not blacklisted
+      (False, ticket)  -- the first matching ticket is in ticket_blacklist
+      (False, None)    -- no ticket matches, or the ticket list is empty
+                          or malformed
+    """
+    if not ad_rt_tickets:
+        return (False, None)
+
+    d_ticket = ad_rt_tickets[0]
+    if not ('ticket_id' in d_ticket and 'status' in d_ticket and
+            'subj' in d_ticket and 'content' in d_ticket):
+        logger.debug("RT_tickets array has wrong fields!!!")
+        return (False, None)
+
+    # Flags must be given to re.compile(): the second positional argument
+    # of a compiled pattern's search() is the start offset, not a flag set.
+    # The hostname is escaped so that its dots only match literal dots.
+    # Compiled once, outside the loop.
+    re_host = re.compile(re.escape(host), re.MULTILINE|re.IGNORECASE)
+
+    # This search, while O(tickets), takes less than a millisecond, 05-25-07
+    for x in ad_rt_tickets:
+        if re_host.search(x['subj']) or re_host.search(x['content']):
+            logger.debug("\t ticket %s has %s" % (x['ticket_id'], host))
+            print "\t ticket %s has %s" % (x['ticket_id'], host)
+            # Only the first matching ticket is considered.
+            if x['ticket_id'] in ticket_blacklist:
+                return (False, x)
+            return (True, x)
+
+    print "\t noticket -- has %s" % host
+    return (False, None)
+
'''
Finds tickets associated with hostnames.
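-Another thread refresh tickets of nodes already in dict and remove nodes that have come up.
+Another thread refreshes the tickets of nodes already in the dict and removes nodes that have come back up.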
'''
class RT(Thread):
- def __init__(self, tickets, toCheck, sickNoTicket, target = None):
+ def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None):
+ # dbTickets: ticket list that getTickets hands to is_host_in_rt_tickets()
+ # q_toRT: queue getTickets reads nodes from; a None item ends the thread
+ # q_fromRT: queue getTickets writes the processed nodes to
+ # l_ticket_blacklist: passed to is_host_in_rt_tickets() as the blacklist
+ # target: accepted but not used; the thread always runs self.getTickets
# Time of last update of ticket DB
+ self.dbTickets = dbTickets
self.lastupdated = 0
- # Queue() is MP/MC self locking.
- # Check host in queue. Queue populated from comon data of sick.
- self.toCheck = toCheck
- # Result of rt db query. Nodes without tickets that are sick.
- self.sickNoTicket = sickNoTicket
- #DB of tickets. Name -> ticket
- self.tickets = tickets
+ self.l_ticket_blacklist = l_ticket_blacklist
+ self.q_toRT = q_toRT
+ self.q_fromRT = q_fromRT
+ self.tickets = {}
Thread.__init__(self,target = self.getTickets)
- # Takes node from toCheck, gets tickets.
+ # Takes node from q_toRT, gets tickets.
# Thread that actually gets the tickets.
def getTickets(self):
+ # Thread body: take nodes from q_toRT, set node['found_rt_ticket'] to the
+ # id of a matching, non-blacklisted ticket (or None), and pass each node on
+ # to q_fromRT. A None item ends the loop and is forwarded to q_fromRT.
+ self.count = 0
while 1:
- host = self.toCheck.get(block = True)
- if host == "None": break
- #if self.tickets.has_key(host) == False:
- #logger.debug("Popping from q - %s" %host)
- tmp = rt_tickets(host)
- if tmp:
- #logger.debug("RT: tickets for %s" %host)
- self.tickets[host] = tmp
+ diag_node = self.q_toRT.get(block = True)
+ if diag_node is not None:
+ host = diag_node['nodename']
+ (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \
+ self.l_ticket_blacklist, \
+ self.dbTickets)
+ diag_node['found_rt_ticket'] = None
+ if b_host_inticket:
+ logger.debug("RT: found tickets for %s" %host)
+ diag_node['found_rt_ticket'] = r_ticket['ticket_id']
+
+ else:
+ if r_ticket is not None:
+ print "Ignoring ticket %s" % r_ticket['ticket_id']
+ # TODO: why would we return the ticket id for a
+ # blacklisted ticket?
+ #diag_node['found_rt_ticket'] = r_ticket['ticket_id']
+ self.count = self.count + 1
+
+ self.q_fromRT.put(diag_node)
else:
- logger.debug("RT: no tix for %s" %host)
- self.sickNoTicket.put(host)
+ print "RT processed %d nodes with noticket" % self.count
+ logger.debug("RT filtered %d noticket nodes" % self.count)
+ self.q_fromRT.put(None)
+
+ break
# Removes hosts that are no longer down.
def remTickets(self):
+ # NOTE: with the comon lookup and the comparison loop commented out, this
+ # method only builds prevdown/currdown and no longer removes anything.
prevdown = self.tickets.keys()
currdown = []
- #BEGIN HACK. This should be outside of this file. passed to class.
- cmn = comon.Comon(None, None)
- cmn.updatebkts()
- for bucket in cmn.comonbkts.keys():
- for host in getattr(cmn,bucket):
- if host not in currdown: currdown.append(host)
- #END HACK
+ ##BEGIN HACK. This should be outside of this file. passed to class.
+ #cmn = comon.Comon(None, None)
+ # cmn.updatebkts()
+ #for bucket in cmn.comonbkts.keys():
+ # for host in getattr(cmn,bucket):
+ # if host not in currdown: currdown.append(host)
+ ##END HACK
# Actually do the comparison
- for host in prevdown:
- if host not in currdown:
- del self.tickets[host]
- logger.info("RT: %s no longer down." % host)
+ #for host in prevdown:
+ # if host not in currdown:
+ # del self.tickets[host]
+ # logger.info("RT: %s no longer down." % host)
# Update Tickets
def updateTickets(self):
+ # Re-queue every key of self.tickets on q_toRT.
+ # NOTE(review): getTickets reads diag_node['nodename'] from each queued
+ # item, so it appears to expect dicts rather than these keys -- confirm.
logger.info("Refreshing DB.")
for host in self.tickets.keys():
# Put back in Q to refresh
- self.toCheck.put(host)
+ self.q_toRT.put(host)
def cleanTickets(self):
while 1:
ch.setFormatter(formatter)
logger.addHandler(ch)
- bucket = Queue.Queue()
- tickets = {}
- a = RT(tickets, bucket)
- b = RT(tickets, bucket)
- c = RT(tickets, bucket)
- d = RT(tickets, bucket)
- e = RT(tickets, bucket)
- a.start()
- b.start()
- c.start()
- d.start()
- tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu')
- for host in tmp:
- bucket.put(host)
- #et = Thread(target=e.pushHosts)
- #et.start()
- time.sleep(15)
- print tickets.keys()
- time.sleep(15)
- print tickets.keys()
- time.sleep(15)
- print tickets.keys()
- #cmn = comon.Comon(cdb, bucket)
- #cmn.updatebkts()
- #for bucket in cmn.comonbkts.keys():
-# for host in getattr(cmn,bucket):
-# alldown.put(host)
-#
- at = Thread(target=a.cleanTickets)
- at.start()
- time.sleep(15)
- print tickets.keys()
- os._exit(0)
+ tickets = rt_tickets()
+ database.dbDump("ad_dbTickets", tickets)
+
if __name__ == '__main__':
main()