#!/usr/bin/python

import os
import sys
import time
from datetime import datetime, timedelta
import threadpool
import threading

from monitor.util import file
from pcucontrol.util import command
from monitor import config

from monitor.database.info.model import FindbadNodeRecord, session

from monitor.sources import comon
from monitor.wrapper import plc, plccache
from monitor.scanapi import *

from nodequery import verify, query_to_dict, node_select
import traceback
from monitor.common import nmap_port_status

#print "starting sqlfindbad.py"
# QUERY all nodes.
COMON_COTOPURL = "http://summer.cs.princeton.edu/status/tabulator.cgi?" + \
				"table=table_nodeview&" + \
				"dumpcols='name,resptime,sshstatus,uptime,lastcotop,cpuspeed,memsize,disksize'&" + \
				"formatcsv"
				#"formatcsv&" + \
				#"select='lastcotop!=0'"

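# (The commented-out "select='lastcotop!=0'" clause would restrict the
#  dump to nodes with a nonzero 'lastcotop'; it is kept above for reference.)
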
api = plc.getAuthAPI()
plc_lock = threading.Lock()
round = 1
global_round = round
count = 0

# this will be called when an exception occurs within a thread
def handle_exception(request, result):
	print "Exception occurred in request %s" % request.requestID
	for i in result:
		print "Result: %s" % i

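# (The threadpool module invokes this exception callback as
#  exc_callback(request, sys.exc_info()), so 'result' above is the
#  (type, value, traceback) tuple raised inside the worker thread.)
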
def checkAndRecordState(l_nodes, cohash):
	global global_round
	global count

	tp = threadpool.ThreadPool(20)
	scannode = ScanNodeInternal(global_round)

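	# (ScanNodeInternal comes in via the wildcard import from
	#  monitor.scanapi; the pool above caps concurrent node scans at 20.)
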
	# CREATE all the work requests
	for nodename in l_nodes:
		#fbnodesync = FindbadNodeRecordSync.findby_or_create(hostname=nodename, if_new_set={'round':0})
		#node_round   = fbnodesync.round
		node_round = global_round - 1
		#fbnodesync.flush()

		if node_round < global_round or config.force:
			# recreate node stats when refreshed
			#print "%s" % nodename
			req = threadpool.WorkRequest(scannode.collectInternal, [nodename, cohash], {},
										 None, scannode.record, handle_exception)
			tp.putRequest(req)
		else:
			# We just skip it, since it's "up to date"
			count += 1
			#print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values'])
			print "%d %s %s" % (count, nodename, node_round)

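	# (WorkRequest's positional arguments, per the threadpool module, are:
	#  callable, args, kwds, requestID, callback, exc_callback. Each request
	#  therefore runs scannode.collectInternal(nodename, cohash) in a worker
	#  thread, hands the result to scannode.record, and routes any exception
	#  to handle_exception above.)
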
	# WAIT while all the work requests are processed.
	begin = time.time()
	while True:
		try:
			time.sleep(1)
			tp.poll()
			# if more than 1.5 hours have passed, give up
			if time.time() - begin > (60*60*1.5):
				print "findbad.py has run out of time!!!!!!"
				os._exit(1)
		except KeyboardInterrupt:
			print "Interrupted!"
			break
		except threadpool.NoResultsPending:
			print "All results collected."
			break

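	# (tp.poll() raises threadpool.NoResultsPending once every queued
	#  request has been processed, which is what ends the loop above.)
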
	#print FindbadNodeRecordSync.query.count()
	print FindbadNodeRecord.query.count()
	session.flush()
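	# (session.flush() pushes any FindbadNodeRecord rows accumulated by
	#  scannode.record out to the findbad database before returning.)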

def main():
	global global_round

	#fbsync = FindbadNodeRecordSync.findby_or_create(hostname="global",
	#                                                if_new_set={'round' : global_round})
	#global_round = fbsync.round

	if config.increment:
		# update global round number to force refreshes across all nodes
		global_round += 1

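	# (Rounds track which scan pass last touched a node: checkAndRecordState()
	#  only rescans a node whose recorded round is behind global_round. With
	#  the FindbadNodeRecordSync lookups commented out, node_round is always
	#  global_round - 1, so in practice every node is scanned each run.)
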
	cotop = comon.Comon()
	# lastcotop measures whether cotop is actually running. This is a better
	# metric than sshstatus or other values from CoMon.
	cotop_url = COMON_COTOPURL

	# history information for all nodes
	cohash = {}
	#cohash = cotop.coget(cotop_url)
	l_nodes = plccache.l_nodes
	if config.nodelist:
		f_nodes = file.getListFromFile(config.nodelist)
		l_nodes = filter(lambda x: x['hostname'] in f_nodes, l_nodes)
	elif config.node:
		f_nodes = [config.node]
		l_nodes = filter(lambda x: x['hostname'] in f_nodes, l_nodes)
	elif config.nodegroup:
		ng = api.GetNodeGroups({'name' : config.nodegroup})
		l_nodes = plccache.GetNodesByIds(ng[0]['node_ids'])
	elif config.site:
		site = plccache.GetSitesByName([config.site])
		l_nodes = plccache.GetNodesByIds(site[0]['node_ids'])
	elif config.sitelist:
		site_list = config.sitelist.split(',')
		sites = plccache.GetSitesByName(site_list)
		node_ids = []
		for s in sites:
			node_ids += s['node_ids']
		l_nodes = plccache.GetNodesByIds(node_ids)

	l_nodes = [node['hostname'] for node in l_nodes]

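	# Example invocations (hypothetical hostnames/sites, for illustration):
	#   ./findbad.py --node planetlab1.example.edu
	#   ./findbad.py --site princeton
	#   ./findbad.py --sitelist princeton,mit
	# Only the first matching option is applied, in the order
	# nodelist, node, nodegroup, site, sitelist.
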
	# perform this query after the above options, so that the filter above
	# does not break.
	if config.nodeselect:
		plcnodes = plccache.l_nodes
		plcnodes = [ node['hostname'] for node in plcnodes ]
		l_nodes = node_select(config.nodeselect, plcnodes, None)

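	# (node_select(), from nodequery, filters the cached PLC node list with a
	#  query expression; the exact query grammar is defined in nodequery.)
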
	print "fetching %s hosts" % len(l_nodes)

	checkAndRecordState(l_nodes, cohash)

	if config.increment:
		# update global round number to force refreshes across all nodes
		#fbsync.round = global_round
		#fbsync.flush()
		pass

	return 0


if __name__ == '__main__':
	from monitor import parser as parsermodule

	parser = parsermodule.getParser(['nodesets'])

	parser.set_defaults(increment=False, dbname="findbad", cachenodes=False,
						force=False)
	parser.add_option("", "--cachenodes", action="store_true",
						help="Cache node lookup from PLC")
	parser.add_option("", "--dbname", dest="dbname", metavar="FILE",
						help="Specify the name of the database to which the information is saved")
	parser.add_option("-i", "--increment", action="store_true", dest="increment",
						help="Increment round number to force refresh or retry")
	parser.add_option("", "--force", action="store_true", dest="force",
						help="Force probe without incrementing global 'round'.")

	parser = parsermodule.getParser(['defaults'], parser)

	cfg = parsermodule.parse_args(parser)

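	# (parse_args() appears to populate the shared monitor.config module:
	#  main() reads its options as config.increment, config.nodelist, etc.,
	#  and the returned 'cfg' object is not used below.)
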
	try:
		main()
	except Exception, err:
		traceback.print_exc()
		from monitor.common import email_exception
		email_exception()
		print "Exception: %s" % err
		print "Saving data... exiting."
		sys.exit(0)
	print "sleeping"