10 from threading import *
# Module-level configuration for the RT ticket lookups.
12 # RT database access constants file
# KEY=VALUE file parsed by readConstantsFile(); it must provide RT_DB_HOST,
# RT_DB_USER, RT_DB_PASSWORD and RT_DB_NAME for the MySQLdb.connect() call.
13 RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
# Shared "monitor" logger used by the RT worker threads and the driver code.
16 logger = logging.getLogger("monitor")
18 # seconds between ticket update
# Presumably strips a leading/trailing quote character from a value read
# out of the constants file (only the trailing-character check is visible;
# `quotes` is defined on a line not shown).
# NOTE(review): the parameter name `str` shadows the builtin, and
# str[len(str)-1] raises IndexError on an empty string unless a guard
# exists on a line not shown -- str[-1:] would be the safe form.
21 def stripQuotes( str ):
25 if str[len(str)-1] in quotes:
# Parse a KEY=VALUE constants file into a dict of key -> unquoted value.
# Used to load the RT database credentials; the caller treats a None
# return value as "could not read the file".
# NOTE(review): string.split(line, "=") splits on EVERY '=', and only
# parts[1] is stored, so a value that itself contains '=' would be truncated
# unless a length check on a line not shown rejects such lines.
# NOTE(review): input_file is not visibly closed; consider
# `with open(file_path) as input_file:` (file() is Python-2 only).
30 def readConstantsFile( file_path ):
32 read a file consisting of lines of
36 and return a dictionary of the values.
38 blank lines, and lines starting with # (comments) are skipped
44 input_file= file(file_path,"r")
48 for line in input_file:
51 line= string.strip(line)
55 parts= string.split(line,"=",)
# Key is the text before the first '=', value has surrounding quotes removed.
59 contents[parts[0]]= stripQuotes(parts[1])
# Load the RT database credentials from RT_DB_CONSTANTS_PATH and open a
# MySQLdb connection, stored in the module global rt_db.
# NOTE(review): this runs as module-level code. What happens after each
# failure message (exit, re-raise, or continuing with rt_db unset) is on
# lines not shown -- confirm, since the RT queries presumably need rt_db.
67 # read RT database passwords and connect
68 rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
69 if rt_db_constants is None:
70 print "Unable to read database access constants from %s" % \
75 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
76 user=rt_db_constants['RT_DB_USER'],
77 passwd=rt_db_constants['RT_DB_PASSWORD'],
78 db=rt_db_constants['RT_DB_NAME'])
80 print "Failed to connect to RT database"
# Look up RT tickets that mention `hostname`.
#
# Selects distinct tickets (id, Status, Subject) that have an attachment
# whose Content or Subject contains the hostname, restricted to tickets
# whose Status is 'new' or 'open'. Results are ordered by Status, then most
# recently updated first. The rows are mapped into `tickets`, a list of
# dicts (Python 2 map) whose "ticket_id" key holds the ticket id as an int.
# The remaining keys and the return statement are on lines not shown.
#
# SECURITY NOTE(review): hostname is spliced into the SQL text with the %
# operator, so a hostname containing a quote can break or alter the query
# (SQL injection). Prefer driver-side parameters, e.g. LIKE %s placeholders
# with cursor.execute(sql, ('%' + hostname + '%',) * 2).
# NOTE(review): `except Exception, err` is Python-2-only syntax.
88 def rt_tickets(hostname):
# Previous version of the query, kept for reference. Without parentheses,
# "... AND Tk.Queue = 3 OR Tk.Queue = 19" parses as
# "(... AND Tk.Queue = 3) OR Tk.Queue = 19", because AND binds tighter than
# OR, so it would return every ticket in queue 19.
90 # sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
92 # JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
93 # JOIN Attachments AS At ON Tr.id=At.TransactionID
94 # WHERE (At.Content LIKE '%%%s%%' OR
95 # At.Subject LIKE '%%%s%%') AND
96 # (Tk.Status = 'new' OR Tk.Status = 'open') AND
97 # Tk.Queue = 3 OR Tk.Queue = 19
98 # ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
99 # % (hostname,hostname)
# '%%%s%%' becomes '%<hostname>%' after % formatting, i.e. a substring match.
100 sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
102 JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
103 JOIN Attachments AS At ON Tr.id=At.TransactionID
104 WHERE (At.Content LIKE '%%%s%%' OR
105 At.Subject LIKE '%%%s%%') AND
106 (Tk.Status = 'new' OR Tk.Status = 'open')
107 ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
108 % (hostname,hostname)
111 # create a 'cursor' (required by MySQLdb)
114 except Exception, err:
115 print "Could not execute RT query %s" %err
118 # fetch all rows (list of lists)
121 # map list of lists (raw) to list of dicts (tickets)
122 # integer columns come back from the SQL driver as Python longs (per the
123 # original note, to prevent overflow); convert the id back to a plain int.
124 tickets = map(lambda x: {"ticket_id":int(x[0]),
134 Finds tickets associated with hostnames.
135 The idea is that if you give it an array of host names,
136 presumably from comon's list of bad nodes, it starts
137 a few threads to query RT. RT takes a while to return.
139 This is turning into a reinvention of DB design, which I don't believe in.
140 In an effort to keep things minimal, here's the basic algo:
142 Give list of hostnames to RT()
143 Find tickets associated with new hostnames (not already in the tickets dict).
144 Remove nodes that have come back up. Don't care if the ticket is closed after the first query.
145 Another thread refreshes tickets of nodes already in the dict and removes nodes that have come up.
# Set up one RT worker thread.
#   tickets      -- shared dict: hostname -> result of rt_tickets(hostname)
#   toCheck      -- queue of hostnames to look up; consumed by getTickets()
#   sickNoTicket -- queue that receives hostnames for which no RT ticket
#                   was found
#   target       -- not used in the visible lines: the thread is always
#                   started with target=self.getTickets
# NOTE(review): confirm the ignored `target` argument is intentional.
148 def __init__(self, tickets, toCheck, sickNoTicket, target = None):
149 # Time of last update of ticket DB
151 # Queue.Queue is thread-safe for multiple producers and consumers.
152 # Check host in queue. Queue populated from comon data of sick.
153 self.toCheck = toCheck
154 # Result of rt db query. Nodes without tickets that are sick.
155 self.sickNoTicket = sickNoTicket
156 #DB of tickets. Name -> ticket
157 self.tickets = tickets
158 Thread.__init__(self,target = self.getTickets)
160 # Takes node from toCheck, gets tickets.
161 # Thread that actually gets the tickets.
# Worker loop. Block until a hostname is available on self.toCheck, query
# RT for it with rt_tickets(), and store the result in self.tickets.
# Per the log message, the no-ticket case pushes the host onto
# self.sickNoTicket instead; the branch condition is on a line not shown.
# Shutdown: the STRING "None" (not the None object) is the sentinel --
# putting "None" on toCheck makes this worker leave its loop.
162 def getTickets(self):
164 host = self.toCheck.get(block = True)
165 if host == "None": break
166 #if self.tickets.has_key(host) == False:
167 #logger.debug("Popping from q - %s" %host)
168 tmp = rt_tickets(host)
170 #logger.debug("RT: tickets for %s" %host)
171 self.tickets[host] = tmp
173 logger.debug("RT: no tix for %s - policy" %host)
174 self.sickNoTicket.put(host)
176 # Removes hosts that are no longer down.
# Build `currdown` from every host listed in any bucket named in
# cmn.comonbkts. Then delete each self.tickets entry whose host is not in
# currdown, logging "no longer down" for each removal.
# NOTE(review): currdown is used as a list (append + `in`), so both the
# de-duplication and the comparison are linear scans; a set would make each
# membership test O(1). Its initialization is on a line not shown.
# NOTE(review): prevdown is a list snapshot under Python 2's dict.keys(),
# which is what makes the `del` inside the loop safe; on Python 3 this would
# need list(self.tickets).
177 def remTickets(self):
178 logger.debug("Removing stale entries from DB.")
179 prevdown = self.tickets.keys()
182 #BEGIN HACK. This should be outside of this file. passed to class.
183 cmn = comon.Comon(None, None)
185 for bucket in cmn.comonbkts.keys():
186 for host in getattr(cmn,bucket):
187 if host not in currdown: currdown.append(host)
190 # Actually do the comparison
191 for host in prevdown:
192 if host not in currdown:
193 del self.tickets[host]
194 logger.info("RT: %s no longer down." % host)
def updateTickets(self):
    """Push every host already tracked in self.tickets back onto toCheck.

    The worker threads consume toCheck, so re-queuing a host makes them
    query RT for it again and refresh its entry.
    """
    logger.info("Refreshing DB.")
    known_hosts = self.tickets.keys()
    enqueue = self.toCheck.put
    for name in known_hosts:
        enqueue(name)
# Target of the clean-up thread created in the driver code
# (Thread(target=a.cleanTickets)). Presumably drives remTickets() /
# updateTickets() periodically -- TODO confirm against the full body.
203 def cleanTickets(self):
# Manual test driver. Sends DEBUG-level output of the "monitor" logger to
# the console, creates several RT worker threads that share one hostname
# queue, defines a fixed tuple of sample hostnames, and creates a clean-up
# thread running a.cleanTickets.
210 logger.setLevel(logging.DEBUG)
211 ch = logging.StreamHandler()
212 ch.setLevel(logging.DEBUG)
213 formatter = logging.Formatter('%(message)s')
214 ch.setFormatter(formatter)
215 logger.addHandler(ch)
217 bucket = Queue.Queue()
# BUG(review): RT.__init__ requires (tickets, toCheck, sickNoTicket), but
# these calls pass only two arguments, so each raises TypeError. Pass a
# second queue for hosts without tickets, e.g.
#   sick = Queue.Queue(); a = RT(tickets, bucket, sick)
219 a = RT(tickets, bucket)
220 b = RT(tickets, bucket)
221 c = RT(tickets, bucket)
222 d = RT(tickets, bucket)
223 e = RT(tickets, bucket)
# Sample hostnames used as test input for the workers.
228 tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu')
# Disabled experiments (no pushHosts method is visible on RT); candidates
# for deletion.
231 #et = Thread(target=e.pushHosts)
239 #cmn = comon.Comon(cdb, bucket)
241 #for bucket in cmn.comonbkts.keys():
242 # for host in getattr(cmn,bucket):
245 at = Thread(target=a.cleanTickets)
251 if __name__ == '__main__':