+ some cleanup. some dirtying.
[monitor.git] / rt.py
1 #!/usr/bin/python2
2
3 import os, sys, shutil
4 import MySQLdb
5 import string
6 import logging
7 import Queue
8 import time 
9 import re
10 import comon
11 import soltesz
12 from threading import *
13
14 # TODO: merge the RT mailer from mailer.py into this file.
15
16 # RT database access constants file
17 RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
18
19 #Logging
20 logger = logging.getLogger("monitor")
21
22 # seconds between ticket update
23 RTSLEEP = 7200
24
25 def stripQuotes( str ):
26         quotes= ["'",'"']
27         if str[0] in quotes:
28                 str= str[1:]
29         if str[len(str)-1] in quotes:
30                 str= str[:len(str)-1]
31         return str
32
33
34 def readConstantsFile( file_path ):
35         """
36         read a file consisting of lines of
37         NAME=VALUE
38         NAME='VALUE'
39         NAME="VALUE"
40         and return a dictionary of the values.
41
42         blank lines, and lines starting with # (comments) are skipped
43         """
44
45         contents= {}
46
47         try:
48                 input_file= file(file_path,"r")
49         except IOError, err:
50                 return None
51
52         for line in input_file:
53                 if line[0] == "#":
54                         continue
55                 line= string.strip(line)
56                 if len(line) == 0:
57                         continue
58
59                 parts= string.split(line,"=",)
60                 if len(parts) <> 2:
61                         continue
62
63                 contents[parts[0]]= stripQuotes(parts[1])
64
65         return contents
66
67
68
69 def open_rt_db():
70
71         # read plc database passwords and connect
72         rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
73         if rt_db_constants is None:
74                 print "Unable to read database access constants from %s" % \
75                           RT_DB_CONSTANTS_PATH
76                 return -1
77
78         try:
79                 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
80                                 user=rt_db_constants['RT_DB_USER'],
81                                 passwd=rt_db_constants['RT_DB_PASSWORD'],
82                                 db=rt_db_constants['RT_DB_NAME'])
83         except Exception, err:
84                 print "Failed to connect to RT database: %s" %err
85                 return -1
86
87         return rt_db
88
89
90
91
92 def rt_tickets():
93         db = open_rt_db()
94 #       sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
95 #                        FROM Tickets AS Tk
96 #                        JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
97 #                        JOIN Attachments AS At ON Tr.id=At.TransactionID
98 #                        WHERE (At.Content LIKE '%%%s%%' OR
99 #                               At.Subject LIKE '%%%s%%') AND
100 #                               (Tk.Status = 'new' OR Tk.Status = 'open') AND
101 #                               Tk.Queue = 3 OR Tk.Queue = 19 
102 #                        ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
103 #                        % (hostname,hostname)
104 #       sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
105 #                        FROM Tickets AS Tk
106 #                        JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
107 #                        JOIN Attachments AS At ON Tr.id=At.TransactionID
108 #                        WHERE (At.Content LIKE '%%%s%%' OR
109 #                               At.Subject LIKE '%%%s%%') AND
110 #                               (Tk.Status = 'new' OR Tk.Status = 'open')
111 #                        ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
112 #                        % (hostname,hostname)
113
114         # Queue == 10 is the spam Queue in RT.
115         sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
116                          FROM Tickets AS Tk, Attachments AS At 
117                          JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId  
118                          WHERE Tk.Queue != 10 AND Tk.id > 10000 AND 
119                                    Tr.id=At.TransactionID AND Tk.Status = 'open'"""
120                                    #Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
121
122         try:
123                 # create a 'cursor' (required by MySQLdb)
124                 c = db.cursor()
125                 c.execute(sql)
126         except Exception, err:
127                 print "Could not execute RT query %s" %err
128                 return -1
129
130         # fetch all rows (list of lists)
131         raw = c.fetchall()
132
133         # map list of lists (raw) to list of dicts (tickets) 
134         # when int gets pulls from SQL into python ints are converted to LONG to
135         # prevent overflow .. convert back
136         #tickets = map(lambda x: {"ticket_id":int(x[0]),
137         tickets = map(lambda x: {"ticket_id":str(x[0]),
138                                 "status":x[1],
139                                 "subj":str(x[2]),
140                                 "content":str(x[3])},
141                                 raw)
142         db.close()
143
144         return tickets
145
146 def is_host_in_rt_tickets(host, ticket_blacklist, ad_rt_tickets):
147         # ad_rt_tickets is an array of dicts, defined above.
148         if len(ad_rt_tickets) == 0:
149                 return (False, None)
150         
151         d_ticket = ad_rt_tickets[0]
152         if not ('ticket_id' in d_ticket and 'status' in d_ticket and 
153                         'subj' in d_ticket and 'content' in d_ticket):
154                 logger.debug("RT_tickets array has wrong fields!!!")
155                 return (False, None)
156
157         #logger.debug("Searching all tickets for %s" % host)
158         def search_tickets(host, ad_rt_tickets):
159                 # compile once for more efficiency
160                 re_host = re.compile(host)
161                 for x in ad_rt_tickets:
162                         if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \
163                            re_host.search(x['content'], re.MULTILINE|re.IGNORECASE):
164                                 logger.debug("\t ticket %s has %s" % (x['ticket_id'], host))
165                                 print "\t ticket %s has %s" % (x['ticket_id'], host)
166                                 if x['ticket_id'] in ticket_blacklist:
167                                         return (False, x)
168                                 else:
169                                         return (True, x)
170                 print "\t noticket -- has %s" % host
171                 #logger.debug("\t noticket -- has %s" % host)
172                 return (False, None)
173
174         # This search, while O(tickets), takes less than a millisecond, 05-25-07
175         #t = soltesz.MyTimer()
176         ret = search_tickets(host, ad_rt_tickets)
177         #del t
178
179         return ret
180
181
182 '''
183 Finds tickets associated with hostnames.
184 The idea is if you give it an array of host names,
185 presumeably from comon's list of bad nodes, it starts
186 a few threads to query RT.  RT takes a while to return.
187
188 This is turning into a reinvention of DB design, which I dont believe in.
189 In an effort to keep things minimal, here's the basic algo:
190
191 Give list of hostnames to RT()
192 Finds tickets associate with new hostnames (not in dict(tickets)).
193 Remove nodes that have come backup. Don't care of ticket is closed after first query.
194 Another thread refresh tickets of nodes already in dict and remove nodes that have come up. 
195 '''
196 class RT(Thread):
197         def __init__(self, dbTickets, q_toRT, q_fromRT, l_ticket_blacklist, target = None): 
198                 # Time of last update of ticket DB
199                 self.dbTickets = dbTickets
200                 self.lastupdated = 0
201                 self.l_ticket_blacklist = l_ticket_blacklist
202                 self.q_toRT = q_toRT
203                 self.q_fromRT = q_fromRT 
204                 self.tickets = {}
205                 Thread.__init__(self,target = self.getTickets)
206
207         # Takes node from q_toRT, gets tickets.  
208         # Thread that actually gets the tickets.
209         def getTickets(self):
210                 self.count = 0
211                 while 1:
212                         diag_node = self.q_toRT.get(block = True)
213                         if diag_node != None: 
214                                 host = diag_node['nodename']
215                                 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, \
216                                                                                                         self.l_ticket_blacklist, \
217                                                                                                         self.dbTickets)
218                                 diag_node['found_rt_ticket'] = None
219                                 if b_host_inticket:
220                                         logger.debug("RT: found tickets for %s" %host)
221                                         diag_node['found_rt_ticket'] = r_ticket['ticket_id']
222
223                                 else:
224                                         if r_ticket is not None:
225                                                 print "Ignoring ticket %s" % r_ticket['ticket_id']
226                                                 # TODO: why do i return the ticket id for a
227                                                 #               blacklisted ticket id?
228                                                 #diag_node['found_rt_ticket'] = r_ticket['ticket_id']
229                                         self.count = self.count + 1
230
231                                 self.q_fromRT.put(diag_node) 
232                         else:
233                                 print "RT processed %d nodes with noticket" % self.count
234                                 logger.debug("RT filtered %d noticket nodes" % self.count)
235                                 self.q_fromRT.put(None)
236
237                                 break
238
239         # Removes hosts that are no longer down.
240         def remTickets(self):
241                 logger.debug("Removing stale entries from DB.") 
242                 prevdown = self.tickets.keys()
243
244                 currdown = []
245                 ##BEGIN HACK.  This should be outside of this file. passed to class.
246                 #cmn = comon.Comon(None, None)
247         #       cmn.updatebkts()
248                 #for bucket in cmn.comonbkts.keys():
249                 #       for host in getattr(cmn,bucket):
250                 #               if host not in currdown: currdown.append(host)
251                 ##END HACK
252
253                 # Actually do the comparison
254                 #for host in prevdown:
255                 #       if host not in currdown:
256                 #               del self.tickets[host]
257                 #               logger.info("RT: %s no longer down." % host)
258
259         # Update Tickets
260         def updateTickets(self):
261                 logger.info("Refreshing DB.")
262                 for host in self.tickets.keys():
263                         # Put back in Q to refresh
264                         self.q_toRT.put(host)
265
266         def cleanTickets(self):
267                 while 1:
268                         self.remTickets()
269                         self.updateTickets()
270                         time.sleep(RTSLEEP)
271         
272 def main():
273         logger.setLevel(logging.DEBUG)
274         ch = logging.StreamHandler()
275         ch.setLevel(logging.DEBUG)
276         formatter = logging.Formatter('%(message)s')
277         ch.setFormatter(formatter)
278         logger.addHandler(ch)
279
280         bucket = Queue.Queue() 
281         tickets = {}
282         a = RT(tickets, bucket)
283         b = RT(tickets, bucket)
284         c = RT(tickets, bucket)
285         d = RT(tickets, bucket)
286         e = RT(tickets, bucket)
287         a.start()
288         b.start()
289         c.start()
290         d.start()
291         tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu') 
292         for host in tmp:
293                 bucket.put(host)
294         #et = Thread(target=e.pushHosts)        
295         #et.start()
296         time.sleep(15)
297         print tickets.keys()
298         time.sleep(15)
299         print tickets.keys()
300         time.sleep(15)
301         print tickets.keys()
302         #cmn = comon.Comon(cdb, bucket)
303         #cmn.updatebkts()
304         #for bucket in cmn.comonbkts.keys():
305 #               for host in getattr(cmn,bucket):
306 #                       alldown.put(host)
307 #
308         at = Thread(target=a.cleanTickets)
309         at.start()
310         time.sleep(15)
311         print tickets.keys()
312         os._exit(0)
313
314 if __name__ == '__main__':
315     main()