Uses CoMon's ability to find 'upness' to email. Changed queueing between threads...
[monitor.git] / rt.py
1 #!/usr/bin/python2
2
3 import os, sys, shutil
4 import MySQLdb
5 import string
6 import logging
7 import Queue
8 import time 
9 import comon
10 from threading import *
11
12 # RT database access constants file
13 RT_DB_CONSTANTS_PATH='/etc/planetlab/rt_db'
14
15 #Logging
16 logger = logging.getLogger("monitor")
17
18 # seconds between ticket update
19 RTSLEEP = 7200
20
21 def stripQuotes( str ):
22         quotes= ["'",'"']
23         if str[0] in quotes:
24                 str= str[1:]
25         if str[len(str)-1] in quotes:
26                 str= str[:len(str)-1]
27         return str
28
29
30 def readConstantsFile( file_path ):
31         """
32         read a file consisting of lines of
33         NAME=VALUE
34         NAME='VALUE'
35         NAME="VALUE"
36         and return a dictionary of the values.
37
38         blank lines, and lines starting with # (comments) are skipped
39         """
40
41         contents= {}
42
43         try:
44                 input_file= file(file_path,"r")
45         except IOError, err:
46                 return None
47
48         for line in input_file:
49                 if line[0] == "#":
50                         continue
51                 line= string.strip(line)
52                 if len(line) == 0:
53                         continue
54
55                 parts= string.split(line,"=",)
56                 if len(parts) <> 2:
57                         continue
58
59                 contents[parts[0]]= stripQuotes(parts[1])
60
61         return contents
62
63
64
65 def open_rt_db():
66
67         # read plc database passwords and connect
68         rt_db_constants= readConstantsFile(RT_DB_CONSTANTS_PATH)
69         if rt_db_constants is None:
70                 print "Unable to read database access constants from %s" % \
71                           RT_DB_CONSTANTS_PATH
72                 return -1
73
74         try:
75                 rt_db = MySQLdb.connect(host=rt_db_constants['RT_DB_HOST'],
76                                 user=rt_db_constants['RT_DB_USER'],
77                                 passwd=rt_db_constants['RT_DB_PASSWORD'],
78                                 db=rt_db_constants['RT_DB_NAME'])
79         except Error:
80                 print "Failed to connect to RT database"
81                 return -1
82
83         return rt_db
84
85
86
87
88 def rt_tickets(hostname):
89         db = open_rt_db()
90 #       sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
91 #                        FROM Tickets AS Tk
92 #                        JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
93 #                        JOIN Attachments AS At ON Tr.id=At.TransactionID
94 #                        WHERE (At.Content LIKE '%%%s%%' OR
95 #                               At.Subject LIKE '%%%s%%') AND
96 #                               (Tk.Status = 'new' OR Tk.Status = 'open') AND
97 #                               Tk.Queue = 3 OR Tk.Queue = 19 
98 #                        ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
99 #                        % (hostname,hostname)
100         sql = """SELECT distinct Tk.id, Tk.Status, Tk.Subject
101                          FROM Tickets AS Tk
102                          JOIN Transactions AS Tr ON Tk.id=Tr.ObjectId
103                          JOIN Attachments AS At ON Tr.id=At.TransactionID
104                          WHERE (At.Content LIKE '%%%s%%' OR
105                                 At.Subject LIKE '%%%s%%') AND
106                                 (Tk.Status = 'new' OR Tk.Status = 'open')
107                          ORDER BY Tk.Status, Tk.LastUpdated DESC""" \
108                          % (hostname,hostname)
109
110         try:
111                 # create a 'cursor' (required by MySQLdb)
112                 c = db.cursor()
113                 c.execute(sql)
114         except Exception, err:
115                 print "Could not execute RT query %s" %err
116                 return -1
117
118         # fetch all rows (list of lists)
119         raw = c.fetchall()
120
121         # map list of lists (raw) to list of dicts (tickets) 
122         # when int gets pulls from SQL into python ints are converted to LONG to
123         # prevent overflow .. convert back
124         tickets = map(lambda x: {"ticket_id":int(x[0]),
125                                 "status":x[1],
126                                 "subj":x[2]},
127                                 raw)
128         db.close()
129
130         return tickets
131
132
133 '''
134 Finds tickets associated with hostnames.
135 The idea is if you give it an array of host names,
136 presumeably from comon's list of bad nodes, it starts
137 a few threads to query RT.  RT takes a while to return.
138
139 This is turning into a reinvention of DB design, which I dont believe in.
140 In an effort to keep things minimal, here's the basic algo:
141
142 Give list of hostnames to RT()
143 Finds tickets associate with new hostnames (not in dict(tickets)).
144 Remove nodes that have come backup. Don't care of ticket is closed after first query.
145 Another thread refresh tickets of nodes already in dict and remove nodes that have come up. 
146 '''
147 class RT(Thread):
148         def __init__(self, tickets, toCheck, sickNoTicket, target = None): 
149                 # Time of last update of ticket DB
150                 self.lastupdated = 0
151                 # Queue() is MP/MC self locking.
152                 # Check host in queue.  Queue populated from comon data of sick. 
153                 self.toCheck = toCheck
154                 # Result of rt db query.  Nodes without tickets that are sick.
155                 self.sickNoTicket = sickNoTicket 
156                 #DB of tickets.  Name -> ticket
157                 self.tickets = tickets
158                 Thread.__init__(self,target = self.getTickets)
159
160         # Takes node from toCheck, gets tickets.  
161         # Thread that actually gets the tickets.
162         def getTickets(self):
163                 while 1:
164                         host = self.toCheck.get(block = True)
165                         if host == "None": break
166                         #if self.tickets.has_key(host) == False:
167                         #logger.debug("Popping from q - %s" %host)
168                         tmp = rt_tickets(host)
169                         if tmp:
170                                 #logger.debug("RT: tickets for %s" %host)
171                                 self.tickets[host] = tmp
172                         else:
173                                 logger.debug("RT: no tix for %s - policy" %host)
174                                 self.sickNoTicket.put(host) 
175
176         # Removes hosts that are no longer down.
177         def remTickets(self):
178                 logger.debug("Removing stale entries from DB.") 
179                 prevdown = self.tickets.keys()
180
181                 currdown = []
182                 #BEGIN HACK.  This should be outside of this file. passed to class.
183                 cmn = comon.Comon(None, None)
184                 cmn.updatebkts()
185                 for bucket in cmn.comonbkts.keys():
186                         for host in getattr(cmn,bucket):
187                                 if host not in currdown: currdown.append(host)
188                 #END HACK
189
190                 # Actually do the comparison
191                 for host in prevdown:
192                         if host not in currdown:
193                                 del self.tickets[host]
194                                 logger.info("RT: %s no longer down." % host)
195
196         # Update Tickets
197         def updateTickets(self):
198                 logger.info("Refreshing DB.")
199                 for host in self.tickets.keys():
200                         # Put back in Q to refresh
201                         self.toCheck.put(host)
202
203         def cleanTickets(self):
204                 while 1:
205                         self.remTickets()
206                         self.updateTickets()
207                         time.sleep(RTSLEEP)
208         
209 def main():
210         logger.setLevel(logging.DEBUG)
211         ch = logging.StreamHandler()
212         ch.setLevel(logging.DEBUG)
213         formatter = logging.Formatter('%(message)s')
214         ch.setFormatter(formatter)
215         logger.addHandler(ch)
216
217         bucket = Queue.Queue() 
218         tickets = {}
219         a = RT(tickets, bucket)
220         b = RT(tickets, bucket)
221         c = RT(tickets, bucket)
222         d = RT(tickets, bucket)
223         e = RT(tickets, bucket)
224         a.start()
225         b.start()
226         c.start()
227         d.start()
228         tmp = ('planetlab-1.cs.ucy.ac.cy','planetlab-2.vuse.vanderbilt.edu', 'planetlab-11.cs.princeton.edu', 'planet03.csc.ncsu.edu', 'planetlab1.pop-rj.rnp.br', 'planet1.halifax.canet4.nodes.planet-lab.org', 'planet1.cavite.nodes.planet-lab.org', 'ds-pl3.technion.ac.il', 'planetlab2.cs.purdue.edu', 'planetlab3.millennium.berkeley.edu', 'planetlab1.unl.edu', 'planetlab1.cs.colorado.edu', 'planetlab02.cs.washington.edu', 'orbpl2.rutgers.edu', 'planetlab2.informatik.uni-erlangen.de', 'pl2.ernet.in', 'neu2.6planetlab.edu.cn', 'planetlab-2.cs.uni-paderborn.de', 'planetlab1.elet.polimi.it', 'planetlab2.iiitb.ac.in', 'server1.planetlab.iit-tech.net', 'planetlab2.iitb.ac.in', 'planetlab1.ece.ucdavis.edu', 'planetlab02.dis.unina.it', 'planetlab-1.dis.uniroma1.it', 'planetlab1.iitb.ac.in', 'pku1.6planetlab.edu.cn', 'planetlab1.warsaw.rd.tp.pl', 'planetlab2.cs.unc.edu', 'csu2.6planetlab.edu.cn', 'pl1.ernet.in', 'planetlab2.georgetown.edu', 'planetlab1.cs.uchicago.edu') 
229         for host in tmp:
230                 bucket.put(host)
231         #et = Thread(target=e.pushHosts)        
232         #et.start()
233         time.sleep(15)
234         print tickets.keys()
235         time.sleep(15)
236         print tickets.keys()
237         time.sleep(15)
238         print tickets.keys()
239         #cmn = comon.Comon(cdb, bucket)
240         #cmn.updatebkts()
241         #for bucket in cmn.comonbkts.keys():
242 #               for host in getattr(cmn,bucket):
243 #                       alldown.put(host)
244 #
245         at = Thread(target=a.cleanTickets)
246         at.start()
247         time.sleep(15)
248         print tickets.keys()
249         os._exit(0)
250
251 if __name__ == '__main__':
252     main()