10 from threading import *
13 # RT database access constants file
14 RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
17 logger = logging.getLogger("monitor")
19 # seconds between ticket updates
22 def stripQuotes( str ):
26 if str[len(str)-1] in quotes:
31 def readConstantsFile( file_path ):
33 read a file consisting of lines of
37 and return a dictionary of the values.
39 blank lines, and lines starting with # (comments) are skipped
45 input_file= file(file_path,"r")
49 for line in input_file:
52 line= string.strip(line)
56 parts= string.split(line,"=",)
60 contents[parts[0]]= stripQuotes(parts[1])
68 # read RT database access constants and connect
69 rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
70 if rt_db_constants is None:
71 print "Unable to read database access constants from %s" % \
76 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
77 user=rt_db_constants['RT_DB_USER'],
78 passwd=rt_db_constants['RT_DB_PASSWORD'],
79 db=rt_db_constants['RT_DB_NAME'])
80 except Exception, err:
81 print "Failed to connect to RT database: %s" %err
89 def rt_tickets(hostname):
91 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
93 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
94 # JOIN Attachments AS At ON Tr.id=At.TransactionID
95 # WHERE (At.Content LIKE '%%%s%%' OR
96 # At.Subject LIKE '%%%s%%') AND
97 # (Tk.Status = 'new' OR Tk.Status = 'open') AND
98 # Tk.Queue = 3 OR Tk.Queue = 19
99 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
100 # % (hostname,hostname)
101 sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
103 JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
104 JOIN Attachments AS At ON Tr.id=At.TransactionID
105 WHERE (At.Content LIKE '%%%s%%' OR
106 At.Subject LIKE '%%%s%%') AND
107 (Tk.Status = 'new' OR Tk.Status = 'open')
108 ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
109 % (hostname,hostname)
112 # create a 'cursor' (required by MySQLdb)
115 except Exception, err:
116 print "Could not execute RT query %s" %err
119 # fetch all rows (list of lists)
122 # map list of lists (raw) to list of dicts (tickets)
123 # when ints are pulled from SQL into Python they are converted to long to
124 # prevent overflow, so convert them back to int
125 tickets = map(lambda x: {"ticket_id":int(x[0]),
135 Finds tickets associated with hostnames.
136 The idea is if you give it an array of host names,
137 presumably from comon's list of bad nodes, it starts
138 a few threads to query RT. RT takes a while to return.
140 This is turning into a reinvention of DB design, which I don't believe in.
141 In an effort to keep things minimal, here's the basic algo:
143 Give a list of hostnames to RT().
144 Find tickets associated with new hostnames (those not already in the tickets dict).
145 Remove nodes that have come back up. Don't care if a ticket is closed after the first query.
146 Another thread refreshes tickets of nodes already in the dict and removes nodes that have come back up.
149 def __init__(self, tickets, toCheck, sickNoTicket, target = None):
150 # Time of last update of ticket DB
152 # Queue() is multi-producer/multi-consumer and self-locking.
153 # Queue of hosts to check, populated from comon data about sick nodes.
154 self.toCheck = toCheck
155 # Result of the RT DB query: sick nodes that have no tickets.
156 self.sickNoTicket = sickNoTicket
157 # DB of tickets: hostname -> tickets returned by rt_tickets()
158 self.tickets = tickets
159 Thread.__init__(self,target = self.getTickets)
161 # Takes node from toCheck, gets tickets.
162 # Thread that actually gets the tickets.
163 def getTickets(self):
165 host = self.toCheck.get(block = True)
166 if host == "None": break
167 #if self.tickets.has_key(host) == False:
168 #logger.debug("Popping from q - %s" %host)
169 tmp = rt_tickets(host)
171 #logger.debug("RT: tickets for %s" %host)
172 self.tickets[host] = tmp
174 logger.debug("RT: no tix for %s" %host)
175 self.sickNoTicket.put(host)
177 # Removes hosts that are no longer down.
178 def remTickets(self):
179 logger.debug("Removing stale entries from DB.")
180 prevdown = self.tickets.keys()
183 # BEGIN HACK: this should live outside this file and be passed to the class.
184 cmn = comon.Comon(None, None)
186 for bucket in cmn.comonbkts.keys():
187 for host in getattr(cmn,bucket):
188 if host not in currdown: currdown.append(host)
191 # Actually do the comparison
192 for host in prevdown:
193 if host not in currdown:
194 del self.tickets[host]
195 logger.info("RT: %s no longer down." % host)
def updateTickets(self):
    """Queue every host already in the ticket DB for a fresh RT lookup.

    Each hostname key of self.tickets is put back on self.toCheck, the
    queue that the getTickets worker thread pulls hosts from before
    re-querying RT for them.
    """
    logger.info("Refreshing DB.")
    known_hosts = self.tickets.keys()
    for hostname in known_hosts:
        # Re-enqueue so a worker thread refreshes this host's tickets.
        self.toCheck.put(hostname)
204 def cleanTickets(self):
211 logger.setLevel(logging.DEBUG)
212 ch = logging.StreamHandler()
213 ch.setLevel(logging.DEBUG)
214 formatter = logging.Formatter('%(message)s')
215 ch.setFormatter(formatter)
216 logger.addHandler(ch)
218 bucket = Queue.Queue()
220 a = RT(tickets, bucket)
221 b = RT(tickets, bucket)
222 c = RT(tickets, bucket)
223 d = RT(tickets, bucket)
224 e = RT(tickets, bucket)
229 tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu')
232 #et = Thread(target=e.pushHosts)
240 #cmn = comon.Comon(cdb, bucket)
242 #for bucket in cmn.comonbkts.keys():
243 # for host in getattr(cmn,bucket):
246 at = Thread(target=a.cleanTickets)
252 if __name__ == '__main__':