+ updated tech guide url to go directly to NodeInstallation
[monitor.git] / rt.py
1 #!/usr/bin/python2
2
3 import os, sys, shutil
4 import MySQLdb
5 import string
6 import logging
7 import Queue
8 import time 
9 import re
10 import comon
11 import soltesz
12 from threading import *
13 import config
14
15 # RT database access constants file
16 RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
17
18 #Logging
19 logger = logging.getLogger("monitor")
20
21 # seconds between ticket update
22 RTSLEEP = 7200
23
24 def stripQuotes( str ):
25         quotes= ["'",'"']
26         if str[0] in quotes:
27                 str= str[1:]
28         if str[len(str)-1] in quotes:
29                 str= str[:len(str)-1]
30         return str
31
32
33 def readConstantsFile( file_path ):
34         """
35         read a file consisting of lines of
36         NAME=VALUE
37         NAME='VALUE'
38         NAME="VALUE"
39         and return a dictionary of the values.
40
41         blank lines, and lines starting with # (comments) are skipped
42         """
43
44         contents= {}
45
46         try:
47                 input_file= file(file_path,"r")
48         except IOError, err:
49                 return None
50
51         for line in input_file:
52                 if line[0] == "#":
53                         continue
54                 line= string.strip(line)
55                 if len(line) == 0:
56                         continue
57
58                 parts= string.split(line,"=",)
59                 if len(parts) <> 2:
60                         continue
61
62                 contents[parts[0]]= stripQuotes(parts[1])
63
64         return contents
65
66
67
68 def open_rt_db():
69
70         # read plc database passwords and connect
71         rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
72         if rt_db_constants is None:
73                 print "Unable to read database access constants from %s" % \
74                           RT_DB_CONSTANTS_PATH
75                 return -1
76
77         try:
78                 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
79                                 user=rt_db_constants['RT_DB_USER'],
80                                 passwd=rt_db_constants['RT_DB_PASSWORD'],
81                                 db=rt_db_constants['RT_DB_NAME'])
82         except Exception, err:
83                 print "Failed to connect to RT database: %s" %err
84                 return -1
85
86         return rt_db
87
88
89
90
91 def rt_tickets():
92         db = open_rt_db()
93 #       sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
94 #                        FROM Tickets AS Tk
95 #                        JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
96 #                        JOIN Attachments AS At ON Tr.id=At.TransactionID
97 #                        WHERE (At.Content LIKE '%%%s%%' OR
98 #                               At.Subject LIKE '%%%s%%') AND
99 #                               (Tk.Status = 'new' OR Tk.Status = 'open') AND
100 #                               Tk.Queue = 3 OR Tk.Queue = 19 
101 #                        ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
102 #                        % (hostname,hostname)
103 #       sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
104 #                        FROM Tickets AS Tk
105 #                        JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
106 #                        JOIN Attachments AS At ON Tr.id=At.TransactionID
107 #                        WHERE (At.Content LIKE '%%%s%%' OR
108 #                               At.Subject LIKE '%%%s%%') AND
109 #                               (Tk.Status = 'new' OR Tk.Status = 'open')
110 #                        ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
111 #                        % (hostname,hostname)
112
113         # Queue == 10 is the spam Queue in RT.
114         sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject, At.Content
115                          FROM Tickets AS Tk, Attachments AS At 
116                          JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId  
117                          WHERE Tk.Queue != 10 AND Tk.id > 10000 AND 
118                                    Tr.id=At.TransactionID AND (Tk.Status = 'new' OR Tk.Status = 'open')"""
119
120         try:
121                 # create a 'cursor' (required by MySQLdb)
122                 c = db.cursor()
123                 c.execute(sql)
124         except Exception, err:
125                 print "Could not execute RT query %s" %err
126                 return -1
127
128         # fetch all rows (list of lists)
129         raw = c.fetchall()
130
131         # map list of lists (raw) to list of dicts (tickets) 
132         # when int gets pulls from SQL into python ints are converted to LONG to
133         # prevent overflow .. convert back
134         tickets = map(lambda x: {"ticket_id":int(x[0]),
135                                 "status":x[1],
136                                 "subj":str(x[2]),
137                                 "content":str(x[3])},
138                                 raw)
139         db.close()
140
141         return tickets
142
143 def is_host_in_rt_tickets(host, ad_rt_tickets):
144         # ad_rt_tickets is an array of dicts, defined above.
145         if len(ad_rt_tickets) == 0:
146                 return (False, None)
147         
148         d_ticket = ad_rt_tickets[0]
149         if not ('ticket_id' in d_ticket and 'status' in d_ticket and 
150                         'subj' in d_ticket and 'content' in d_ticket):
151                 logger.debug("RT_tickets array has wrong fields!!!")
152                 return (False, None)
153
154         #logger.debug("Searching all tickets for %s" % host)
155         def search_tickets(host, ad_rt_tickets):
156                 # compile once for more efficiency
157                 re_host = re.compile(host)
158                 for x in ad_rt_tickets:
159                         if re_host.search(x['subj'], re.MULTILINE|re.IGNORECASE) or \
160                            re_host.search(x['content'], re.MULTILINE|re.IGNORECASE):
161                                 logger.debug("\t ticket %d has %s" % (x['ticket_id'], host))
162                                 return (True, x)
163                 logger.debug("\t noticket -- has %s" % host)
164                 return (False, None)
165
166         # This search, while O(tickets), takes less than a millisecond, 05-25-07
167         #t = soltesz.MyTimer()
168         ret = search_tickets(host, ad_rt_tickets)
169         #del t
170
171         return ret
172
173
174 '''
175 Finds tickets associated with hostnames.
176 The idea is if you give it an array of host names,
177 presumeably from comon's list of bad nodes, it starts
178 a few threads to query RT.  RT takes a while to return.
179
180 This is turning into a reinvention of DB design, which I dont believe in.
181 In an effort to keep things minimal, here's the basic algo:
182
183 Give list of hostnames to RT()
184 Finds tickets associate with new hostnames (not in dict(tickets)).
185 Remove nodes that have come backup. Don't care of ticket is closed after first query.
186 Another thread refresh tickets of nodes already in dict and remove nodes that have come up. 
187 '''
188 class RT(Thread):
189         def __init__(self, dbTickets, tickets, qin_toCheck, qout_sickNoTicket, target = None): 
190                 # Time of last update of ticket DB
191                 self.dbTickets = dbTickets
192                 self.lastupdated = 0
193                 # Check host in queue.  Queue populated from comon data of sick. 
194                 self.qin_toCheck = qin_toCheck
195                 # Result of rt db query.  Nodes without tickets that are sick.
196                 self.qout_sickNoTicket = qout_sickNoTicket 
197                 #DB of tickets.  Name -> ticket
198                 self.tickets = tickets
199                 Thread.__init__(self,target = self.getTickets)
200
201         # Takes node from qin_toCheck, gets tickets.  
202         # Thread that actually gets the tickets.
203         def getTickets(self):
204                 self.count = 0
205                 while 1:
206                         diag_node = self.qin_toCheck.get(block = True)
207                         if diag_node == "None": 
208                                 print "RT processed %d nodes with noticket" % self.count
209                                 logger.debug("RT filtered %d noticket nodes" % self.count)
210                                 self.qout_sickNoTicket.put("None")
211                                 break
212                         else:
213                                 host = diag_node['nodename']
214                                 (b_host_inticket, r_ticket) = is_host_in_rt_tickets(host, self.dbTickets)
215                                 if b_host_inticket:
216                                         logger.debug("RT: found tickets for %s" %host)
217                                         diag_node['stage'] = 'stage_rt_working'
218                                         diag_node['ticket_id'] = r_ticket['ticket_id']
219                                         self.tickets[host] = r_ticket
220                                 else:
221                                         #logger.debug("RT: no tix for %s" %host)
222                                         #print "no tix for %s" % host
223                                         self.count = self.count + 1
224
225                                 # process diag_node for either case
226                                 self.qout_sickNoTicket.put(diag_node) 
227
228         # Removes hosts that are no longer down.
229         def remTickets(self):
230                 logger.debug("Removing stale entries from DB.") 
231                 prevdown = self.tickets.keys()
232
233                 currdown = []
234                 #BEGIN HACK.  This should be outside of this file. passed to class.
235                 cmn = comon.Comon(None, None)
236                 cmn.updatebkts()
237                 for bucket in cmn.comonbkts.keys():
238                         for host in getattr(cmn,bucket):
239                                 if host not in currdown: currdown.append(host)
240                 #END HACK
241
242                 # Actually do the comparison
243                 for host in prevdown:
244                         if host not in currdown:
245                                 del self.tickets[host]
246                                 logger.info("RT: %s no longer down." % host)
247
248         # Update Tickets
249         def updateTickets(self):
250                 logger.info("Refreshing DB.")
251                 for host in self.tickets.keys():
252                         # Put back in Q to refresh
253                         self.qin_toCheck.put(host)
254
255         def cleanTickets(self):
256                 while 1:
257                         self.remTickets()
258                         self.updateTickets()
259                         time.sleep(RTSLEEP)
260         
261 def main():
262         logger.setLevel(logging.DEBUG)
263         ch = logging.StreamHandler()
264         ch.setLevel(logging.DEBUG)
265         formatter = logging.Formatter('%(message)s')
266         ch.setFormatter(formatter)
267         logger.addHandler(ch)
268
269         bucket = Queue.Queue() 
270         tickets = {}
271         a = RT(tickets, bucket)
272         b = RT(tickets, bucket)
273         c = RT(tickets, bucket)
274         d = RT(tickets, bucket)
275         e = RT(tickets, bucket)
276         a.start()
277         b.start()
278         c.start()
279         d.start()
280         tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu') 
281         for host in tmp:
282                 bucket.put(host)
283         #et = Thread(target=e.pushHosts)        
284         #et.start()
285         time.sleep(15)
286         print tickets.keys()
287         time.sleep(15)
288         print tickets.keys()
289         time.sleep(15)
290         print tickets.keys()
291         #cmn = comon.Comon(cdb, bucket)
292         #cmn.updatebkts()
293         #for bucket in cmn.comonbkts.keys():
294 #               for host in getattr(cmn,bucket):
295 #                       alldown.put(host)
296 #
297         at = Thread(target=a.cleanTickets)
298         at.start()
299         time.sleep(15)
300         print tickets.keys()
301         os._exit(0)
302
303 if __name__ == '__main__':
304     main()