modified findbad and findbadpcu to use scanapi. need to combine these files.
[monitor.git] / findbad.py
#!/usr/bin/python

import os
import sys
import string
import time
from datetime import datetime, timedelta
import threadpool
import threading

from monitor import util
from monitor.util import command
from monitor import config

from monitor.database.info.model import FindbadNodeRecordSync, FindbadNodeRecord, session

from monitor.sources import comon
from monitor.wrapper import plc, plccache
from monitor.scanapi import *

from nodequery import verify, query_to_dict, node_select
import traceback
from nodecommon import nmap_port_status

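# ScanNodeInternal, used below, presumably comes in through the wildcard
# import from monitor.scanapi (per the commit message, this file was just
# switched over to scanapi).
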
#print "starting sqlfindbad.py"
# QUERY all nodes.
COMON_COTOPURL = "http://summer.cs.princeton.edu/status/tabulator.cgi?" + \
                 "table=table_nodeview&" + \
                 "dumpcols='name,resptime,sshstatus,uptime,lastcotop,cpuspeed,memsize,disksize'&" + \
                 "formatcsv"
                 #"formatcsv&" + \
                 #"select='lastcotop!=0'"

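# Module-level scan state: an authenticated PLC API handle, a lock that
# presumably serializes PLC access from the scan threads (it is not used in
# this file directly), and the round counters used to decide which nodes
# still need a probe.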
api = plc.getAuthAPI()
plc_lock = threading.Lock()
round = 1
global_round = round
count = 0

# this will be called when an exception occurs within a thread
def handle_exception(request, result):
        print "Exception occurred in request %s" % request.requestID
        for i in result:
                print "Result: %s" % i


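# Scan every node in l_nodes with a pool of worker threads.  A node whose
# per-host round number already matches the global round is skipped as "up
# to date" unless --force is given.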
def checkAndRecordState(l_nodes, cohash):
        global global_round
        global count

        tp = threadpool.ThreadPool(20)
        scannode = ScanNodeInternal(global_round)

        # CREATE all the work requests
        for nodename in l_nodes:
                fbnodesync = FindbadNodeRecordSync.findby_or_create(hostname=nodename, if_new_set={'round': 0})
                node_round = fbnodesync.round
                fbnodesync.flush()

                if node_round < global_round or config.force:
                        # recreate node stats when refreshed
                        #print "%s" % nodename
                        req = threadpool.WorkRequest(scannode.collectInternal, [nodename, cohash], {},
                                                     None, scannode.record, handle_exception)
                        tp.putRequest(req)
                else:
                        # We just skip it, since it's "up to date"
                        count += 1
                        #print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values'])
                        print "%d %s %s" % (count, nodename, node_round)

        # WAIT while all the work requests are processed.
        begin = time.time()
        while True:
                try:
                        time.sleep(1)
                        tp.poll()
                        # give up if the scan has been running for more than 1.5 hours
                        if time.time() - begin > (60*60*1.5):
                                print "findbad.py has run out of time!!!!!!"
                                os._exit(1)
                except KeyboardInterrupt:
                        print "Interrupted!"
                        break
                except threadpool.NoResultsPending:
                        print "All results collected."
                        break

        print FindbadNodeRecordSync.query.count()
        print FindbadNodeRecord.query.count()
        session.flush()

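# Entry point: look up (or create) the global round record, build the node
# list from the command-line selectors, and run the scan.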
def main():
        global global_round

        fbsync = FindbadNodeRecordSync.findby_or_create(hostname="global",
                                                        if_new_set={'round': global_round})
        global_round = fbsync.round

        if config.increment:
                # update global round number to force refreshes across all nodes
                global_round += 1

        cotop = comon.Comon()
        # lastcotop measures whether cotop is actually running; it is a more
        # reliable metric than sshstatus or the other values CoMon reports.
        cotop_url = COMON_COTOPURL

        # history information for all nodes
        cohash = {}
        #cohash = cotop.coget(cotop_url)
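        # NOTE: the coget() call is commented out in this revision, so the scan
        # currently runs with an empty CoMon history.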
        l_nodes = plccache.l_nodes
        if config.nodelist:
                f_nodes = util.file.getListFromFile(config.nodelist)
                l_nodes = filter(lambda x: x['hostname'] in f_nodes, l_nodes)
        elif config.node:
                f_nodes = [config.node]
                l_nodes = filter(lambda x: x['hostname'] in f_nodes, l_nodes)
        elif config.nodegroup:
                ng = api.GetNodeGroups({'name': config.nodegroup})
                l_nodes = api.GetNodes(ng[0]['node_ids'])
        elif config.site:
                site = api.GetSites(config.site)
                l_nodes = api.GetNodes(site[0]['node_ids'], ['hostname'])

        l_nodes = [node['hostname'] for node in l_nodes]

        # Perform this query after the options above, so that it does not
        # clobber their filtering.
        if config.nodeselect:
                plcnodes = api.GetNodes({'peer_id': None}, ['hostname'])
                plcnodes = [node['hostname'] for node in plcnodes]
                l_nodes = node_select(config.nodeselect, plcnodes, None)

        print "fetching %d hosts" % len(l_nodes)

        checkAndRecordState(l_nodes, cohash)

        if config.increment:
                # save the incremented round number so the forced refresh
                # applies to subsequent runs
                fbsync.round = global_round
                fbsync.flush()

        return 0


if __name__ == '__main__':
        from monitor import parser as parsermodule

        parser = parsermodule.getParser(['nodesets'])

        parser.set_defaults(increment=False, dbname="findbad", cachenodes=False,
                            force=False)
        parser.add_option("", "--cachenodes", action="store_true",
                          help="Cache node lookup from PLC")
        parser.add_option("", "--dbname", dest="dbname", metavar="FILE",
                          help="Specify the name of the database to which the information is saved")
        parser.add_option("-i", "--increment", action="store_true", dest="increment",
                          help="Increment round number to force refresh or retry")
        parser.add_option("", "--force", action="store_true", dest="force",
                          help="Force probe without incrementing the global 'round'.")

        parser = parsermodule.getParser(['defaults'], parser)

        cfg = parsermodule.parse_args(parser)

        try:
                main()
        except Exception, err:
                traceback.print_exc()
                print "Exception: %s" % err
                print "Saving data... exiting."
                sys.exit(0)
        print "sleeping"
        #print "final commit"
        #time.sleep(10)
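
# Example invocations (a sketch; the node-selection flags are supplied by the
# 'nodesets' parser in monitor.parser, so their exact spellings are assumptions
# inferred from the config attributes used in main()):
#
#   ./findbad.py -i                              # start a new round over all nodes
#   ./findbad.py --node planetlab-1.example.org --force    # hypothetical hostname
#   ./findbad.py --nodelist mynodes.txt -i                 # hypothetical node list file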