+ findbad.py: this actively probes all machines in the PLC db, using ping,
author: Stephen Soltesz <soltesz@cs.princeton.edu>
Wed, 8 Aug 2007 13:36:46 +0000 (13:36 +0000)
committer: Stephen Soltesz <soltesz@cs.princeton.edu>
Wed, 8 Aug 2007 13:36:46 +0000 (13:36 +0000)
ssh, and then various commands on the machine to determine the actual bootstate.
These records are saved to disk for diagnose.py
+ diagnose.py: reads entries from findbad and previous actions, merging the
two together to determine if machines have improved, or gotten worse.  All
actions to be performed are recorded and written to a diagnose_out pickle file
for action.py
+ action.py: reads the diagnose_out file from diagnose.py and performs the
+ actions.  It permanently records the results in act_all pickle file.

These three in combination are Monitor.

action.py [new file with mode: 0755]
diagnose.py [new file with mode: 0755]
findbad.py [new file with mode: 0755]

diff --git a/action.py b/action.py
new file mode 100755 (executable)
index 0000000..269007e
--- /dev/null
+++ b/action.py
@@ -0,0 +1,203 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2004  The Trustees of Princeton University (Trustees).
+# 
+# Faiyaz Ahmed <faiyaza@cs.princeton.edu>
+# Stephen Soltesz <soltesz@cs.princeton.edu>
+#
+# $Id$
+
+import sys
+from threading import *
+import time
+import logging
+import Queue
+from sets import Set
+
+# Global config options
+from config import config
+from optparse import OptionParser
+parser = OptionParser()
+
+parser.set_defaults(nodelist=None, 
+                                       cachert=False, 
+                                       cachenodes=False, 
+                                       blacklist=None, 
+                                       ticketlist=None)
+
+parser.add_option("", "--nodelist", dest="nodelist",
+                                       help="Read nodes to act on from specified file")
+parser.add_option("", "--cachert", action="store_true",
+                                       help="Cache the RT database query")
+parser.add_option("", "--cachenodes", action="store_true",
+                                       help="Cache node lookup from PLC")
+parser.add_option("", "--ticketlist", dest="ticketlist",
+                                       help="Whitelist all RT tickets in this file")
+parser.add_option("", "--blacklist", dest="blacklist",
+                                       help="Blacklist all nodes in this file")
+
+config = config(parser)
+config.parse_args()
+
+# daemonize and *pid
+#from util.process import * 
+
+# RT tickets
+import rt
+# Correlates input with policy to form actions
+import policy
+import soltesz
+import plc
+
+# Log to what 
+LOG="./monitor.log"
+
+# Time to refresh DB and remove unused entries
+RTSLEEP=7200 #2hrs
+# Time between policy enforce/update
+#POLSLEEP=43200 #12hrs
+POLSLEEP=10
+
+# Global list of all running threads.  Any threads added to 
+# list will be monitored.
+runningthreads = {}
+# Seconds between checking threads
+WATCHSLEEP = 10
+# Set up Logging
+logger = logging.getLogger("monitor")
+logger.setLevel(logging.DEBUG)
+fh = logging.FileHandler(LOG, mode = 'a')
+fh.setLevel(logging.DEBUG)
+formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+fh.setFormatter(formatter)
+logger.addHandler(fh)
+
+
+"""
+Launches threads and adds them to the runningthreads global list.
+Assigns name for thread, starts.
+"""
+def startThread(fnct, name):
+               runningthreads[name] = fnct
+               runningthreads[name].setName(name)
+               try:
+                       logger.info("Starting thread " + name)
+                       runningthreads[name].start()
+               except Exception, err:
+                       # BUGFIX: handler referenced undefined name 'error' (NameError); use the caught 'err'.
+                       logger.error("Thread: " + name + " " + str(err))
+
+
+"""
+Watches threads and catches exceptions.  Each launched thread is
+watched and state is logged.
+"""
+class ThreadWatcher(Thread):
+       def __init__(self):
+               Thread.__init__(self)
+
+       def run(self):
+               # Poll the registered threads forever; this watcher never exits on its own.
+               while 1:
+                       self.checkThreads()
+                       time.sleep(WATCHSLEEP)
+
+       def checkThreads(self):
+               # Iterate through threads, compare with last running.
+               for thread in runningthreads.keys():
+                       # If thread found dead, remove from queue
+                       #print "found %s" % thread
+                       if not runningthreads[thread].isAlive():
+                               logger.error("***********Thread died: %s**********" %(thread))
+                               del runningthreads[thread]
+               # Returns the number of registered threads still alive.
+               return len(runningthreads.keys())
+
+
+# Minimal thread: sleeps 5 seconds and exits (keeps the watcher loop busy).
+class Dummy(Thread):
+       def __init__(self):
+                Thread.__init__(self)
+
+       def run(self):
+               time.sleep(5)
+
+# Index a list of node records by their 'hostname' field.
+def dict_from_nodelist(nl):
+       d = {}
+       for host in nl:
+               h = host['hostname']
+               d[h] = host
+       return d
+
+"""
+Start threads, do some housekeeping, then daemonize.
+"""
+def main():
+       # Defaults
+       # NOTE(review): 'status' is never assigned at module level -- confirm intended.
+       global status, logger
+       global config
+
+       logger.info('Action Started')
+       print 'Action Started'
+
+       #########  GET NODES    ########################################
+       logger.info('Get Nodes from PLC')
+       print "getnode from plc"
+       # NOTE(review): cache flag is hard-coded True here, while diagnose.py
+       # passes config.cachenodes -- confirm this difference is intentional.
+       l_plcnodes = soltesz.if_cached_else(True,
+                                                               "l_plcnodes", 
+                                                               lambda : plc.getNodes({'peer_id':None}))
+
+       s_plcnodenames = Set([x['hostname'] for x in l_plcnodes])
+
+       # List of nodes from a user-provided file.
+       if config.nodelist:
+               # NOTE(review): 'file' shadows the builtin file type.
+               file = config.nodelist
+               nodelist = config.getListFromFile(file)
+               #for node in nodelist:
+               #       print "%s" % node
+       
+               s_usernodes = Set(nodelist)
+               # SAFE nodes are in PLC and the list 
+               s_safe_usernodes   = s_plcnodenames & s_usernodes
+               # UNSAFE nodes are in list but not in PLC. i.e. ignore them.
+               s_unsafe_usernodes = s_usernodes - s_plcnodenames
+               if len(s_unsafe_usernodes) > 0 :
+                       for node in s_unsafe_usernodes:
+                               print "WARNING: User provided: %s but not found in PLC" % node
+
+               l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes)
+       else:
+               l_nodes = l_plcnodes
+
+       print "len of l_nodes: %d" % len(l_nodes)
+       # Minus blacklisted ones..
+       # NOTE(review): l_ticket_blacklist is loaded but never used in this file
+       # (diagnose.py passes it to rt.RT) -- confirm whether it is needed here.
+       l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : [])
+
+       l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
+       l_nodes  = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
+
+       #######  Get RT tickets    #########################################
+       #logger.info('Get Tickets from RT')
+       #t = soltesz.MyTimer()
+       #ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
+       #print "Getting tickets from RT took: %f sec" % t.diff() ; del t
+
+       logger.info('Start Action thread')
+       ####### Action
+       action = policy.Action( [node['hostname'] for node in l_nodes] )
+       startThread(action,"action")
+
+
+       # Block until the action thread (and any others) have all died.
+       tw = ThreadWatcher()
+       while True:
+               if tw.checkThreads() == 0:
+                       break
+               time.sleep(WATCHSLEEP)
+
+       logger.info('Action Exitting')
+       sys.exit(0)
+       
+if __name__ == '__main__':
+       try:
+               main()
+       except KeyboardInterrupt:
+               print "Killed.  Exitting."
+               logger.info('Action Killed')
+               sys.exit(0)
diff --git a/diagnose.py b/diagnose.py
new file mode 100755 (executable)
index 0000000..70bdc38
--- /dev/null
@@ -0,0 +1,218 @@
+#!/usr/bin/python
+#
+# Copyright (c) 2004  The Trustees of Princeton University (Trustees).
+# 
+# Faiyaz Ahmed <faiyaza@cs.princeton.edu>
+# Stephen Soltesz <soltesz@cs.princeton.edu>
+#
+# $Id$
+
+import sys
+from threading import *
+import time
+import logging
+import Queue
+from sets import Set
+
+# Global config options
+from config import config
+from optparse import OptionParser
+parser = OptionParser()
+
+parser.set_defaults(nodelist=None, 
+                                       cachert=False, 
+                                       cachenodes=False, 
+                                       blacklist=None, 
+                                       ticketlist=None)
+
+parser.add_option("", "--nodelist", dest="nodelist", metavar="filename",
+                                       help="Read nodes to act on from specified file")
+parser.add_option("", "--cachert", action="store_true",
+                                       help="Cache the RT database query")
+parser.add_option("", "--cachenodes", action="store_true",
+                                       help="Cache node lookup from PLC")
+parser.add_option("", "--ticketlist", dest="ticketlist",
+                                       help="Whitelist all RT tickets in this file")
+parser.add_option("", "--blacklist", dest="blacklist",
+                                       help="Blacklist all nodes in this file")
+
+config = config(parser)
+print "bcalling parse_args"
+config.parse_args()
+
+# daemonize and *pid
+#from util.process import * 
+
+# RT tickets
+import rt
+# Correlates input with policy to form actions
+import policy
+import soltesz
+import plc
+import syncplcdb
+
+# Log to what 
+LOG="./monitor.log"
+
+# Time to refresh DB and remove unused entries
+RTSLEEP=7200 #2hrs
+# Time between policy enforce/update
+#POLSLEEP=43200 #12hrs
+POLSLEEP=10
+
+# Global list of all running threads.  Any threads added to 
+# list will be monitored.
+runningthreads = {}
+# Seconds between checking threads
+WATCHSLEEP = 10
+# Set up Logging
+logger = logging.getLogger("monitor")
+logger.setLevel(logging.DEBUG)
+fh = logging.FileHandler(LOG, mode = 'a')
+fh.setLevel(logging.DEBUG)
+formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+fh.setFormatter(formatter)
+logger.addHandler(fh)
+
+
+"""
+Launches threads and adds them to the runningthreads global list.
+Assigns name for thread, starts.
+"""
+def startThread(fnct, name):
+               runningthreads[name] = fnct
+               runningthreads[name].setName(name)
+               try:
+                       logger.info("Starting thread " + name)
+                       runningthreads[name].start()
+               except Exception, err:
+                       # BUGFIX: handler referenced undefined name 'error' (NameError); use the caught 'err'.
+                       logger.error("Thread: " + name + " " + str(err))
+
+
+"""
+Watches threads and catches exceptions.  Each launched thread is
+watched and state is logged.
+"""
+class ThreadWatcher(Thread):
+       def __init__(self):
+               Thread.__init__(self)
+
+       def run(self):
+               # Poll the registered threads forever; this watcher never exits on its own.
+               while 1:
+                       self.checkThreads()
+                       time.sleep(WATCHSLEEP)
+
+       def checkThreads(self):
+               # Iterate through threads, compare with last running.
+               for thread in runningthreads.keys():
+                       # If thread found dead, remove from queue
+                       #print "found %s" % thread
+                       if not runningthreads[thread].isAlive():
+                               logger.error("***********Thread died: %s**********" %(thread))
+                               del runningthreads[thread]
+               # Returns the number of registered threads still alive.
+               return len(runningthreads.keys())
+
+
+# Minimal thread: sleeps 5 seconds and exits (keeps the watcher loop busy).
+class Dummy(Thread):
+       def __init__(self):
+                Thread.__init__(self)
+
+       def run(self):
+               time.sleep(5)
+
+# Index a list of node records by their 'hostname' field.
+def dict_from_nodelist(nl):
+       d = {}
+       for host in nl:
+               h = host['hostname']
+               d[h] = host
+       return d
+
+
+
+"""
+Start threads, do some housekeeping, then daemonize.
+"""
+def main():
+       # Defaults
+       # NOTE(review): 'status' is never assigned at module level -- confirm intended.
+       global status, logger
+       global config
+
+       logger.info('Diagnose Started')
+       print 'Diagnose Started'
+
+       ##########  VARIABLES   ########################################
+       # Queue between Merge and RT
+       toRT = Queue.Queue()
+
+       # Queue between RT and Diagnose
+       fromRT = Queue.Queue()
+
+       #########  GET NODES    ########################################
+       logger.info('Get Nodes from PLC')
+       print "getnode from plc"
+       l_plcnodes = soltesz.if_cached_else(config.cachenodes, "l_plcnodes",
+                               lambda : syncplcdb.create_plcdb() )
+
+       s_plcnodenames = Set([x['hostname'] for x in l_plcnodes])
+
+       # List of nodes from a user-provided file.
+       if config.nodelist:
+               # NOTE(review): 'file' shadows the builtin file type.
+               file = config.nodelist
+               nodelist = config.getListFromFile(file)
+       
+               s_usernodes = Set(nodelist)
+               # SAFE nodes are in PLC and the list 
+               s_safe_usernodes   = s_plcnodenames & s_usernodes
+               # UNSAFE nodes are list but not in PLC. i.e. do not monitor.
+               s_unsafe_usernodes = s_usernodes - s_plcnodenames
+               if len(s_unsafe_usernodes) > 0 :
+                       for node in s_unsafe_usernodes:
+                               print "WARNING: User provided: %s but not found in PLC" % node
+
+               l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes)
+       else:
+               l_nodes = l_plcnodes
+
+       print "len of l_nodes: %d" % len(l_nodes)
+       # Minus blacklisted ones..
+       l_blacklist = soltesz.if_cached_else(1, "l_blacklist", lambda : [])
+       l_ticket_blacklist = soltesz.if_cached_else(1,"l_ticket_blacklist",lambda : [])
+       l_nodes  = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
+
+       logger.info('Get Tickets from RT')
+       #######  RT tickets    #########################################
+       t = soltesz.MyTimer()
+       ad_dbTickets = soltesz.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
+       print "Getting tickets from RT took: %f sec" % t.diff() ; del t
+
+       # Pipeline: Merge -> (toRT) -> RT -> (fromRT) -> Diagnose, as threads.
+       logger.info('Start Merge/RT/Diagnose threads')
+       ####### Merge
+       # only look at nodes in l_nodes
+       merge = policy.Merge( [node['hostname'] for node in l_nodes], toRT)
+       startThread(merge,"merge")
+       ####### RT
+       rt1   = rt.RT(ad_dbTickets, toRT, fromRT, l_ticket_blacklist)
+       startThread(rt1,"rt1")
+       ####### Diagnose
+       diagnose = policy.Diagnose(fromRT)
+       startThread(diagnose,"diagnose")
+
+
+       # Block until all pipeline threads have died.
+       tw = ThreadWatcher()
+       while True:
+               if tw.checkThreads() == 0:
+                       break
+               print "waiting... %s" % time.time()
+               time.sleep(WATCHSLEEP)
+
+       logger.info('Diagnose Exitting')
+       sys.exit(0)
+       
+if __name__ == '__main__':
+       try:
+               main()
+       except KeyboardInterrupt:
+               print "Killed.  Exitting."
+               logger.info('Diagnose Killed')
+               sys.exit(0)
diff --git a/findbad.py b/findbad.py
new file mode 100755 (executable)
index 0000000..d169c4a
--- /dev/null
@@ -0,0 +1,237 @@
+#!/usr/bin/python
+
+import os
+import sys
+import string
+import time
+import soltesz
+import plc
+import comon
+import threadpool
+
+from config import config
+from optparse import OptionParser
+parser = OptionParser()
+parser.set_defaults(filename="", increment=False, dbname="findbadnodes")
+parser.add_option("-f", "--nodes", dest="filename", metavar="FILE", 
+                                       help="Provide the input file for the node list")
+parser.add_option("", "--dbname", dest="dbname", metavar="FILE", 
+                                       help="Specify the name of the database to which the information is saved")
+parser.add_option("-i", "--increment", action="store_true", dest="increment", 
+                                       help="Increment round number to force refresh or retry")
+config = config(parser)
+config.parse_args()
+
+# QUERY all nodes.
+COMON_COTOPURL= "http://summer.cs.princeton.edu/status/tabulator.cgi?" + \
+                                       "table=table_nodeview&" + \
+                                   "dumpcols='name,resptime,sshstatus,uptime,lastcotop'&" + \
+                                   "formatcsv"
+                                   #"formatcsv&" + \
+                                       #"select='lastcotop!=0'"
+
+# Global round counter; NOTE(review): the name 'round' shadows the builtin round().
+round = 1
+# Persistent probe-state DB: {'round': N, 'nodes': {hostname: {'round': N, 'values': ...}}}
+externalState = {'round': round, 'nodes': {}}
+# Running count of nodes processed (probed or skipped), for progress output.
+count = 0
+
+def collectPingAndSSH(nodename, cohash):
+       """Probe one node with ping and ssh, classify its boot state from the
+       reported kernel, and attach PLC node/site data; returns (nodename, values)."""
+       ### RUN PING ######################
+       ping = soltesz.CMD()
+       # 'errval' renamed from 'eval', which shadowed the builtin eval().
+       (oval,errval) = ping.run_noexcept("ping -c 1 -q %s | grep rtt" % nodename)
+
+       values = {}
+
+       if oval == "":
+               # An error occurred
+               values['ping'] = "NOPING"
+       else:
+               values['ping'] = "PING"
+
+       ### RUN SSH ######################
+       b_getbootcd_id = True
+       ssh = soltesz.SSH('root', nodename)
+       oval = ""
+       errval = ""
+       (oval, errval) = ssh.run_noexcept('echo `uname -a ; ls /tmp/bm.log`')
+       val = oval
+       # Classify by kernel version from uname; presence of /tmp/bm.log in the
+       # output marks the node as being in the DEBUG (bootmanager) state.
+       if "2.6.17" in oval or "2.6.20" in oval:
+               values['ssh'] = 'SSH'
+               if "bm.log" in oval:
+                       values['category'] = 'ALPHA'
+                       values['state'] = 'DEBUG'
+               else:
+                       values['category'] = 'ALPHA'
+                       values['state'] = 'BOOT'
+       elif "2.6.12" in oval or "2.6.10" in oval:
+               values['ssh'] = 'SSH'
+               values['category'] = 'PROD'
+               if "bm.log" in oval:
+                       values['state'] = 'DEBUG'
+               else:
+                       values['state'] = 'BOOT'
+       elif "2.4" in oval:
+               b_getbootcd_id = False
+               values['ssh'] = 'SSH'
+               values['category'] = 'OLDBOOTCD'
+               values['state'] = 'DEBUG'
+       elif oval != "":
+               values['ssh'] = 'SSH'
+               values['category'] = 'UNKNOWN'
+               if "bm.log" in oval:
+                       values['state'] = 'DEBUG'
+               else:
+                       values['state'] = 'BOOT'
+       else:
+               # An error occurred; record the ssh stderr as the 'kernel' value.
+               b_getbootcd_id = False
+               values['ssh'] = 'NOSSH'
+               values['category'] = 'ERROR'
+               values['state'] = 'DOWN'
+               val = errval.strip()
+
+       values['kernel'] = val
+
+       if b_getbootcd_id:
+               # try to get BootCD for all nodes that are not 2.4 nor inaccessible
+               (oval, errval) = ssh.run_noexcept('cat /mnt/cdrom/bootme/ID')
+               val = oval
+               if "BootCD" in val:
+                       values['bootcd'] = val
+                       if "v2" in val:
+                               values['category'] = 'OLDBOOTCD'
+               else:
+                       values['bootcd'] = ""
+       else:
+               values['bootcd'] = ""
+
+       # TODO: get bm.log for debug nodes.
+       # 'zcat /tmp/bm.log'
+               
+       # NOTE(review): raises KeyError if nodename is absent from cohash -- confirm
+       # callers only pass hostnames present in the CoMon hash or the nodelist file.
+       values['comonstats'] = cohash[nodename]
+       # include output value
+       ### GET PLC NODE ######################
+       d_node = plc.getNodes({'hostname': nodename})
+       site_id = -1
+       if d_node and len(d_node) > 0:
+               pcu = d_node[0]['pcu_ids']
+               if len(pcu) > 0:
+                       values['pcu'] = "PCU"
+               else:
+                       values['pcu'] = "NOPCU"
+               site_id = d_node[0]['site_id']
+               values['plcnode'] = {'status' : 'SUCCESS', 'pcu_ids': pcu, 'site_id': site_id}
+       else:
+               values['pcu']     = "UNKNOWN"
+               values['plcnode'] = {'status' : "GN_FAILED"}
+               
+
+       ### GET PLC SITE ######################
+       d_site = plc.getSites({'site_id': site_id})
+       if d_site and len(d_site) > 0:
+               max_slices = d_site[0]['max_slices']
+               num_slices = len(d_site[0]['slice_ids'])
+               num_nodes = len(d_site[0]['node_ids'])
+               loginbase = d_site[0]['login_base']
+               values['plcsite'] = {'num_nodes' : num_nodes, 
+                                                       'max_slices' : max_slices, 
+                                                       'num_slices' : num_slices,
+                                                       'login_base' : loginbase,
+                                                       'status'     : 'SUCCESS'}
+       else:
+               values['plcsite'] = {'status' : "GS_FAILED"}
+
+       return (nodename, values)
+
+# Threadpool result callback: merge one node's probe results into externalState.
+def recordPingAndSSH(request, result):
+       global externalState
+       global count
+       (nodename, values) = result
+
+       global_round = externalState['round']
+       externalState['nodes'][nodename]['values'] = values
+       externalState['nodes'][nodename]['round'] = global_round
+
+       count += 1
+       print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values'])
+       # NOTE(review): dumps the entire state DB after every node -- presumably a
+       # crash-safety checkpoint, but it makes total I/O quadratic; confirm intent.
+       soltesz.dbDump(config.dbname, externalState)
+
+# this will be called when an exception occurs within a thread
+def handle_exception(request, result):
+       # Report which work request failed and any partial results it produced.
+       # BUGFIX: message typo "occured" -> "occurred".
+       print "Exception occurred in request %s" % request.requestID
+       for i in result:
+               print "Result: %s" % i
+
+
+# Fan node probes out to a 20-worker thread pool; nodes whose recorded round
+# already matches the current global round are skipped as up-to-date.
+def checkAndRecordState(l_nodes, cohash):
+       global externalState
+       global count
+       global_round = externalState['round']
+
+       tp = threadpool.ThreadPool(20)
+
+       # CREATE all the work requests
+       for nodename in l_nodes:
+               if nodename not in externalState['nodes']:
+                       externalState['nodes'][nodename] = {'round': 0, 'values': []}
+
+               node_round   = externalState['nodes'][nodename]['round']
+               if node_round < global_round:
+                       # recreate node stats when refreshed
+                       #print "%s" % nodename
+                       req = threadpool.WorkRequest(collectPingAndSSH, [nodename, cohash], {}, 
+                                                                                None, recordPingAndSSH, handle_exception)
+                       tp.putRequest(req)
+               else:
+                       # We just skip it, since it's "up to date"
+                       count += 1
+                       print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values'])
+                       pass
+
+       # WAIT while all the work requests are processed.
+       while 1:
+               try:
+                       time.sleep(1)
+                       tp.poll()
+               except KeyboardInterrupt:
+                       print "Interrupted!"
+                       break
+               except threadpool.NoResultsPending:
+                       print "All results collected."
+                       break
+
+
+
+def main():
+       # Load the previous probe DB (if any), fetch the CoMon node table, pick the
+       # node list (from CoMon or from --nodes file), then probe and record state.
+       global externalState
+
+       externalState = soltesz.if_cached_else(1, config.dbname, lambda : externalState) 
+
+       if config.increment:
+               # update global round number to force refreshes across all nodes
+               externalState['round'] += 1
+
+       cotop = comon.Comon()
+       # lastcotop measures whether cotop is actually running.  this is a better
+       # metric than sshstatus, or other values from CoMon
+       cotop_url = COMON_COTOPURL
+
+       cohash = cotop.coget(cotop_url)
+
+       if config.filename == "":
+               l_nodes = cohash.keys()
+       else:
+               l_nodes = config.getListFromFile(config.filename)
+
+       checkAndRecordState(l_nodes, cohash)
+
+       return 0
+
+
+if __name__ == '__main__':
+       try:
+               main()
+       except Exception, err:
+               # Best-effort checkpoint of collected state before exiting.
+               # NOTE(review): exits with status 0 even on failure -- confirm intended.
+               print "Exception: %s" % err
+               print "Saving data... exitting."
+               soltesz.dbDump(config.dbname, externalState)
+               sys.exit(0)