From: Stephen Soltesz Date: Mon, 13 Oct 2008 18:05:07 +0000 (+0000) Subject: mass commit. updates for the new db schema in findbad, findbadpcu, nodequery, X-Git-Tag: Monitor-2.0-0~77 X-Git-Url: http://git.onelab.eu/?p=monitor.git;a=commitdiff_plain;h=19414270cf2c8429daab02fdebbd8081d9ba0db0 mass commit. updates for the new db schema in findbad, findbadpcu, nodequery, and friends. several files moved into python module dir. --- diff --git a/findbad.py b/findbad.py index 47459ad..c08fbc8 100755 --- a/findbad.py +++ b/findbad.py @@ -4,10 +4,22 @@ import os import sys import string import time -import config -import util.file +from datetime import datetime,timedelta +import threadpool +import threading +from monitor import util +from monitor.util import command +from monitor import config +from monitor.database import FindbadNodeRecordSync, FindbadNodeRecord +from monitor.sources import comon +from monitor.wrapper import plc + +import syncplcdb +from nodequery import verify,query_to_dict,node_select +import traceback +print "starting sqlfindbad.py" # QUERY all nodes. COMON_COTOPURL= "http://summer.cs.princeton.edu/status/tabulator.cgi?" + \ "table=table_nodeview&" + \ @@ -16,259 +28,261 @@ COMON_COTOPURL= "http://summer.cs.princeton.edu/status/tabulator.cgi?" 
+ \ #"formatcsv&" + \ #"select='lastcotop!=0'" -import threading +api = plc.getAuthAPI() plc_lock = threading.Lock() round = 1 -externalState = {'round': round, 'nodes': {}} +global_round = round count = 0 - -import database -import moncommands -import comon -import threadpool -import syncplcdb -from nodequery import verify,query_to_dict,node_select -import traceback -import plc -api = plc.getAuthAPI() - def collectPingAndSSH(nodename, cohash): ### RUN PING ###################### - ping = moncommands.CMD() + ping = command.CMD() (oval,errval) = ping.run_noexcept("ping -c 1 -q %s | grep rtt" % nodename) - values = {} + try: + values = {} - if oval == "": - # An error occurred - values['ping'] = "NOPING" - else: - values['ping'] = "PING" + if oval == "": + # An error occurred + values['ping'] = "NOPING" + else: + values['ping'] = "PING" - try: - for port in [22, 806]: - ssh = moncommands.SSH('root', nodename, port) - - (oval, errval) = ssh.run_noexcept2(""" <<\EOF - echo "{" - echo ' "kernel":"'`uname -a`'",' - echo ' "bmlog":"'`ls /tmp/bm.log`'",' - echo ' "bootcd":"'`cat /mnt/cdrom/bootme/ID`'",' - echo ' "nm":"'`ps ax | grep nm.py | grep -v grep`'",' - echo ' "princeton_comon":"'`ls -d /vservers/princeton_comon`'",' - - ID=`grep princeton_comon /etc/passwd | awk -F : '{if ( $3 > 500 ) { print $3}}'` - - echo ' "princeton_comon_running":"'`ls -d /proc/virtual/$ID`'",' - echo ' "princeton_comon_procs":"'`vps ax | grep $ID | grep -v grep | wc -l`'",' - echo "}" -EOF """) - - if len(oval) > 0: - values.update(eval(oval)) - values['sshport'] = port - break + try: + for port in [22, 806]: + ssh = command.SSH('root', nodename, port) + + (oval, errval) = ssh.run_noexcept2(""" <<\EOF + echo "{" + echo ' "kernel":"'`uname -a`'",' + echo ' "bmlog":"'`ls /tmp/bm.log`'",' + echo ' "bootcd":"'`cat /mnt/cdrom/bootme/ID`'",' + echo ' "nm":"'`ps ax | grep nm.py | grep -v grep`'",' + echo ' "readonlyfs":"'`touch /var/log/monitor 2>&1`'",' + echo ' "dns":"'`host boot.planet-lab.org 
2>&1`'",' + echo ' "princeton_comon":"'`ls -d /vservers/princeton_comon`'",' + + ID=`grep princeton_comon /etc/passwd | awk -F : '{if ( $3 > 500 ) { print $3}}'` + + echo ' "princeton_comon_running":"'`ls -d /proc/virtual/$ID`'",' + echo ' "princeton_comon_procs":"'`vps ax | grep $ID | grep -v grep | wc -l`'",' + echo "}" +EOF """) + + values['ssherror'] = errval + if len(oval) > 0: + #print "OVAL: %s" % oval + values.update(eval(oval)) + values['sshport'] = port + break + else: + values.update({'kernel': "", 'bmlog' : "", 'bootcd' : '', + 'nm' : '', + 'readonlyfs' : '', + 'dns' : '', + 'princeton_comon' : "", + 'princeton_comon_running' : "", + 'princeton_comon_procs' : "", 'sshport' : None}) + except: + print traceback.print_exc() + sys.exit(1) + + ### RUN SSH ###################### + b_getbootcd_id = True + #ssh = command.SSH('root', nodename) + #oval = "" + #errval = "" + #(oval, errval) = ssh.run_noexcept('echo `uname -a ; ls /tmp/bm.log`') + + oval = values['kernel'] + if "2.6.17" in oval or "2.6.2" in oval: + values['ssh'] = 'SSH' + values['category'] = 'ALPHA' + if "bm.log" in values['bmlog']: + values['state'] = 'DEBUG' else: - values.update({'kernel': "", 'bmlog' : "", 'bootcd' : '', 'nm' : - '', 'princeton_comon' : '', 'princeton_comon_running' : '', - 'princeton_comon_procs' : '', 'sshport' : None}) - except: - print traceback.print_exc() - sys.exit(1) - - ### RUN SSH ###################### - b_getbootcd_id = True - #ssh = moncommands.SSH('root', nodename) - #oval = "" - #errval = "" - #(oval, errval) = ssh.run_noexcept('echo `uname -a ; ls /tmp/bm.log`') - - oval = values['kernel'] - if "2.6.17" in oval or "2.6.2" in oval: - values['ssh'] = 'SSH' - values['category'] = 'ALPHA' - if "bm.log" in values['bmlog']: - values['state'] = 'DEBUG' - else: - values['state'] = 'BOOT' - elif "2.6.12" in oval or "2.6.10" in oval: - values['ssh'] = 'SSH' - values['category'] = 'PROD' - if "bm.log" in values['bmlog']: - values['state'] = 'DEBUG' - else: - 
values['state'] = 'BOOT' - - # NOTE: on 2.6.8 kernels, with 4.2 bootstrapfs, the chroot command fails. I have no idea why. - elif "2.4" in oval or "2.6.8" in oval: - b_getbootcd_id = False - values['ssh'] = 'SSH' - values['category'] = 'OLDBOOTCD' - values['state'] = 'DEBUG' - elif oval != "": - values['ssh'] = 'SSH' - values['category'] = 'UNKNOWN' - if "bm.log" in values['bmlog']: + values['state'] = 'BOOT' + elif "2.6.12" in oval or "2.6.10" in oval: + values['ssh'] = 'SSH' + values['category'] = 'PROD' + if "bm.log" in values['bmlog']: + values['state'] = 'DEBUG' + else: + values['state'] = 'BOOT' + + # NOTE: on 2.6.8 kernels, with 4.2 bootstrapfs, the chroot command fails. I have no idea why. + elif "2.4" in oval or "2.6.8" in oval: + b_getbootcd_id = False + values['ssh'] = 'SSH' + values['category'] = 'OLDBOOTCD' values['state'] = 'DEBUG' + elif oval != "": + values['ssh'] = 'SSH' + values['category'] = 'UNKNOWN' + if "bm.log" in values['bmlog']: + values['state'] = 'DEBUG' + else: + values['state'] = 'BOOT' else: - values['state'] = 'BOOT' - else: - # An error occurred. - b_getbootcd_id = False - values['ssh'] = 'NOSSH' - values['category'] = 'ERROR' - values['state'] = 'DOWN' - val = errval.strip() - values['kernel'] = val - - #values['kernel'] = val - - if b_getbootcd_id: - # try to get BootCD for all nodes that are not 2.4 nor inaccessible - #(oval, errval) = ssh.run_noexcept('cat /mnt/cdrom/bootme/ID') - oval = values['bootcd'] - if "BootCD" in oval: - values['bootcd'] = oval - if "v2" in oval and \ - ( nodename is not "planetlab1.cs.unc.edu" and \ - nodename is not "planetlab2.cs.unc.edu" ): - values['category'] = 'OLDBOOTCD' + # An error occurred. 
+ b_getbootcd_id = False + values['ssh'] = 'NOSSH' + values['category'] = 'ERROR' + values['state'] = 'DOWN' + val = errval.strip() + values['ssherror'] = val + values['kernel'] = "" + + #values['kernel'] = val + + if b_getbootcd_id: + # try to get BootCD for all nodes that are not 2.4 nor inaccessible + #(oval, errval) = ssh.run_noexcept('cat /mnt/cdrom/bootme/ID') + oval = values['bootcd'] + if "BootCD" in oval: + values['bootcd'] = oval + if "v2" in oval and \ + ( nodename is not "planetlab1.cs.unc.edu" and \ + nodename is not "planetlab2.cs.unc.edu" ): + values['category'] = 'OLDBOOTCD' + else: + values['bootcd'] = "" else: values['bootcd'] = "" - else: - values['bootcd'] = "" - - # TODO: get bm.log for debug nodes. - # 'zcat /tmp/bm.log' - - #(oval, errval) = ssh.run_noexcept('ps ax | grep nm.py | grep -v grep') - oval = values['nm'] - if "nm.py" in oval: - values['nm'] = "Y" - else: - values['nm'] = "N" - - continue_slice_check = True - #(oval, errval) = ssh.run_noexcept('ls -d /vservers/princeton_comon') - oval = values['princeton_comon'] - if "princeton_comon" in oval: - values['princeton_comon'] = "Y" - else: - values['princeton_comon'] = "N" - continue_slice_check = False - - if continue_slice_check: - #(oval, errval) = ssh.run_noexcept('ID=`grep princeton_comon /etc/passwd | awk -F : "{if ( \\\$3 > 500 ) { print \\\$3}}"`; ls -d /proc/virtual/$ID') - oval = values['princeton_comon_running'] - if len(oval) > len('/proc/virtual/'): - values['princeton_comon_running'] = "Y" - else: - values['princeton_comon_running'] = "N" - continue_slice_check = False - else: - values['princeton_comon_running'] = "-" - - if continue_slice_check: - #(oval, errval) = ssh.run_noexcept('ID=`grep princeton_comon /etc/passwd | awk -F : "{if ( \\\$3 > 500 ) { print \\\$3}}"`; vps ax | grep $ID | grep -v grep | wc -l') - oval = values['princeton_comon_procs'] - values['princeton_comon_procs'] = oval - else: - values['princeton_comon_procs'] = "-" + # TODO: get bm.log for debug 
nodes. + # 'zcat /tmp/bm.log' - if nodename in cohash: - values['comonstats'] = cohash[nodename] - else: - values['comonstats'] = {'resptime': '-1', - 'uptime': '-1', - 'sshstatus': '-1', - 'lastcotop': '-1', - 'cpuspeed' : "null", - 'disksize' : 'null', - 'memsize' : 'null'} - # include output value - ### GET PLC NODE ###################### - b_except = False - plc_lock.acquire() - - try: - d_node = plc.getNodes({'hostname': nodename}, ['pcu_ids', 'site_id', 'date_created', 'last_updated', 'last_contact', 'boot_state', 'nodegroup_ids']) - except: - b_except = True - traceback.print_exc() - - plc_lock.release() - if b_except: return (None, None) + #(oval, errval) = ssh.run_noexcept('ps ax | grep nm.py | grep -v grep') + oval = values['nm'] + if "nm.py" in oval: + values['nm'] = "Y" + else: + values['nm'] = "N" - site_id = -1 - if d_node and len(d_node) > 0: - pcu = d_node[0]['pcu_ids'] - if len(pcu) > 0: - values['pcu'] = "PCU" + continue_slice_check = True + #(oval, errval) = ssh.run_noexcept('ls -d /vservers/princeton_comon') + oval = values['princeton_comon'] + if "princeton_comon" in oval: + values['princeton_comon'] = True else: - values['pcu'] = "NOPCU" - site_id = d_node[0]['site_id'] - last_contact = d_node[0]['last_contact'] - nodegroups = [ i['name'] for i in api.GetNodeGroups(d_node[0]['nodegroup_ids']) ] - values['plcnode'] = {'status' : 'SUCCESS', - 'pcu_ids': pcu, - 'boot_state' : d_node[0]['boot_state'], - 'site_id': site_id, - 'nodegroups' : nodegroups, - 'last_contact': last_contact, - 'date_created': d_node[0]['date_created'], - 'last_updated': d_node[0]['last_updated']} - else: - values['pcu'] = "UNKNOWN" - values['plcnode'] = {'status' : "GN_FAILED"} - + values['princeton_comon'] = False + continue_slice_check = False - ### GET PLC SITE ###################### - b_except = False - plc_lock.acquire() + if continue_slice_check: + #(oval, errval) = ssh.run_noexcept('ID=`grep princeton_comon /etc/passwd | awk -F : "{if ( \\\$3 > 500 ) { print 
\\\$3}}"`; ls -d /proc/virtual/$ID') + oval = values['princeton_comon_running'] + if len(oval) > len('/proc/virtual/'): + values['princeton_comon_running'] = True + else: + values['princeton_comon_running'] = False + continue_slice_check = False + else: + values['princeton_comon_running'] = False + + if continue_slice_check: + #(oval, errval) = ssh.run_noexcept('ID=`grep princeton_comon /etc/passwd | awk -F : "{if ( \\\$3 > 500 ) { print \\\$3}}"`; vps ax | grep $ID | grep -v grep | wc -l') + oval = values['princeton_comon_procs'] + values['princeton_comon_procs'] = int(oval) + else: + values['princeton_comon_procs'] = None - try: - d_site = plc.getSites({'site_id': site_id}, - ['max_slices', 'slice_ids', 'node_ids', 'login_base']) + + if nodename in cohash: + values['comonstats'] = cohash[nodename] + else: + values['comonstats'] = {'resptime': '-1', + 'uptime': '-1', + 'sshstatus': '-1', + 'lastcotop': '-1', + 'cpuspeed' : "null", + 'disksize' : 'null', + 'memsize' : 'null'} + # include output value + ### GET PLC NODE ###################### + plc_lock.acquire() + d_node = None + try: + d_node = plc.getNodes({'hostname': nodename}, ['pcu_ids', 'site_id', 'date_created', + 'last_updated', 'last_contact', 'boot_state', 'nodegroup_ids'])[0] + except: + traceback.print_exc() + plc_lock.release() + values['plcnode'] = d_node + + ### GET PLC PCU ###################### + site_id = -1 + d_pcu = None + if d_node: + pcu = d_node['pcu_ids'] + if len(pcu) > 0: + d_pcu = pcu[0] + + site_id = d_node['site_id'] + + values['pcu'] = d_pcu + + ### GET PLC SITE ###################### + plc_lock.acquire() + d_site = None + values['loginbase'] = "" + try: + d_site = plc.getSites({'site_id': site_id}, + ['max_slices', 'slice_ids', 'node_ids', 'login_base'])[0] + values['loginbase'] = d_site['login_base'] + except: + traceback.print_exc() + plc_lock.release() + + values['plcsite'] = d_site + values['date_checked'] = time.time() except: - b_except = True - traceback.print_exc() - - 
plc_lock.release() - if b_except: return (None, None) - - if d_site and len(d_site) > 0: - max_slices = d_site[0]['max_slices'] - num_slices = len(d_site[0]['slice_ids']) - num_nodes = len(d_site[0]['node_ids']) - loginbase = d_site[0]['login_base'] - values['plcsite'] = {'num_nodes' : num_nodes, - 'max_slices' : max_slices, - 'num_slices' : num_slices, - 'login_base' : loginbase, - 'status' : 'SUCCESS'} - else: - values['plcsite'] = {'status' : "GS_FAILED"} - - values['checked'] = time.time() + print traceback.print_exc() return (nodename, values) def recordPingAndSSH(request, result): - global externalState + global global_round global count (nodename, values) = result - if values is not None: - global_round = externalState['round'] - externalState['nodes'][nodename]['values'] = values - externalState['nodes'][nodename]['round'] = global_round + try: + if values is not None: + fbsync = FindbadNodeRecordSync.findby_or_create(hostname="global", + if_new_set={'round' : global_round}) + global_round = fbsync.round + fbnodesync = FindbadNodeRecordSync.findby_or_create(hostname=nodename, + if_new_set={'round' : global_round}) + + fbrec = FindbadNodeRecord( + date_checked=datetime.fromtimestamp(values['date_checked']), + hostname=nodename, + loginbase=values['loginbase'], + kernel_version=values['kernel'], + bootcd_version=values['bootcd'], + nm_status=values['nm'], + fs_status=values['readonlyfs'], + dns_status=values['dns'], + princeton_comon_dir=values['princeton_comon'], + princeton_comon_running=values['princeton_comon_running'], + princeton_comon_procs=values['princeton_comon_procs'], + plc_node_stats = values['plcnode'], + plc_site_stats = values['plcsite'], + plc_pcuid = values['pcu'], + comon_stats = values['comonstats'], + ping_status = (values['ping'] == "PING"), + ssh_portused = values['sshport'], + ssh_status = (values['ssh'] == "SSH"), + ssh_error = values['ssherror'], + observed_status = values['state'], + ) + fbnodesync.round = global_round - count += 1 
- print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values']) - if count % 20 == 0: - database.dbDump(config.dbname, externalState) + count += 1 + print "%d %s %s" % (count, nodename, values) + except: + print "ERROR:" + print traceback.print_exc() # this will be called when an exception occurs within a thread def handle_exception(request, result): @@ -278,18 +292,16 @@ def handle_exception(request, result): def checkAndRecordState(l_nodes, cohash): - global externalState + global global_round global count - global_round = externalState['round'] tp = threadpool.ThreadPool(20) # CREATE all the work requests for nodename in l_nodes: - if nodename not in externalState['nodes']: - externalState['nodes'][nodename] = {'round': 0, 'values': []} + fbnodesync = FindbadNodeRecordSync.findby_or_create(hostname=nodename, if_new_set={'round':0}) - node_round = externalState['nodes'][nodename]['round'] + node_round = fbnodesync.round if node_round < global_round: # recreate node stats when refreshed #print "%s" % nodename @@ -299,8 +311,8 @@ def checkAndRecordState(l_nodes, cohash): else: # We just skip it, since it's "up to date" count += 1 - print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values']) - pass + #print "%d %s %s" % (count, nodename, externalState['nodes'][nodename]['values']) + print "%d %s %s" % (count, nodename, node_round) # WAIT while all the work requests are processed. begin = time.time() @@ -311,7 +323,6 @@ def checkAndRecordState(l_nodes, cohash): # if more than two hours if time.time() - begin > (60*60*1.5): print "findbad.py has run out of time!!!!!!" - database.dbDump(config.dbname, externalState) os._exit(1) except KeyboardInterrupt: print "Interrupted!" @@ -320,18 +331,20 @@ def checkAndRecordState(l_nodes, cohash): print "All results collected." 
break - database.dbDump(config.dbname, externalState) - - + print FindbadNodeRecordSync.query.count() + print FindbadNodeRecord.query.count() def main(): - global externalState + global global_round - externalState = database.if_cached_else(1, config.dbname, lambda : externalState) + fbsync = FindbadNodeRecordSync.findby_or_create(hostname="global", + if_new_set={'round' : global_round}) + global_round = fbsync.round if config.increment: # update global round number to force refreshes across all nodes - externalState['round'] += 1 + global_round += 1 + fbsync.round = global_round cotop = comon.Comon() # lastcotop measures whether cotop is actually running. this is a better @@ -360,8 +373,9 @@ def main(): # perform this query after the above options, so that the filter above # does not break. if config.nodeselect: - fb = database.dbLoad("findbad") - l_nodes = node_select(config.nodeselect, fb['nodes'].keys(), fb) + plcnodes = api.GetNodes({'peer_id' : None}, ['hostname']) + plcnodes = [ node['hostname'] for node in plcnodes ] + l_nodes = node_select(config.nodeselect, plcnodes, None) print "fetching %s hosts" % len(l_nodes) @@ -371,7 +385,7 @@ def main(): if __name__ == '__main__': - import parser as parsermodule + from monitor import parser as parsermodule parser = parsermodule.getParser(['nodesets']) @@ -393,5 +407,7 @@ if __name__ == '__main__': print traceback.print_exc() print "Exception: %s" % err print "Saving data... exitting." 
- database.dbDump(config.dbname, externalState) sys.exit(0) + print "sleeping" + #print "final commit" + #time.sleep(10) diff --git a/findbadpcu.py b/findbadpcu.py index ca65344..48b1761 100755 --- a/findbadpcu.py +++ b/findbadpcu.py @@ -5,56 +5,27 @@ import sys import string import time import socket -import util.file -import plc import sets - - import signal import traceback -from nodequery import pcu_select +from datetime import datetime,timedelta +import threadpool +import threading -#old_handler = signal.getsignal(signal.SIGCHLD) - -#def sig_handler(signum, stack): -# """ Handle SIGCHLD signal """ -# global old_handler -# if signum == signal.SIGCHLD: -# try: -# os.wait() -# except: -# pass -# if old_handler != signal.SIG_DFL: -# old_handler(signum, stack) -# -#orig_sig_handler = signal.signal(signal.SIGCHLD, sig_handler) - - -# QUERY all nodes. -COMON_COTOPURL= "http://summer.cs.princeton.edu/status/tabulator.cgi?" + \ - "table=table_nodeview&" + \ - "dumpcols='name,resptime,sshstatus,uptime,lastcotop'&" + \ - "formatcsv" - #"formatcsv&" + \ - #"select='lastcotop!=0'" +import monitor +from monitor.pcu import reboot +from monitor import config +from monitor.database import FindbadPCURecordSync, FindbadPCURecord +from monitor import util +from monitor.wrapper import plc +import syncplcdb +from nodequery import pcu_select -import threading plc_lock = threading.Lock() -round = 1 -externalState = {'round': round, 'nodes': {'a': None}} +global_round = 1 errorState = {} count = 0 -import reboot -from reboot import pcu_name - -import database -import moncommands -import plc -import comon -import threadpool -import syncplcdb - def nmap_portstatus(status): ps = {} l_nmap = status.split() @@ -71,14 +42,14 @@ def nmap_portstatus(status): def get_pcu(pcuname): plc_lock.acquire() try: - print "GetPCU from PLC %s" % pcuname + #print "GetPCU from PLC %s" % pcuname l_pcu = plc.GetPCUs({'pcu_id' : pcuname}) - print l_pcu + #print l_pcu if len(l_pcu) > 0: l_pcu = l_pcu[0] 
except: try: - print "GetPCU from file %s" % pcuname + #print "GetPCU from file %s" % pcuname l_pcus = database.dbLoad("pculist") for i in l_pcus: if i['pcu_id'] == pcuname: @@ -185,14 +156,17 @@ def collectPingAndSSH(pcuname, cohash): continue_probe = True errors = None - values = {} + values = {'reboot' : 'novalue'} ### GET PCU ###################### try: b_except = False try: v = get_plc_pcu_values(pcuname) + if v['hostname'] is not None: v['hostname'] = v['hostname'].strip() + if v['ip'] is not None: v['ip'] = v['ip'].strip() + if v is not None: - values.update(v) + values['plc_pcu_stats'] = v else: continue_probe = False except: @@ -202,23 +176,18 @@ def collectPingAndSSH(pcuname, cohash): if b_except or not continue_probe: return (None, None, None) - if values['hostname'] is not None: - values['hostname'] = values['hostname'].strip() - - if values['ip'] is not None: - values['ip'] = values['ip'].strip() #### COMPLETE ENTRY ####################### values['complete_entry'] = [] #if values['protocol'] is None or values['protocol'] is "": # values['complete_entry'] += ["protocol"] - if values['model'] is None or values['model'] is "": + if values['plc_pcu_stats']['model'] is None or values['plc_pcu_stats']['model'] is "": values['complete_entry'] += ["model"] # Cannot continue due to this condition continue_probe = False - if values['password'] is None or values['password'] is "": + if values['plc_pcu_stats']['password'] is None or values['plc_pcu_stats']['password'] is "": values['complete_entry'] += ["password"] # Cannot continue due to this condition continue_probe = False @@ -226,23 +195,23 @@ def collectPingAndSSH(pcuname, cohash): if len(values['complete_entry']) > 0: continue_probe = False - if values['hostname'] is None or values['hostname'] is "": + if values['plc_pcu_stats']['hostname'] is None or values['plc_pcu_stats']['hostname'] is "": values['complete_entry'] += ["hostname"] - if values['ip'] is None or values['ip'] is "": + if 
values['plc_pcu_stats']['ip'] is None or values['plc_pcu_stats']['ip'] is "": values['complete_entry'] += ["ip"] # If there are no nodes associated with this PCU, then we cannot continue. - if len(values['node_ids']) == 0: + if len(values['plc_pcu_stats']['node_ids']) == 0: continue_probe = False values['complete_entry'] += ['NoNodeIds'] #### DNS and IP MATCH ####################### - if values['hostname'] is not None and values['hostname'] is not "" and \ - values['ip'] is not None and values['ip'] is not "": + if values['plc_pcu_stats']['hostname'] is not None and values['plc_pcu_stats']['hostname'] is not "" and \ + values['plc_pcu_stats']['ip'] is not None and values['plc_pcu_stats']['ip'] is not "": #print "Calling socket.gethostbyname(%s)" % values['hostname'] try: - ipaddr = socket.gethostbyname(values['hostname']) - if ipaddr == values['ip']: + ipaddr = socket.gethostbyname(values['plc_pcu_stats']['hostname']) + if ipaddr == values['plc_pcu_stats']['ip']: values['dnsmatch'] = "DNS-OK" else: values['dnsmatch'] = "DNS-MISMATCH" @@ -250,21 +219,21 @@ def collectPingAndSSH(pcuname, cohash): except Exception, err: values['dnsmatch'] = "DNS-NOENTRY" - values['hostname'] = values['ip'] + values['plc_pcu_stats']['hostname'] = values['plc_pcu_stats']['ip'] #print err else: - if values['ip'] is not None and values['ip'] is not "": + if values['plc_pcu_stats']['ip'] is not None and values['plc_pcu_stats']['ip'] is not "": values['dnsmatch'] = "NOHOSTNAME" - values['hostname'] = values['ip'] + values['plc_pcu_stats']['hostname'] = values['plc_pcu_stats']['ip'] else: values['dnsmatch'] = "NO-DNS-OR-IP" - values['hostname'] = "No_entry_in_DB" + values['plc_pcu_stats']['hostname'] = "No_entry_in_DB" continue_probe = False #### RUN NMAP ############################### if continue_probe: - nmap = moncommands.CMD() - (oval,eval) = nmap.run_noexcept("nmap -oG - -P0 -p22,23,80,443,5869,9100,16992 %s | grep Host:" % pcu_name(values)) + nmap = util.command.CMD() + (oval,eval) = 
nmap.run_noexcept("nmap -oG - -P0 -p22,23,80,443,5869,9100,16992 %s | grep Host:" % reboot.pcu_name(values['plc_pcu_stats'])) # NOTE: an empty / error value for oval, will still work. (values['portstatus'], continue_probe) = nmap_portstatus(oval) else: @@ -272,20 +241,13 @@ def collectPingAndSSH(pcuname, cohash): ###### DRY RUN ############################ - if 'node_ids' in values and len(values['node_ids']) > 0: - rb_ret = reboot.reboot_test(values['nodenames'][0], values, continue_probe, 1, True) + if 'node_ids' in values['plc_pcu_stats'] and len(values['plc_pcu_stats']['node_ids']) > 0: + rb_ret = reboot.reboot_test(values['plc_pcu_stats']['nodenames'][0], values, continue_probe, 1, True) else: rb_ret = "Not_Run" # No nodes to test" values['reboot'] = rb_ret - ### GET PLC SITE ###################### - v = get_plc_site_values(values['site_id']) - if v is not None: - values.update(v) - else: - values['plcsite'] = {'status' : "GS_FAILED"} - except: print "____________________________________" print values @@ -294,24 +256,35 @@ def collectPingAndSSH(pcuname, cohash): errors['traceback'] = traceback.format_exc() print errors['traceback'] - values['checked'] = time.time() + values['date_checked'] = time.time() return (pcuname, values, errors) def recordPingAndSSH(request, result): global errorState - global externalState global count + global global_round (nodename, values, errors) = result if values is not None: - global_round = externalState['round'] - pcu_id = "id_%s" % nodename - externalState['nodes'][pcu_id]['values'] = values - externalState['nodes'][pcu_id]['round'] = global_round - + pcu_id = int(nodename) + fbsync = FindbadPCURecordSync.findby_or_create(plc_pcuid=0, + if_new_set={'round': global_round}) + global_round = fbsync.round + fbnodesync = FindbadPCURecordSync.findby_or_create(plc_pcuid=pcu_id, + if_new_set={'round' : global_round}) + + fbrec = FindbadPCURecord( + date_checked=datetime.fromtimestamp(values['date_checked']), + plc_pcuid=pcu_id, + 
plc_pcu_stats=values['plc_pcu_stats'], + dns_status=values['dnsmatch'], + port_status=values['portstatus'], + entry_complete=" ".join(values['complete_entry']), + reboot_trial_status="%s" % values['reboot'], + ) + fbnodesync.round = global_round count += 1 - print "%d %s %s" % (count, nodename, externalState['nodes'][pcu_id]['values']) - database.dbDump(config.dbname, externalState) + print "%d %s %s" % (count, nodename, values) if errors is not None: pcu_id = "id_%s" % nodename @@ -326,21 +299,17 @@ def handle_exception(request, result): def checkAndRecordState(l_pcus, cohash): - global externalState + global global_round global count - global_round = externalState['round'] tp = threadpool.ThreadPool(10) # CREATE all the work requests for pcuname in l_pcus: - pcu_id = "id_%s" % pcuname - if pcuname not in externalState['nodes']: - #print type(externalState['nodes']) - - externalState['nodes'][pcu_id] = {'round': 0, 'values': []} + pcu_id = int(pcuname) + fbnodesync = FindbadPCURecordSync.findby_or_create(plc_pcuid=pcu_id, if_new_set={'round' : 0}) - node_round = externalState['nodes'][pcu_id]['round'] + node_round = fbnodesync.round if node_round < global_round: # recreate node stats when refreshed #print "%s" % nodename @@ -350,8 +319,7 @@ def checkAndRecordState(l_pcus, cohash): else: # We just skip it, since it's "up to date" count += 1 - print "%d %s %s" % (count, pcu_id, externalState['nodes'][pcu_id]['values']) - pass + print "%d %s %s" % (count, pcu_id, node_round) # WAIT while all the work requests are processed. begin = time.time() @@ -362,7 +330,6 @@ def checkAndRecordState(l_pcus, cohash): # if more than two hours if time.time() - begin > (60*60*1): print "findbadpcus.py has run out of time!!!!!!" - database.dbDump(config.dbname, externalState) os._exit(1) except KeyboardInterrupt: print "Interrupted!" @@ -371,18 +338,24 @@ def checkAndRecordState(l_pcus, cohash): print "All results collected." 
break + print FindbadPCURecordSync.query.count() + print FindbadPCURecord.query.count() def main(): - global externalState + global global_round - l_pcus = database.if_cached_else_refresh(1, config.refresh, "pculist", lambda : plc.GetPCUs()) - externalState = database.if_cached_else(1, config.dbname, lambda : externalState) + l_pcus = monitor.database.if_cached_else_refresh(1, config.refresh, "pculist", lambda : plc.GetPCUs()) cohash = {} + fbsync = FindbadPCURecordSync.findby_or_create(plc_pcuid=0, if_new_set={'round' : global_round}) + + global_round = fbsync.round + if config.increment: # update global round number to force refreshes across all nodes - externalState['round'] += 1 + global_round += 1 + fbsync.round = global_round if config.site is not None: api = plc.getAuthAPI() @@ -422,7 +395,7 @@ if __name__ == '__main__': formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') fh.setFormatter(formatter) logger.addHandler(fh) - import parser as parsermodule + from monitor import parser as parsermodule parser = parsermodule.getParser() parser.set_defaults(nodelist=None, increment=False, @@ -463,5 +436,4 @@ if __name__ == '__main__': traceback.print_exc() print "Exception: %s" % err print "Saving data... exitting." 
- database.dbDump(config.dbname, externalState) sys.exit(0) diff --git a/const.py b/monitor/const.py similarity index 100% rename from const.py rename to monitor/const.py diff --git a/monitor/database/dborm.py b/monitor/database/dborm.py index 5bcbefc..3e47ea2 100644 --- a/monitor/database/dborm.py +++ b/monitor/database/dborm.py @@ -1,9 +1,9 @@ -import pkg_resources -pkg_resources.require("SQLAlchemy>=0.4.9") +#import pkg_resources +#pkg_resources.require("SQLAlchemy>=0.4.9") import sqlalchemy import elixir import monitor.config as config -elixir.metadata.bind = sqlalchemy.create_engine(config.databaseuri, echo=True) +elixir.metadata.bind = sqlalchemy.create_engine(config.databaseuri, echo=False) elixir.session = sqlalchemy.orm.scoped_session(sqlalchemy.orm.sessionmaker(autoflush=True,autocommit=True)) from infovacuum.model import * diff --git a/monitor/database/dbpickle.py b/monitor/database/dbpickle.py index e127791..6f3a87d 100644 --- a/monitor/database/dbpickle.py +++ b/monitor/database/dbpickle.py @@ -1,6 +1,10 @@ import os import sys import pickle +import inspect +import shutil +from monitor import config + noserial=False try: from util.PHPSerialize import * @@ -9,11 +13,6 @@ except: #print >>sys.stderr, "PHPSerial db type not allowed." 
noserial=True -import inspect -import shutil -import config -import config as monitorconfig - DEBUG= 0 PICKLE_PATH=config.MONITOR_DATA_ROOT diff --git a/parser.py b/monitor/parser.py similarity index 100% rename from parser.py rename to monitor/parser.py diff --git a/reboot.py b/monitor/pcu/reboot.py similarity index 99% rename from reboot.py rename to monitor/pcu/reboot.py index f3f7f32..0fb0534 100755 --- a/reboot.py +++ b/monitor/pcu/reboot.py @@ -11,13 +11,13 @@ import urllib2 import urllib import threading, popen2 import array, struct -import plc +from monitor.wrapper import plc import base64 from subprocess import PIPE, Popen import ssh.pxssh as pxssh import ssh.pexpect as pexpect import socket -import moncommands +from monitor.util import command # Use our versions of telnetlib and pyssh sys.path.insert(0, os.path.dirname(sys.argv[0])) @@ -559,7 +559,7 @@ class APC(PCUControl): class IntelAMT(PCUControl): def run(self, node_port, dryrun): - cmd = moncommands.CMD() + cmd = command.CMD() #[cmd_str = "IntelAMTSDK/Samples/RemoteControl/remoteControl" cmd_str = "cmdamt/remoteControl" @@ -624,7 +624,7 @@ class HPiLO(PCUControl): class HPiLOHttps(PCUControl): def run(self, node_port, dryrun): - locfg = moncommands.CMD() + locfg = command.CMD() cmd = "cmdhttps/locfg.pl -s %s -f %s -u %s -p '%s' | grep 'MESSAGE' | grep -v 'No error'" % ( self.host, "iloxml/Get_Network.xml", self.username, self.password) @@ -635,7 +635,7 @@ class HPiLOHttps(PCUControl): return sout.strip() if not dryrun: - locfg = moncommands.CMD() + locfg = command.CMD() cmd = "cmdhttps/locfg.pl -s %s -f %s -u %s -p '%s' | grep 'MESSAGE' | grep -v 'No error'" % ( self.host, "iloxml/Reset_Server.xml", self.username, self.password) diff --git a/mailer.py b/monitor/wrapper/mailer.py similarity index 99% rename from mailer.py rename to monitor/wrapper/mailer.py index 46fdcae..cf09be1 100755 --- a/mailer.py +++ b/monitor/wrapper/mailer.py @@ -7,7 +7,7 @@ # $Id: mailer.py,v 1.10 2007/08/08 13:28:06 soltesz 
Exp $ from emailTxt import * import smtplib -import config +from monitor import config import calendar import logging import os diff --git a/rt.py b/monitor/wrapper/rt.py similarity index 100% rename from rt.py rename to monitor/wrapper/rt.py diff --git a/nodebad.py b/nodebad.py index 8d7650c..57f23c0 100755 --- a/nodebad.py +++ b/nodebad.py @@ -4,27 +4,21 @@ import os import sys import string import time +from datetime import datetime,timedelta +from nodequery import verify,query_to_dict,node_select -import database -import comon -import threadpool import syncplcdb -from nodequery import verify,query_to_dict,node_select from nodecommon import * -from datetime import datetime,timedelta -import config -from sqlobject import connectionForURI,sqlhub -connection = connectionForURI(config.sqlobjecturi) -sqlhub.processConnection = connection -from infovacuum.model_findbadrecord import * -from infovacuum.model_historyrecord import * +from monitor import config +from monitor.wrapper import plc +from monitor.const import MINUP +from monitor.database import FindbadNodeRecord, HistoryNodeRecord -import plc -api = plc.getAuthAPI() from unified_model import * -from const import MINUP + +api = plc.getAuthAPI() round = 1 count = 0 @@ -49,20 +43,18 @@ def checkAndRecordState(l_nodes, l_plcnodes): if not d_node: continue - try: - pf = HistoryNodeRecord.by_hostname(nodename) - except: - pf = HistoryNodeRecord(hostname=nodename) - + pf = HistoryNodeRecord.findby_or_create(hostname=nodename) pf.last_checked = datetime.now() try: # Find the most recent record - noderec = FindbadNodeRecord.select(FindbadNodeRecord.q.hostname==nodename, - orderBy='date_checked').reversed()[0] + noderec = FindbadNodeRecord.query.filter(FindbadNodeRecord.hostname==nodename).order_by(FindbadNodeRecord.date_checked.desc()).first() + print "NODEREC: ", noderec.date_checked except: - # or create an empty one. 
- noderec = FindbadNodeRecord(hostname=nodename) + print "COULD NOT FIND %s" % nodename + import traceback + print traceback.print_exc() + continue node_state = noderec.observed_status if noderec.plc_node_stats: @@ -86,10 +78,15 @@ def checkAndRecordState(l_nodes, l_plcnodes): count += 1 print "%d %35s %s since(%s)" % (count, nodename, pf.status, diff_time(time.mktime(pf.last_changed.timetuple()))) + # NOTE: this commits all pending operations to the DB. Do not remove, or + # replace with another operation that also commits all pending ops, such + # as session.commit() or flush() or something + print HistoryNodeRecord.query.count() + return True if __name__ == '__main__': - import parser as parsermodule + from monitor import parser as parsermodule parser = parsermodule.getParser(['nodesets']) parser.set_defaults(filename=None, node=None, nodeselect=False, nodegroup=None, cachenodes=False) parser = parsermodule.getParser(['defaults'], parser) diff --git a/nodecommon.py b/nodecommon.py index f318949..a3117d8 100644 --- a/nodecommon.py +++ b/nodecommon.py @@ -1,12 +1,15 @@ -import struct -import reboot import time +import struct +from monitor.pcu import reboot + from monitor import util -import plc -from datetime import datetime from monitor import database +from monitor.wrapper import plc + +from datetime import datetime from unified_model import PersistFlags + esc = struct.pack('i', 27) RED = esc + "[1;31m" GREEN = esc + "[1;32m" diff --git a/nodequery.py b/nodequery.py index fa46423..5e182e1 100755 --- a/nodequery.py +++ b/nodequery.py @@ -2,19 +2,19 @@ import sys -import database +from monitor import database from nodecommon import * from unified_model import Record import glob import os -import reboot import traceback import time import re import string -import plc +from monitor.pcu import reboot +from monitor.wrapper import plc api = plc.getAuthAPI() from monitor.database import FindbadNodeRecord, FindbadNodeRecordSync @@ -303,30 +303,31 @@ def
node_select(str_query, nodelist=None, fbdb=None): if fbdb is not None: fb = fbdb - for node in fb['nodes'].keys(): - if nodelist is not None: - if node not in nodelist: continue + for node in nodelist: + #if nodelist is not None: + # if node not in nodelist: continue try: - fb_noderec = FindbadNodeRecord.select(FindbadNodeRecord.q.hostname==node, - orderBy='date_checked').reversed()[0] + fb_noderec = None + fb_noderec = FindbadNodeRecord.query.filter(FindbadNodeRecord.hostname==node).order_by(FindbadNodeRecord.date_checked.desc()).first() except: + print traceback.print_exc() continue - - fb_nodeinfo = fb_noderec.toDict() + if fb_noderec: + fb_nodeinfo = fb_noderec.to_dict() - #fb_nodeinfo['pcu'] = color_pcu_state(fb_nodeinfo) - #if 'plcnode' in fb_nodeinfo: - # fb_nodeinfo.update(fb_nodeinfo['plcnode']) + #fb_nodeinfo['pcu'] = color_pcu_state(fb_nodeinfo) + #if 'plcnode' in fb_nodeinfo: + # fb_nodeinfo.update(fb_nodeinfo['plcnode']) - #if verifyDBrecord(dict_query, fb_nodeinfo): - if verify(dict_query, fb_nodeinfo): - #print node #fb_nodeinfo - hostnames.append(node) - else: - #print "NO MATCH", node - pass + #if verifyDBrecord(dict_query, fb_nodeinfo): + if verify(dict_query, fb_nodeinfo): + #print node #fb_nodeinfo + hostnames.append(node) + else: + #print "NO MATCH", node + pass return hostnames @@ -335,7 +336,7 @@ def main(): global fb global fbpcu - import parser as parsermodule + from monitor import parser as parsermodule parser = parsermodule.getParser() parser.set_defaults(node=None, fromtime=None, select=None, list=None, @@ -370,8 +371,9 @@ def main(): os.chdir("..") fb = archive.load(file[:-4]) else: - fbnodes = FindbadNodeRecord.select(FindbadNodeRecord.q.hostname, orderBy='date_checked',distinct=True).reversed() - fb = database.dbLoad("findbad") + #fbnodes = FindbadNodeRecord.select(FindbadNodeRecord.q.hostname, orderBy='date_checked',distinct=True).reversed() + #fb = database.dbLoad("findbad") + fb = None fbpcu = database.dbLoad("findbadpcus") 
reboot.fb = fbpcu @@ -379,7 +381,11 @@ def main(): if config.nodelist: nodelist = util.file.getListFromFile(config.nodelist) else: - nodelist = fb['nodes'].keys() + # NOTE: list of nodes should come from findbad db. Otherwise, we + # don't know for sure that there's a record in the db.. + plcnodes = database.dbLoad("l_plcnodes") + nodelist = [ node['hostname'] for node in plcnodes ] + #nodelist = ['planetlab-1.cs.princeton.edu'] pculist = None if config.select is not None and config.pcuselect is not None: @@ -397,13 +403,12 @@ def main(): for node in nodelist: config.node = node - if node not in fb['nodes']: + if node not in nodelist: continue try: # Find the most recent record - fb_noderec = FindbadNodeRecord.select(FindbadNodeRecord.q.hostname==node, - orderBy='date_checked').reversed()[0] + fb_noderec = FindbadNodeRecord.query.filter(FindbadNodeRecord.hostname==node).order_by(FindbadNodeRecord.date_checked.desc()).first() except: print traceback.print_exc() pass #fb_nodeinfo = fb['nodes'][node]['values'] @@ -414,7 +419,7 @@ def main(): if config.daysdown: daysdown_print_nodeinfo(fb_nodeinfo, node) else: - fb_nodeinfo = fb_noderec.toDict() + fb_nodeinfo = fb_noderec.to_dict() if config.select: if config.fields: fields = config.fields.split(",") diff --git a/pcubad.py b/pcubad.py index 38cf897..1fd3371 100755 --- a/pcubad.py +++ b/pcubad.py @@ -4,30 +4,22 @@ import os import sys import string import time - -from reboot import pcu_name - -import database -import comon -import threadpool -import syncplcdb -from nodequery import verify,query_to_dict,node_select -import parser as parsermodule -from nodecommon import * from datetime import datetime,timedelta -import config -from sqlobject import connectionForURI,sqlhub -connection = connectionForURI(config.sqlobjecturi) -sqlhub.processConnection = connection -from infovacuum.model_findbadrecord import * -from infovacuum.model_historyrecord import * +from monitor import database +from monitor.pcu import reboot +from 
monitor import parser as parsermodule +from monitor import config +from monitor.database import HistoryPCURecord, FindbadPCURecord +from monitor.wrapper import plc +from monitor.const import MINUP -import plc -api = plc.getAuthAPI() +from nodecommon import * +from nodequery import verify,query_to_dict,node_select +import syncplcdb from unified_model import * -from const import MINUP +api = plc.getAuthAPI() def main(config): @@ -61,18 +53,17 @@ def checkAndRecordState(l_pcus, l_plcpcus): if not d_pcu: continue - try: - pf = HistoryPCURecord.by_pcuid(d_pcu['pcu_id']) - except: - pf = HistoryPCURecord(plc_pcuid=pcuname) - + pf = HistoryPCURecord.findby_or_create(plc_pcuid=d_pcu['pcu_id']) pf.last_checked = datetime.now() try: # Find the most recent record - pcurec = FindbadPCURecord.select(FindbadPCURecord.q.plc_pcuid==pcuname, - orderBy='date_checked').reversed()[0] + pcurec = FindbadPCURecord.query.filter(FindbadPCURecord.plc_pcuid==pcuname).order_by(FindbadPCURecord.date_checked.desc()).first() + print "NODEREC: ", pcurec.date_checked except: + print "COULD NOT FIND FB record for %s" % reboot.pcu_name(pcu) + import traceback + print traceback.print_exc() # don't have the info to create a new entry right now, so continue. continue @@ -97,7 +88,12 @@ def checkAndRecordState(l_pcus, l_plcpcus): pf.status = "error" count += 1 - print "%d %35s %s since(%s)" % (count, pcu_name(d_pcu), pf.status, diff_time(time.mktime(pf.last_changed.timetuple()))) + print "%d %35s %s since(%s)" % (count, reboot.pcu_name(d_pcu), pf.status, diff_time(time.mktime(pf.last_changed.timetuple()))) + + # NOTE: this commits all pending operations to the DB. 
Do not remove, or + # replace with another operation that also commits all pending ops, such + # as session.commit() or flush() or something + print HistoryPCURecord.query.count() return True diff --git a/syncplcdb.py b/syncplcdb.py index de6a474..778ec55 100755 --- a/syncplcdb.py +++ b/syncplcdb.py @@ -1,9 +1,9 @@ #!/usr/bin/python -import plc -import config -import database import sys +from monitor.wrapper import plc +from monitor import database +from monitor import config def dsites_from_lsites(l_sites): d_sites = {} diff --git a/testapi.py b/testapi.py index 5e7daa8..7253fe3 100755 --- a/testapi.py +++ b/testapi.py @@ -1,9 +1,10 @@ #!/usr/bin/python -import plc import sys import traceback +from monitor.wrapper import plc + api = plc.getAuthAPI() loginbase = sys.argv[1] # "princeton" diff --git a/unified_model.py b/unified_model.py index 844ae5b..f070a59 100755 --- a/unified_model.py +++ b/unified_model.py @@ -2,14 +2,14 @@ from monitor import database -import plc -import mailer +from monitor.wrapper import plc +from monitor.wrapper import mailer import time from model import * -from const import * +from monitor.const import * from monitor import util -import config +from monitor import config def gethostlist(hostlist_file): return util.file.getListFromFile(hostlist_file)