-#
-# Copyright (c) 2004 The Trustees of Princeton University (Trustees).
-#
-# Faiyaz Ahmed <faiyaza@cs.princeton.edu>
-#
-# $Id: policy.py,v 1.13 2007/05/16 01:53:46 faiyaza Exp $
-#
-# Policy Engine.
-
-#from monitor import *
-from threading import *
+#!/usr/bin/python
+
+# This script is used to manipulate the operational state of nodes in
+# different node groups. These are basically set operations on nodes via the
+# PLC api.
+#
+# Take the ng name as an argument....
+# optionally,
+# * get a list of nodes in the given nodegroup.
+# * set some or all in the set to rins.
+# * restart them all.
+# * do something else to them all.
+#
+
+import os
import time
-import logging
-import mailer
-import emailTxt
-import pickle
-import Queue
-import plc
+import traceback
import sys
-import reboot
-import soltesz
-import string
-from config import config
-config = config()
-
-DAT="./monitor.dat"
-
-logger = logging.getLogger("monitor")
-
-# Time to enforce policy
-POLSLEEP = 7200
-
-# Where to email the summary
-SUMTO = "soltesz@cs.princeton.edu"
-TECHEMAIL="tech-%s@sites.planet-lab.org"
-PIEMAIL="pi-%s@sites.planet-lab.org"
-SLICEMAIL="%s@slices.planet-lab.org"
-PLCEMAIL="support@planet-lab.org"
-
-#Thresholds (DAYS)
-SPERDAY = 86400
-PITHRESH = 7 * SPERDAY
-SLICETHRESH = 7 * SPERDAY
-# Days before attempting rins again
-RINSTHRESH = 5 * SPERDAY
-
-# Days before calling the node dead.
-DEADTHRESH = 30 * SPERDAY
-# Minimum number of nodes up before squeezing
-MINUP = 2
-
-TECH=1
-PI=2
-USER=4
-
-# IF:
-# no SSH, down.
-# bad disk, down
-# DNS, kinda down (sick)
-# clock, kinda down (sick)
-# Full disk, going to be down
-
-# Actions:
-# Email
-# suspend slice creation
-# kill slices
-
-class PLC: pass
-
-class Policy(Thread):
- def __init__(self, comonthread, sickNoTicket, emailed):
- self.comon = comonthread
-
- # the hostname to loginbase mapping
- self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
-
- # Actions taken on nodes.
- self.cache_all = soltesz.if_cached_else(1, "act_all", lambda : {})
- self.act_all= soltesz.if_cached_else(1, "act_all", lambda : {})
-
- # A dict of actions to specific functions. PICKLE doesnt' like lambdas.
- self.actions = {}
- self.actions['suspendslices'] = lambda hn: plc.suspendSlices(hn)
- self.actions['nocreate'] = lambda hn: plc.removeSliceCreation(hn);
- self.actions['rins'] = lambda hn: plc.nodeBootState(hn, "rins")
- self.actions['noop'] = lambda hn: hn
-
- self.bootcds = soltesz.dbLoad("bootcds")
- self.emailed = emailed # host - > (time of email, type of email)
-
- # all sick nodes w/o tickets
- # from thread
- self.sickNoTicket = sickNoTicket
-
-
- # sick nodes with no tickets
- # sickdb{loginbase: [{hostname1: [buckets]}, {...}]}
- self.sickdb = {}
- Thread.__init__(self)
-
- def mergePreviousActions(self):
- """
- look at the sick node_records as reported by comon, and then look at the
- node_records in act_all. There are four cases:
- 1) problem in comon but not in act_all
- this ok, b/c it just means it's a new problem
- 2) problem in comon and in act_all
- we need to figure out the mis-match. Did the problem get better
- or worse? Reset the stage clock to 'initial', if it's better,
- continue if it's gotten worse. Hard to make this judgement here, though.
- 3) no problem in comon, problem in act_all
- this may mean that the node is operational again, or that monitor
- knows how to define a problem that comon does not. For now, if
- comon does not report a problem, monitor obeys. Ultimately,
- however, we want to catch problems that comon can't see.
- 4) no problem in comon, no problem in act_all
- there won't be a record in either db, so there's no code.
-
- TODO: this is where back-offs will be acknowledged. If the nodes get
- better, it should be possible to 're-enable' the site, or slice, etc.
- """
- sorted_sites = self.sickdb.keys()
- sorted_sites.sort()
- # look at all problems reported by comon
- for loginbase in sorted_sites:
- rec_nodedict = self.sickdb[loginbase]
- sorted_nodes = rec_nodedict.keys()
- sorted_nodes.sort()
- #for rec_node in rec_nodelist:
- for nodename in sorted_nodes:
- rec_node = rec_nodedict[nodename]
- hn = nodename
- x = self.sickdb[loginbase][hn]
- if hn in self.act_all:
- y = self.act_all[hn][0]
- if x['bucket'][0] != y['bucket'][0]:
- # 2a) mismatch, need a policy for how to resolve
- print "COMON and MONITOR have a mismatch: %s vs %s" % \
- (x['bucket'], y['bucket'])
- else:
- # 2b) ok, b/c they agree that there's still a problem..
- pass
-
- # for now, overwrite the comon entry for the one in act_all
- self.sickdb[loginbase][hn] = y
- # delete the entry from cache_all to keep it out of case 3)
- del self.cache_all[hn]
- else:
- # 1) ok, b/c it's a new problem.
- pass
-
- # 3) nodes that remin in cache_all were not identified by comon as
- # down. Do we keep them or not?
- for hn in self.cache_all.keys():
- y = self.act_all[hn][0]
- if 'monitor' in y['bucket']:
- loginbase = self.plcdb_hn2lb[hn]
- if loginbase not in self.sickdb:
- self.sickdb[loginbase] = {}
- self.sickdb[loginbase][hn] = y
- else:
- del self.cache_all[hn]
-
- print "len of cache_all: %d" % len(self.cache_all.keys())
+from optparse import OptionParser
+
+import bootman # debug nodes
+
+from monitor import util
+from monitor import const
+from monitor import reboot
+from monitor import config
+from monitor import database
+from monitor import parser as parsermodule
+from monitor.common import *
+from monitor.model import *
+from monitor.wrapper import plc
+from monitor.wrapper import plccache
+from monitor.wrapper.emailTxt import mailtxt
+from monitor.database.info.model import *
+
+from nodequery import verify,query_to_dict,node_select
+
+api = plc.getAuthAPI()
+
+
+class SiteInterface(HistorySiteRecord):
+ @classmethod
+ def get_or_make(cls, if_new_set={}, **kwargs):
+ if 'hostname' in kwargs:
+ kwargs['loginbase'] = plccache.plcdb_hn2lb[kwargs['hostname']]
+ del kwargs['hostname']
+ res = HistorySiteRecord.findby_or_create(if_new_set, **kwargs)
+ return SiteInterface(res)
+
+ def __init__(self, sitehist):
+ self.db = sitehist
+
+ def getRecentActions(self, **kwargs):
+ # TODO: make query only return records within a certin time range,
+ # i.e. greater than 0.5 days ago. or 5 days, etc.
+
+ #print "kwargs: ", kwargs
+
+ recent_actions = []
+ if 'loginbase' in kwargs:
+ recent_actions = ActionRecord.query.filter_by(loginbase=kwargs['loginbase']).order_by(ActionRecord.date_created.desc())
+ elif 'hostname' in kwargs:
+ recent_actions = ActionRecord.query.filter_by(hostname=kwargs['hostname']).order_by(ActionRecord.date_created.desc())
+ return recent_actions
+
+ def increasePenalty(self):
+ #act = ActionRecord(loginbase=self.db.loginbase, action='penalty', action_type='increase_penalty',)
+ self.db.penalty_level += 1
+ # NOTE: this is to prevent overflow or index errors in applyPenalty.
+ # there's probably a better approach to this.
+ if self.db.penalty_level >= 2:
+ self.db.penalty_level = 2
+ self.db.penalty_applied = True
+
+ def applyPenalty(self):
+ penalty_map = []
+ penalty_map.append( { 'name': 'noop', 'enable' : lambda site: None,
+ 'disable' : lambda site: None } )
+ penalty_map.append( { 'name': 'nocreate', 'enable' : lambda site: plc.removeSiteSliceCreation(site),
+ 'disable' : lambda site: plc.enableSiteSliceCreation(site) } )
+ penalty_map.append( { 'name': 'suspendslices', 'enable' : lambda site: plc.suspendSiteSlices(site),
+ 'disable' : lambda site: plc.enableSiteSlices(site) } )
+
+ for i in range(len(penalty_map)-1,self.db.penalty_level,-1):
+ print "\tdisabling %s on %s" % (penalty_map[i]['name'], self.db.loginbase)
+ penalty_map[i]['disable'](self.db.loginbase)
+
+ for i in range(0,self.db.penalty_level+1):
+ print "\tapplying %s on %s" % (penalty_map[i]['name'], self.db.loginbase)
+ penalty_map[i]['enable'](self.db.loginbase)
return
- def accumSickSites(self):
- """
- Take all sick nodes, find their sites, and put in
- sickdb[loginbase] = [diag_node1, diag_node2, ...]
- """
- while 1:
- diag_node = self.sickNoTicket.get(block = True)
- if diag_node == "None":
- break
-
- #for bucket in self.comon.comon_buckets.keys():
- # if (hostname in getattr(self.comon, bucket)):
- # buckets_per_node.append(bucket)
-
- #########################################################
- # TODO: this will break with more than one comon bucket!!
- nodename = diag_node['nodename']
- loginbase = self.plcdb_hn2lb[nodename] # plc.siteId(node)
-
- if loginbase not in self.sickdb:
- self.sickdb[loginbase] = {}
- #self.sickdb[loginbase][nodename] = []
- #else:
- #if nodename not in self.sickdb[loginbase]:
- # self.sickdb[loginbase][nodename] = []
-
- #self.sickdb[loginbase][nodename].append(diag_node)
- self.sickdb[loginbase][nodename] = diag_node
- # TODO: this will break with more than one comon bucket!!
- #########################################################
-
-
- def __actOnDebug(self, node):
- """
- If in debug, set the node to rins, reboot via PCU/POD
- """
- daysdown = self.comon.codata[node]['sshstatus'] // (60*60*24)
- logger.info("POLICY: Node %s in dbg. down for %s" %(node,daysdown))
- plc.nodeBootState(node, "rins")
- # TODO: only reboot if BootCD > 3.0
- # if bootcd[node] > 3.0:
- # if NODE_KEY in planet.cnf:
- # plc.nodeBootState(node, "rins")
- # reboot.reboot(node)
- # else:
- # email to update planet.cnf file
-
- # If it has a PCU
- reboot.reboot(node)
- # else:
- # email upgrade bootcd message, and treat as down.
- # Log it
- self.actionlogdb[node] = ['rins', daysdown, time.time()]
-
- def __emailSite(self, loginbase, roles, message, args):
- """
- loginbase is the unique site abbreviation, prepended to slice names.
- roles contains TECH, PI, USER roles, and derive email aliases.
- record contains {'message': [<subj>,<body>], 'args': {...}}
- """
- args.update({'loginbase':loginbase})
- # build targets
+ def pausePenalty(self):
+ act = ActionRecord(loginbase=self.db.loginbase,
+ action='penalty',
+ action_type='pause_penalty',)
+
+ def clearPenalty(self):
+ #act = ActionRecord(loginbase=self.db.loginbase, action='penalty', action_type='clear_penalty',)
+ self.db.penalty_level = 0
+ self.db.penalty_applied = False
+
+ def getTicketStatus(self):
+ if self.db.message_id != 0:
+ rtstatus = mailer.getTicketStatus(self.db.message_id)
+ self.db.message_status = rtstatus['Status']
+ self.db.message_queue = rtstatus['Queue']
+ self.db.message_created = datetime.fromtimestamp(rtstatus['Created'])
+
+ def setTicketStatus(self, status):
+ print 'SETTING status %s' % status
+ if self.db.message_id != 0:
+ rtstatus = mailer.setTicketStatus(self.db.message_id, status)
+
+ def getContacts(self):
contacts = []
- if TECH & roles:
- contacts += [TECHEMAIL % loginbase]
- elif PI & roles:
- contacts += [PIEMAIL % loginbase]
- elif USER & roles:
- slices = plc.slices(loginbase)
- if len(slices) >= 1:
- for slice in slices:
- contacts += [SLICEMAIL % slice]
- else:
- print "Received no slices for site: %s" % loginbase
+ if self.db.penalty_level >= 0:
+ contacts += plc.getTechEmails(self.db.loginbase)
- try:
- subject = message[0] % args
- body = message[1] % args
- mailer.email(subject, body, contacts)
- except Exception, err:
- print "exception on message:"
- print message
+ if self.db.penalty_level >= 1:
+ contacts += plc.getPIEmails(self.db.loginbase)
- return
+ if self.db.penalty_level >= 2:
+ contacts += plc.getSliceUserEmails(self.db.loginbase)
- def format_diaginfo(self, diag_node):
- info = diag_node['info']
- hlist = " %s %s %s\n" % (info[0], info[2], info[1]) # (node, version, daysdown)
- return hlist
-
- def __actOnSite(self, loginbase, rec_diaglist):
- i_nodes_actedon = 0
- i_nodes_emailed = 0
- b_squeeze = config.squeeze
-
- action_argslist = []
- for diag_node in rec_diaglist:
- #print "calling actOnNode(%s)" % diag_node['nodename']
- action_args = self.__actOnNode(diag_node)
- action_argslist += [action_args]
-
- #print "getSiteNodes(%s)" % loginbase
- nodelist = plc.getSiteNodes(loginbase)
- if len(nodelist) - len(action_argslist) < 2:
- print "SITE: %20s : < 2 nodes !!" % loginbase
- # TODO: check how long this has occurred.
- # then plc.removeSliceCreation(nodename)
- # There may be a similar act_1,act_2,wait db for sites?
- else:
- #print "SITE: goodNodesUp(%s) > 2 && %d bad" % \
- # (loginbase, len(action_argslist))
- b_squeeze = False
-
- # create 'args' for email
- #print "Create email args..."
- email_args = {}
- email_args['hostname_list'] = ""
- for action_args in action_argslist:
- email_args['hostname_list'] += action_args['msg_format']
- email_args['hostname'] = action_args['nodename']
-
- # Send email, perform node action
- # TODO: only send one email per site for a given problem...
- if len(action_argslist) > 0:
- action_args = action_argslist[0]
- #for action_args in action_argslist:
- # TODO: perform the most severe action?
- if b_squeeze:
- act_key = action_args['action']
- self.actions[act_key](email_args['hostname'])
- i_nodes_actedon += 1
- #print "Send email..."
- if action_args['message'] != None:
- self.__emailSite(loginbase, action_args['email'],
- action_args['message'], email_args)
- if config.mail: i_nodes_emailed += 1
+ return contacts
+
+ def sendMessage(self, type, **kwargs):
+
+ # NOTE: evidently changing an RT message's subject opens the ticket.
+ # the logic in this policy depends up a ticket only being 'open'
+ # if a user has replied to it.
+ # So, to preserve these semantics, we check the status before
+ # sending, then after sending, reset the status to the
+ # previous status.
+ # There is a very tiny race here, where a user sends a reply
+ # within the time it takes to check, send, and reset.
+ # This sucks. It's almost certainly fragile.
+
+ #
+ # TODO: catch any errors here, and add an ActionRecord that contains
+ # those errors.
- return (i_nodes_actedon, i_nodes_emailed)
-
- def __actOnNode(self, diag_node):
- nodename = diag_node['nodename']
- message = diag_node['message']
- info = diag_node['info']
- args = {}
-
- # TODO: a node should only be in one category, right?
- # - This is a constraint that should be enforced. It may be possible
- # for a node to fall into the wrong set.
- # - Also, it is necessary to remove a node from an action set, if it
- # comes back up, or enters another state between checks.
- # TODO: check that the reason a node ends up in a 'bad' state has or
- # hasn't changed. If it's changed, then probably the process should
- # start over, or at leat be acknowledged. I'm not sure that this is
- # the right place for this operation.
-
- args['nodename'] = nodename
- args['msg_format'] = self.format_diaginfo(diag_node)
- current_time = time.time()
-
- #k1 = self.act_1week.keys()
- #k2 = self.act_2weeks.keys()
- #k3 = self.act_waitforever.keys()
- #print "lengths: %d %d %d" % (len(k1), len(k2), len(k3))
-
- delta = current_time - diag_node['time']
-
- if 'waitforever' in diag_node['stage']:
- # TODO: define what to do in the 'forever' state
- # TODO: there should probably be a periodic email sent after this,
- # to the site, or to us...
- args['action'] = 'noop'
- args['message'] = None
-
- elif 'actintwoweeks' in diag_node['stage'] or delta >= 14 * SPERDAY:
- #nodename in self.act_2weeks:
- args['email'] = TECH | PI | USER
- args['action'] = 'suspendslices'
- args['message'] = message[2]
- args['stage'] = 'stage_waitforever'
- # TODO: This will lose original 'time'
- diag_node.update(args)
-
- elif 'actinoneweek' in diag_node['stage'] or delta >= 7 * SPERDAY:
- # nodename in self.act_1week:
- args['email'] = TECH | PI
-
- args['action'] = 'nocreate'
- # args['action'] = 'rins'
- args['message'] = message[1]
- args['stage'] = 'stage_actintwoweeks'
- diag_node.update(args)
+ args = {'loginbase' : self.db.loginbase, 'penalty_level' : self.db.penalty_level}
+ args.update(kwargs)
+
+ hostname = None
+ if 'hostname' in args:
+ hostname = args['hostname']
+
+ if hasattr(mailtxt, type):
+
+ message = getattr(mailtxt, type)
+ viart = True
+ if 'viart' in kwargs:
+ viart = kwargs['viart']
+
+ if viart:
+ self.getTicketStatus() # get current message status
+
+ m = Message(message[0] % args, message[1] % args, viart, self.db.message_id)
+
+ contacts = self.getContacts()
+ contacts = [config.cc_email] # TODO: remove after testing...
+
+ print "sending message: %s to site %s for host %s" % (type, self.db.loginbase, hostname)
+
+ ret = m.send(contacts)
+ if viart:
+ self.db.message_id = ret
+ # reset to previous status, since a new subject 'opens' RT tickets.
+ self.setTicketStatus(self.db.message_status)
+
+ # NOTE: only make a record of it if it's in RT.
+ act = ActionRecord(loginbase=self.db.loginbase, hostname=hostname, action='notice',
+ action_type=type, message_id=self.db.message_id)
else:
- # the node is bad, but there's no previous record of it.
- args['email'] = TECH
- args['action'] = 'noop'
- args['message'] = message[0]
- args['stage'] = 'stage_actinoneweek'
- diag_node.update(args)
-
- print "%s" % diag_node['log'],
- print "%15s" % args['action']
-
- if nodename not in self.act_all: self.act_all[nodename] = []
- self.act_all[nodename].insert(0,diag_node)
-
- return args
-
- def lappend_once(list, element):
- if element not in list:
- list.append(element)
- def sappend_once(string, element, separator=','):
- if element not in string:
- return ("%s%c%s" % (string, separator, element),1)
- else:
- return (string,0)
-
- def analyseSites(self):
- i_sites = 0
- i_sites_diagnosed = 0
- i_nodes_diagnosed = 0
- i_nodes_actedon = 0
- i_sites_emailed = 0
- l_allsites = []
-
- sorted_sites = self.sickdb.keys()
- sorted_sites.sort()
- for loginbase in sorted_sites:
- rec_nodedict = self.sickdb[loginbase]
- #print "calling diagnoseSite(%s)" % loginbase
- rec_diaglist = self.__diagnoseSite(loginbase, rec_nodedict)
- l_allsites += [loginbase]
-
-
- if len(rec_diaglist) > 0:
- i_nodes_diagnosed += len(rec_diaglist)
- i_sites_diagnosed += 1
-
- #print "calling actOnSite(%s)" % loginbase
- (na,ne) = self.__actOnSite(loginbase, rec_diaglist)
-
- i_sites += 1
- i_nodes_actedon += na
- i_sites_emailed += ne
-
- return {'sites': i_sites,
- 'sites_diagnosed': i_sites_diagnosed,
- 'nodes_diagnosed': i_nodes_diagnosed,
- 'sites_emailed': i_sites_emailed,
- 'nodes_actedon': i_nodes_actedon,
- 'allsites':l_allsites}
-
-
- def __diagnoseSite(self, loginbase, rec_nodedict):
- """
- rec_sitelist is a sickdb entry:
- """
- diag_list = []
- sorted_nodes = rec_nodedict.keys()
- sorted_nodes.sort()
- for nodename in sorted_nodes:
- rec_node = rec_nodedict[nodename]
- diag_node = self.__diagnoseNode(loginbase, rec_node)
- if diag_node != None:
- diag_list += [ diag_node ]
- return diag_list
-
- def __getDaysDown(self, nodename):
- daysdown = -1
- if self.comon.codata[nodename]['sshstatus'] != "null":
- daysdown = int(self.comon.codata[nodename]['sshstatus']) // (60*60*24)
- return daysdown
-
- def __getStrDaysDown(self, nodename):
- daysdown = self.__getDaysDown(nodename)
- if daysdown > 0:
- return "(%d days down)"%daysdown
- else:
- return ""
-
- def __getCDVersion(self, nodename):
- cdversion = ""
- if nodename in self.bootcds:
- cdversion = self.bootcds[nodename]
- return cdversion
-
- def __diagnoseNode(self, loginbase, rec_node):
- # TODO: change the format of the hostname in this
- # record to something more natural.
- nodename = rec_node['nodename']
- buckets = rec_node['bucket']
- diag_record = None
-
- # xyz as determined by monitor
- # down as determined by comon
- if rec_node['stage'] == "stage_rt_working":
- # err, this can be used as a counter of some kind..
- # but otherwise, no diagnosis is necessary, return None, implies that
- # it gets skipped.
- print "DIAG: %20s : %-40s ticket %d" % \
- (loginbase, nodename, rec_node['ticket_id'])
-
- elif "down" in buckets:
- diag_record = {}
- diag_record.update(rec_node)
- diag_record['nodename'] = nodename
- diag_record['message'] = emailTxt.mailtxt.newdown
- diag_record['args'] = {'nodename': nodename}
- s_daysdown = self.__getStrDaysDown(nodename)
- diag_record['info'] = (nodename, s_daysdown, "")
- diag_record['bucket'] = ["down"]
- diag_record['log'] = "DOWN: %20s : %-40s == %20s" % \
- (loginbase, nodename, diag_record['info']),
-
- elif "dbg" in buckets:
- # V2 boot cds as determined by monitor
- s_daysdown = self.__getStrDaysDown(nodename)
- s_cdversion = self.__getCDVersion(nodename)
- diag_record = {}
- diag_record.update(rec_node)
- diag_record['nodename'] = nodename
- diag_record['info'] = (nodename, s_daysdown, s_cdversion)
-
- if nodename in self.bootcds and "v2" in self.bootcds[nodename]:
- diag_record['log'] = "BTCD: %20s : %-40s == %20s" % \
- (loginbase, nodename, self.bootcds[nodename]),
- diag_record['message'] = emailTxt.mailtxt.newbootcd
- diag_record['args'] = {'nodename': nodename}
- # TODO: figure a better 'bucket' scheme, for merge()
- #diag_record['bucket'] = ["monitor"]
- else:
- print "DEBG: %20s : %-40s" % \
- (loginbase, nodename)
- return None
-
- msg = ("dbg mode",
- "Comon reports the node in debug mode, %s" % \
- "but monitor does not know what to do yet.")
- # TODO: replace with a real action
- diag_record['message'] = [msg, msg, msg]
- diag_record['bucket'] = ["dbg"]
- diag_record['args'] = {'nodename': nodename}
- elif "ssh" in buckets:
- pass
- elif "clock_drift" in buckets:
- pass
- elif "dns" in buckets:
- pass
- elif "filerw" in buckets:
- pass
- else:
- print "Unknown buckets!!!! %s" % buckets
- sys.exit(1)
-
- return diag_record
-
-
- def __actOnFilerw(self, node):
- """
- Report to PLC when node needs disk checked.
- """
- target = [PLCEMAIL]
- logger.info("POLICY: Emailing PLC for " + node)
- tmp = emailTxt.mailtxt.filerw
- sbj = tmp[0] % {'hostname': node}
- msg = tmp[1] % {'hostname': node}
- mailer.email(sbj, msg, target)
- self.actionlogdb[node] = ["filerw", None, time.time()]
-
-
- def __actOnDNS(self, node):
- """
- """
-
-
- def __policy(self, node, loginbase, bucket):
- # ...and spam 'em
- target = [TECHEMAIL % loginbase]
- tmp = emailTxt.mailtxt.down
- sbj = tmp[0] % {'hostname': node}
- msg = tmp[1] % {'hostname': node, 'days': daysdown}
- mailer.email(sbj, msg, target)
-
-
- """
- Prints, logs, and emails status of up nodes, down nodes, and buckets.
- """
- def status(self):
- sub = "Monitor Summary"
- msg = "\nThe following nodes were acted upon: \n\n"
- for (node, (type, date)) in self.emailed.items():
- # Print only things acted on today.
- if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
- msg +="%s\t(%s)\t%s\n" %(node, type, time.ctime(date))
- msg +="\n\nThe following sites have been 'squeezed':\n\n"
- for (loginbase, (date, type)) in self.squeezed.items():
- # Print only things acted on today.
- if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
- msg +="%s\t(%s)\t%s\n" %(loginbase, type, time.ctime(date))
- mailer.email(sub, msg, [SUMTO])
- logger.info(msg)
- return
-
- """
- Store/Load state of emails. When, where, what.
- """
- def emailedStore(self, action):
+ print "+-- WARNING! ------------------------------"
+ print "| No such message name in emailTxt.mailtxt: %s" % type
+ print "+------------------------------------------"
+
+ return
+
+ def closeTicket(self):
+ # TODO: close the rt ticket before overwriting the message_id
+ mailer.closeTicketViaRT(self.db.message_id, "Ticket Closed by Monitor")
+ act = ActionRecord(loginbase=self.db.loginbase, action='notice',
+ action_type='end_notice', message_id=self.db.message_id)
+ self.db.message_id = 0
+ self.db.message_status = "new"
+
+ def runBootManager(self, hostname):
+ print "attempting BM reboot of %s" % hostname
+ ret = ""
+ try:
+ ret = bootman.restore(self, hostname)
+ err = ""
+ except:
+ err = traceback.format_exc()
+ print err
+
+ act = ActionRecord(loginbase=self.db.loginbase,
+ hostname=hostname,
+ action='reboot',
+ action_type='bootmanager_restore',
+ error_string=err)
+ return ret
+
+ def attemptReboot(self, hostname):
+ print "attempting PCU reboot of %s" % hostname
+ ret = reboot.reboot_str(hostname)
+ if ret == 0 or ret == "0":
+ ret = ""
+ act = ActionRecord(loginbase=self.db.loginbase,
+ hostname=hostname,
+ action='reboot',
+ action_type='first_try_reboot',
+ error_string=ret)
+
+def logic():
+
+ plc.nodeBootState(host, 'rins')
+ node_end_record(host)
+
+
+
+
+def main(hostnames, sitenames):
+ # commands:
+ i = 1
+ node_count = 1
+ site_count = 1
+ #print "hosts: %s" % hostnames
+ for host in hostnames:
try:
- if action == "LOAD":
- f = open(DAT, "r+")
- logger.info("POLICY: Found and reading " + DAT)
- self.emailed.update(pickle.load(f))
- if action == "WRITE":
- f = open(DAT, "w")
- #logger.debug("Writing " + DAT)
- pickle.dump(self.emailed, f)
- f.close()
- except Exception, err:
- logger.info("POLICY: Problem with DAT, %s" %err)
-
- """
- Returns True if more than MINUP nodes are up at a site.
- """
- def enoughUp(self, loginbase):
- allsitenodes = plc.getSiteNodes([loginbase])
- if len(allsitenodes) == 0:
- logger.info("Node not in db")
- return
-
- numnodes = len(allsitenodes)
- sicknodes = []
- # Get all sick nodes from comon
- for bucket in self.comon.comon_buckets.keys():
- for host in getattr(self.comon, bucket):
- sicknodes.append(host)
- # Diff.
- for node in allsitenodes:
- if node in sicknodes:
- numnodes -= 1
-
- if numnodes < MINUP:
- logger.info(\
-"POLICY: site with %s has nodes %s up." %(loginbase, numnodes))
- return False
- else:
- return True
+ lb = plccache.plcdb_hn2lb[host]
+ except:
+ print "unknown host in plcdb_hn2lb %s" % host
+ continue
+
+ nodeblack = BlacklistRecord.get_by(hostname=host)
+
+ if nodeblack and not nodeblack.expired():
+ print "skipping %s due to blacklist. will expire %s" % (host, nodeblack.willExpire() )
+ continue
+
+ sitehist = SiteInterface.get_or_make(loginbase=lb)
+
+ recent_actions = sitehist.getRecentActions(hostname=host)
+
+ nodehist = HistoryNodeRecord.findby_or_create(hostname=host)
+
+ print "%s %s" % ( nodehist.hostname, nodehist.status)
+ if nodehist.status == 'good' and \
+ changed_lessthan(nodehist.last_changed, 1.0) and \
+ not found_within(recent_actions, 'online_notice', 0.5):
+ # NOTE: there is a narrow window in which this command must be
+ # evaluated, otherwise the notice will not go out. this is not ideal.
+ sitehist.sendMessage('online_notice', hostname=host)
+ print "send message for host %s online" % host
+
+ pass
+
+ if ( nodehist.status == 'offline' or nodehist.status == 'down' ) and \
+ changed_greaterthan(nodehist.last_changed,1.0) and \
+ not found_between(recent_actions, 'first_try_reboot', 3.5, 1):
+
+ sitehist.attemptReboot(host)
+ print "send message for host %s first_try_reboot" % host
+ pass
+
+ # NOTE: non-intuitive is that found_between(first_try_reboot, 3.5, 1)
+ # will be false for a day after the above condition is satisfied
+ if ( nodehist.status == 'offline' or nodehist.status == 'down' ) and \
+ changed_greaterthan(nodehist.last_changed,1.5) and \
+ found_between(recent_actions, 'first_try_reboot', 3.5, 1) and \
+ not found_within(recent_actions, 'pcufailed_notice', 3.5):
+ # found_within(recent_actions, 'first_try_reboot', 3.5) and \
+
+ # send pcu failure message
+ #act = ActionRecord(**kwargs)
+ sitehist.sendMessage('pcufailed_notice', hostname=host)
+ print "send message for host %s PCU Failure" % host
+ pass
+
+ if nodehist.status == 'monitordebug' and \
+ changed_greaterthan(nodehist.last_changed, 1) and \
+ not found_between(recent_actions, 'bootmanager_restore', 0.5, 0):
+ # send down node notice
+ # delay 0.5 days before retrying...
+
+ print "send message for host %s bootmanager_restore" % host
+ sitehist.runBootManager(host)
+ # sitehist.sendMessage('retry_bootman', hostname=host)
+
+ if nodehist.status == 'down' and \
+ changed_greaterthan(nodehist.last_changed, 2) and \
+ not found_within(recent_actions, 'down_notice', 3.5):
+ # send down node notice
+
+ sitehist.sendMessage('down_notice', hostname=host)
+ print "send message for host %s offline" % host
+ pass
+
+ node_count = node_count + 1
+
+ for site in sitenames:
+ sitehist = SiteInterface.get_or_make(loginbase=site)
+ # TODO: make query only return records within a certin time range,
+ # i.e. greater than 0.5 days ago. or 5 days, etc.
+ recent_actions = sitehist.getRecentActions(loginbase=site)
+
+ #sitehist.sendMessage('test_notice', host)
+
+ print "%s %s" % ( sitehist.db.loginbase , sitehist.db.status)
+ if sitehist.db.status == 'down':
+ if not found_within(recent_actions, 'pause_penalty', 30) and \
+ not found_within(recent_actions, 'increase_penalty', 7) and \
+ changed_greaterthan(sitehist.db.last_changed, 7):
+
+ # TODO: catch errors
+ sitehist.increasePenalty()
+ #sitehist.applyPenalty()
+ sitehist.sendMessage('increase_penalty')
+
+ print "send message for site %s penalty increase" % site
+
+ if sitehist.db.status == 'good':
+ # clear penalty
+ # NOTE: because 'all clear' should have an indefinite status, we
+ # have a boolean value rather than a 'recent action'
+ if sitehist.db.penalty_applied:
+ # send message that penalties are cleared.
+
+ sitehist.clearPenalty()
+ #sitehist.applyPenalty()
+ sitehist.sendMessage('clear_penalty')
+ sitehist.closeTicket()
+
+ print "send message for site %s penalty cleared" % site
+
+ # find all ticket ids for site ( could be on the site record? )
+ # determine if there are penalties within the last 30 days?
+ # if so, add a 'pause_penalty' action.
+ if sitehist.db.message_id != 0 and sitehist.db.message_status == 'open' and sitehist.db.penalty_level > 0:
+ # pause escalation
+ print "Pausing penalties for %s" % site
+ sitehist.pausePenalty()
+
+ site_count = site_count + 1
+
+ session.flush()
+
+ return
+
+
+if __name__ == "__main__":
+ parser = parsermodule.getParser(['nodesets'])
+ parser.set_defaults( timewait=0,
+ skip=0,
+ rins=False,
+ reboot=False,
+ findbad=False,
+ force=False,
+ nosetup=False,
+ verbose=False,
+ quiet=False,
+ )
+
+ parser.add_option("", "--stopselect", dest="stopselect", metavar="",
+ help="The select string that must evaluate to true for the node to be considered 'done'")
+ parser.add_option("", "--findbad", dest="findbad", action="store_true",
+ help="Re-run findbad on the nodes we're going to check before acting.")
+ parser.add_option("", "--force", dest="force", action="store_true",
+ help="Force action regardless of previous actions/logs.")
+ parser.add_option("", "--rins", dest="rins", action="store_true",
+ help="Set the boot_state to 'rins' for all nodes.")
+ parser.add_option("", "--reboot", dest="reboot", action="store_true",
+ help="Actively try to reboot the nodes, keeping a log of actions.")
+
+ parser.add_option("", "--verbose", dest="verbose", action="store_true",
+ help="Extra debug output messages.")
+ parser.add_option("", "--nosetup", dest="nosetup", action="store_true",
+ help="Do not perform the orginary setup phase.")
+ parser.add_option("", "--skip", dest="skip",
+ help="Number of machines to skip on the input queue.")
+ parser.add_option("", "--timewait", dest="timewait",
+ help="Minutes to wait between iterations of 10 nodes.")
+
+ parser = parsermodule.getParser(['defaults'], parser)
+ config = parsermodule.parse_args(parser)
+
+# # COLLECT nodegroups, nodes and node lists
+# if config.nodegroup:
+# ng = api.GetNodeGroups({'name' : config.nodegroup})
+# nodelist = api.GetNodes(ng[0]['node_ids'])
+# hostnames = [ n['hostname'] for n in nodelist ]
+
+ fbquery = HistoryNodeRecord.query.all()
+ hostnames = [ n.hostname for n in fbquery ]
- def print_stats(self, key, stats):
- print "%20s : %d" % (key, stats[key])
-
- def run(self):
- self.accumSickSites()
- print "merge"
- self.mergePreviousActions()
- print "Accumulated %d sick sites" % len(self.sickdb.keys())
- logger.debug("Accumulated %d sick sites" % len(self.sickdb.keys()))
-
- #l1_before = len(self.act_1week.keys())
- #l2_before = len(self.act_2weeks.keys())
- #lwf_before = len(self.act_waitforever.keys())
-
- print "analyse"
- stats = self.analyseSites()
- print "DONE"
-
- self.print_stats("sites", stats)
- self.print_stats("sites_diagnosed", stats)
- self.print_stats("nodes_diagnosed", stats)
- self.print_stats("sites_emailed", stats)
- self.print_stats("nodes_actedon", stats)
- print string.join(stats['allsites'], ",")
-
- #l1 = len(self.act_1week.keys())
- #l2 = len(self.act_2weeks.keys())
- #lwf = len(self.act_waitforever.keys())
- #print "act_1week: %d diff: %d" % (l1, abs(l1-l1_before))
- #print "act_2weeks: %d diff: %d" % (l2, abs(l2-l2_before))
- #print "act_waitforever: %d diff: %d" % (lwf, abs(lwf-lwf_before))
-
- #self.__actOnDown()
-
- if config.policysavedb:
- print "Saving Databases... act_all"
- #soltesz.dbDump("policy.eventlog", self.eventlog)
- soltesz.dbDump("act_all", self.act_all)
-
-
-
-def main():
- logger.setLevel(logging.DEBUG)
- ch = logging.StreamHandler()
- ch.setLevel(logging.DEBUG)
- formatter = logging.Formatter('%(message)s')
- ch.setFormatter(formatter)
- logger.addHandler(ch)
-
- #print NodesDebug()
- #tmp = Queue.Queue()
- #a = Policy(None, tmp)
- #a.emailedStore("LOAD")
- #print a.emailed
-
- #print plc.slices([plc.siteId(["alice.cs.princeton.edu"])])
- os._exit(0)
-if __name__ == '__main__':
- import os
- import plc
+ fbquery = HistorySiteRecord.query.all()
+ sitenames = [ s.loginbase for s in fbquery ]
+
+ if config.site:
+ # TODO: replace with calls to local db. the api fails so often that
+ # these calls should be regarded as unreliable.
+ site = api.GetSites(config.site)
+ l_nodes = api.GetNodes(site[0]['node_ids'], ['hostname'])
+ filter_hostnames = [ n['hostname'] for n in l_nodes ]
+
+ hostnames = filter(lambda x: x in filter_hostnames, hostnames)
+ sitenames = [config.site]
+
+ if config.node:
+ hostnames = [ config.node ]
+ sitenames = [ plccache.plcdb_hn2lb[config.node] ]
+
try:
- main()
+ main(hostnames, sitenames)
except KeyboardInterrupt:
- print "Killed. Exitting."
- logger.info('Monitor Killed')
- os._exit(0)
+ print "Killed by interrupt"
+ sys.exit(0)
+ except:
+ #email_exception()
+ print traceback.print_exc();
+ print "Continuing..."