X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=policy.py;h=309e03b9c4960e74957de9e90c3f5d8be3d3dd1a;hb=7549e937bf2902c238b2af8ef819c7d223cb4a9c;hp=d6e51e6be79c6d88d5c3cedbc84477fd5a1efb55;hpb=56565f0af4cf94c47b3cbae369ecfcb8eb535ea6;p=monitor.git

diff --git a/policy.py b/policy.py
index d6e51e6..309e03b 100644
--- a/policy.py
+++ b/policy.py
@@ -3,7 +3,7 @@
 #
 # Faiyaz Ahmed
 #
-# $Id: policy.py,v 1.12 2007/04/06 17:38:14 faiyaza Exp $
+# $Id: policy.py,v 1.14 2007/06/29 12:42:22 soltesz Exp $
 #
 # Policy Engine.

@@ -16,8 +16,12 @@ import emailTxt
 import pickle
 import Queue
 import plc
+import sys
 import reboot
-import config
+import soltesz
+import string
+from config import config
+config = config()

 DAT="./monitor.dat"

@@ -27,7 +31,7 @@ logger = logging.getLogger("monitor")
 POLSLEEP = 7200

 # Where to email the summary
-SUMTO = "faiyaza@cs.princeton.edu"
+SUMTO = "soltesz@cs.princeton.edu"
 TECHEMAIL="tech-%s@sites.planet-lab.org"
 PIEMAIL="pi-%s@sites.planet-lab.org"
 SLICEMAIL="%s@slices.planet-lab.org"
@@ -45,6 +49,10 @@ DEADTHRESH = 30 * SPERDAY
 # Minimum number of nodes up before squeezing
 MINUP = 2

+TECH=1
+PI=2
+USER=4
+
 # IF:
 # no SSH, down.
 # bad disk, down
@@ -57,59 +65,461 @@ MINUP = 2
 # suspend slice creation
 # kill slices

+class PLC: pass
 class Policy(Thread):
 	def __init__(self, comonthread, sickNoTicket, emailed):
-		self.cmn = comonthread
-		# host - > (time of email, type of email)
-		self.emailed = emailed
+		self.comon = comonthread
+
+		# the hostname to loginbase mapping
+		self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
+
+		# Actions taken on nodes.
+		self.cache_all = soltesz.if_cached_else(1, "act_all", lambda : {})
+		self.act_all = soltesz.if_cached_else(1, "act_all", lambda : {})
+
+		# A dict of actions to specific functions.  PICKLE doesn't like lambdas.
+		self.actions = {}
+		self.actions['suspendslices'] = lambda hn: plc.suspendSlices(hn)
+		self.actions['nocreate'] = lambda hn: plc.removeSliceCreation(hn)
+		self.actions['rins'] = lambda hn: plc.nodeBootState(hn, "rins")
+		self.actions['noop'] = lambda hn: hn
+
+		self.bootcds = soltesz.dbLoad("bootcds")
+		self.emailed = emailed # host -> (time of email, type of email)
+
 		# all sick nodes w/o tickets
 		# from thread
 		self.sickNoTicket = sickNoTicket
-		# Actions taken on nodes.
-		# actionlogdb{node: [action, date]}
-		self.actionlogdb = {}
-		# Actions taken on sites.
-		# sitelogdb{site: [action, daysdown, date]}
-		self.sitelogdb = {}
+
+		# sick nodes with no tickets
 		# sickdb{loginbase: [{hostname1: [buckets]}, {...}]}
 		self.sickdb = {}
 		Thread.__init__(self)

+	def mergePreviousActions(self):
+		"""
+		Look at the sick node_records as reported by comon, and then look at the
+		node_records in act_all.  There are four cases:
+		1) problem in comon but not in act_all
+			this is ok, b/c it just means it's a new problem
+		2) problem in comon and in act_all
+			we need to figure out the mismatch.  Did the problem get better
+			or worse?  Reset the stage clock to 'initial' if it's better;
+			continue if it's gotten worse.  Hard to make this judgement here, though.
+		3) no problem in comon, problem in act_all
+			this may mean that the node is operational again, or that monitor
+			knows how to define a problem that comon does not.  For now, if
+			comon does not report a problem, monitor obeys.  Ultimately,
+			however, we want to catch problems that comon can't see.
+		4) no problem in comon, no problem in act_all
+			there won't be a record in either db, so there's no code.
+
+		TODO: this is where back-offs will be acknowledged.  If the nodes get
+		better, it should be possible to 're-enable' the site, or slice, etc.
+		"""
+		sorted_sites = self.sickdb.keys()
+		sorted_sites.sort()
+		# look at all problems reported by comon
+		for loginbase in sorted_sites:
+			rec_nodedict = self.sickdb[loginbase]
+			sorted_nodes = rec_nodedict.keys()
+			sorted_nodes.sort()
+			#for rec_node in rec_nodelist:
+			for nodename in sorted_nodes:
+				rec_node = rec_nodedict[nodename]
+				hn = nodename
+				x = self.sickdb[loginbase][hn]
+				if hn in self.act_all:
+					y = self.act_all[hn][0]
+					if x['bucket'][0] != y['bucket'][0]:
+						# 2a) mismatch, need a policy for how to resolve
+						print "COMON and MONITOR have a mismatch: %s vs %s" % \
+								(x['bucket'], y['bucket'])
+					else:
+						# 2b) ok, b/c they agree that there's still a problem..
+						pass
+
+					# for now, overwrite the comon entry for the one in act_all
+					self.sickdb[loginbase][hn] = y
+					# delete the entry from cache_all to keep it out of case 3)
+					del self.cache_all[hn]
+				else:
+					# 1) ok, b/c it's a new problem.
+					pass
+
+		# 3) nodes that remain in cache_all were not identified by comon as
+		#    down.  Do we keep them or not?
+		for hn in self.cache_all.keys():
+			y = self.act_all[hn][0]
+			if 'monitor' in y['bucket']:
+				loginbase = self.plcdb_hn2lb[hn]
+				if loginbase not in self.sickdb:
+					self.sickdb[loginbase] = {}
+				self.sickdb[loginbase][hn] = y
+			else:
+				del self.cache_all[hn]
+
+		print "len of cache_all: %d" % len(self.cache_all.keys())
+
+		return

 	def accumSickSites(self):
 		"""
 		Take all sick nodes, find their sites, and put in
-		sickdb{loginbase: [{hostname1: [buckets]}, {...}]}
+		sickdb[loginbase] = [diag_node1, diag_node2, ...]
 		"""
-		while self.sickNoTicket.empty() == False:
-			node = self.sickNoTicket.get(block = True)
-			bkts= []
-			for bkt in self.cmn.comonbkts.keys():
-				if (node in getattr(self.cmn, bkt)):
-					bkts.append("%s" % bkt)
-			self.sickdb[plc.siteId(node)] = {node: bkts}
+		while 1:
+			diag_node = self.sickNoTicket.get(block = True)
+			if diag_node == "None":
+				break
+
+			#for bucket in self.comon.comon_buckets.keys():
+			#	if (hostname in getattr(self.comon, bucket)):
+			#		buckets_per_node.append(bucket)
+
+			#########################################################
+			# TODO: this will break with more than one comon bucket!!
+			nodename = diag_node['nodename']
+			loginbase = self.plcdb_hn2lb[nodename] # plc.siteId(node)
+
+			if loginbase not in self.sickdb:
+				self.sickdb[loginbase] = {}
+			#	self.sickdb[loginbase][nodename] = []
+			#else:
+			#	if nodename not in self.sickdb[loginbase]:
+			#		self.sickdb[loginbase][nodename] = []
+
+			#self.sickdb[loginbase][nodename].append(diag_node)
+			self.sickdb[loginbase][nodename] = diag_node
+			# TODO: this will break with more than one comon bucket!!
+			#########################################################

 	def __actOnDebug(self, node):
 		"""
 		If in debug, set the node to rins, reboot via PCU/POD
 		"""
-		daysdown = self.cmn.codata[node]['sshstatus'] // (60*60*24)
+		daysdown = self.comon.codata[node]['sshstatus'] // (60*60*24)
 		logger.info("POLICY: Node %s in dbg. down for %s" %(node,daysdown))
 		plc.nodeBootState(node, "rins")
+		# TODO: only reboot if BootCD > 3.0
+		# if bootcd[node] > 3.0:
+		#	if NODE_KEY in planet.cnf:
+		#		plc.nodeBootState(node, "rins")
+		#		reboot.reboot(node)
+		#	else:
+		#		email to update planet.cnf file
+		#		If it has a PCU, reboot.reboot(node)
+		# else:
+		#	email upgrade bootcd message, and treat as down.
 		# Log it
 		self.actionlogdb[node] = ['rins', daysdown, time.time()]

+	def __emailSite(self, loginbase, roles, message, args):
+		"""
+		loginbase is the unique site abbreviation, prepended to slice names.
+		roles is a bitmask of TECH, PI and USER, from which the email
+		aliases are derived.
+		message is a (subject, body) template pair; args supplies the
+		substitutions.
+		"""
+		args.update({'loginbase':loginbase})
+		# build targets
+		contacts = []
+		if TECH & roles:
+			contacts += [TECHEMAIL % loginbase]
+		if PI & roles:
+			contacts += [PIEMAIL % loginbase]
+		if USER & roles:
+			slices = plc.slices(loginbase)
+			if len(slices) >= 1:
+				for slice in slices:
+					contacts += [SLICEMAIL % slice]
+			else:
+				print "Received no slices for site: %s" % loginbase
+
+		try:
+			subject = message[0] % args
+			body = message[1] % args
+			mailer.emailViaRT(subject, body, contacts)
+		except Exception, err:
+			print "exception on message:"
+			print message
+
+		return
+
+	def format_diaginfo(self, diag_node):
+		info = diag_node['info']
+		hlist = "    %s %s %s\n" % (info[0], info[2], info[1]) # (node, version, daysdown)
+		return hlist
+
+	def __actOnSite(self, loginbase, rec_diaglist):
+		i_nodes_actedon = 0
+		i_nodes_emailed = 0
+		b_squeeze = config.squeeze
+
+		action_argslist = []
+		for diag_node in rec_diaglist:
+			#print "calling actOnNode(%s)" % diag_node['nodename']
+			action_args = self.__actOnNode(diag_node)
+			action_argslist += [action_args]
+
+		#print "getSiteNodes(%s)" % loginbase
+		nodelist = plc.getSiteNodes(loginbase)
+		if len(nodelist) - len(action_argslist) < 2:
+			print "SITE: %20s : < 2 nodes !!" % loginbase
+			# TODO: check how long this has occurred.
+			# 	then plc.removeSliceCreation(nodename)
+			# 	There may be a similar act_1,act_2,wait db for sites?
+		else:
+			#print "SITE: goodNodesUp(%s) > 2 && %d bad" % \
+			#		(loginbase, len(action_argslist))
+			b_squeeze = False
+
+		# create 'args' for email
+		#print "Create email args..."
+		email_args = {}
+		email_args['hostname_list'] = ""
+		for action_args in action_argslist:
+			email_args['hostname_list'] += action_args['msg_format']
+			email_args['hostname'] = action_args['nodename']
+
+		# Send email, perform node action
+		# TODO: only send one email per site for a given problem...
+		if len(action_argslist) > 0:
+			action_args = action_argslist[0]
+			#for action_args in action_argslist:
+			# TODO: perform the most severe action?
+			if b_squeeze:
+				act_key = action_args['action']
+				self.actions[act_key](email_args['hostname'])
+				i_nodes_actedon += 1
+			#print "Send email..."
+			if action_args['message'] != None:
+				self.__emailSite(loginbase, action_args['email'],
+								 action_args['message'], email_args)
+				if config.mail: i_nodes_emailed += 1
+
+		return (i_nodes_actedon, i_nodes_emailed)
+
+	def __actOnNode(self, diag_node):
+		nodename = diag_node['nodename']
+		message = diag_node['message']
+		info = diag_node['info']
+		args = {}
+
+		# TODO: a node should only be in one category, right?
+		# 	- This is a constraint that should be enforced.  It may be possible
+		# 	  for a node to fall into the wrong set.
+		# 	- Also, it is necessary to remove a node from an action set, if it
+		# 	  comes back up, or enters another state between checks.
+		# TODO: check that the reason a node ends up in a 'bad' state has or
+		# 	hasn't changed.  If it's changed, then probably the process should
+		# 	start over, or at least be acknowledged.  I'm not sure that this is
+		# 	the right place for this operation.
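+		# Summary of the escalation ladder implemented below (delta is
+		# measured from the record's original 'time' field):
+		# 	new problem   -> email TECH,          'noop',           stage_actinoneweek
+		# 	>= 7 days     -> email TECH|PI,       'nocreate',       stage_actintwoweeks
+		# 	>= 14 days    -> email TECH|PI|USER,  'suspendslices',  stage_waitforever
+		# 	waitforever   -> 'noop', no further email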
+
+		args['nodename'] = nodename
+		args['msg_format'] = self.format_diaginfo(diag_node)
+		current_time = time.time()
+
+		#k1 = self.act_1week.keys()
+		#k2 = self.act_2weeks.keys()
+		#k3 = self.act_waitforever.keys()
+		#print "lengths: %d %d %d" % (len(k1), len(k2), len(k3))
+
+		delta = current_time - diag_node['time']
+
+		if 'waitforever' in diag_node['stage']:
+			# TODO: define what to do in the 'forever' state
+			# TODO: there should probably be a periodic email sent after this,
+			# 	to the site, or to us...
+			args['action'] = 'noop'
+			args['message'] = None
+
+		elif 'actintwoweeks' in diag_node['stage'] or delta >= 14 * SPERDAY:
+			#nodename in self.act_2weeks:
+			args['email'] = TECH | PI | USER
+			args['action'] = 'suspendslices'
+			args['message'] = message[2]
+			args['stage'] = 'stage_waitforever'
+			# TODO: This will lose the original 'time'
+			diag_node.update(args)
+
+		elif 'actinoneweek' in diag_node['stage'] or delta >= 7 * SPERDAY:
+			# nodename in self.act_1week:
+			args['email'] = TECH | PI
+			args['action'] = 'nocreate'
+			# args['action'] = 'rins'
+			args['message'] = message[1]
+			args['stage'] = 'stage_actintwoweeks'
+			diag_node.update(args)
+
+		else:
+			# the node is bad, but there's no previous record of it.
+			args['email'] = TECH
+			args['action'] = 'noop'
+			args['message'] = message[0]
+			args['stage'] = 'stage_actinoneweek'
+			diag_node.update(args)
+
+		print "%s" % diag_node['log'],
+		print "%15s" % args['action']
+
+		if nodename not in self.act_all: self.act_all[nodename] = []
+		self.act_all[nodename].insert(0,diag_node)
+
+		return args
+
+	def lappend_once(list, element):
+		if element not in list:
+			list.append(element)
+	def sappend_once(string, element, separator=','):
+		if element not in string:
+			return ("%s%c%s" % (string, separator, element),1)
+		else:
+			return (string,0)
+
+	def analyseSites(self):
+		i_sites = 0
+		i_sites_diagnosed = 0
+		i_nodes_diagnosed = 0
+		i_nodes_actedon = 0
+		i_sites_emailed = 0
+		l_allsites = []
+
+		sorted_sites = self.sickdb.keys()
+		sorted_sites.sort()
+		for loginbase in sorted_sites:
+			rec_nodedict = self.sickdb[loginbase]
+			#print "calling diagnoseSite(%s)" % loginbase
+			rec_diaglist = self.__diagnoseSite(loginbase, rec_nodedict)
+			l_allsites += [loginbase]
+
+			if len(rec_diaglist) > 0:
+				i_nodes_diagnosed += len(rec_diaglist)
+				i_sites_diagnosed += 1
+				#print "calling actOnSite(%s)" % loginbase
+				(na,ne) = self.__actOnSite(loginbase, rec_diaglist)
+				i_nodes_actedon += na
+				i_sites_emailed += ne
+
+			i_sites += 1
+
+		return {'sites': i_sites,
+				'sites_diagnosed': i_sites_diagnosed,
+				'nodes_diagnosed': i_nodes_diagnosed,
+				'sites_emailed': i_sites_emailed,
+				'nodes_actedon': i_nodes_actedon,
+				'allsites':l_allsites}
+
+	def __diagnoseSite(self, loginbase, rec_nodedict):
+		"""
+		rec_nodedict is a sickdb entry: {nodename: diag_record, ...}
+		"""
+		diag_list = []
+		sorted_nodes = rec_nodedict.keys()
+		sorted_nodes.sort()
+		for nodename in sorted_nodes:
+			rec_node = rec_nodedict[nodename]
+			diag_node = self.__diagnoseNode(loginbase, rec_node)
+			if diag_node != None:
+				diag_list += [ diag_node ]
+		return diag_list
+
+	def __getDaysDown(self, nodename):
+		daysdown = -1
+		if self.comon.codata[nodename]['sshstatus'] != "null":
+			daysdown = int(self.comon.codata[nodename]['sshstatus']) // (60*60*24)
+		return daysdown
+
+	def __getStrDaysDown(self, nodename):
+		daysdown = self.__getDaysDown(nodename)
+		if daysdown > 0:
+			return "(%d days down)"%daysdown
+		else:
+			return ""
+
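+	# Look up the BootCD version string recorded for nodename in the
+	# 'bootcds' database; returns "" when the node has no entry.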
+	def __getCDVersion(self, nodename):
+		cdversion = ""
+		if nodename in self.bootcds:
+			cdversion = self.bootcds[nodename]
+		return cdversion
+
+	def __diagnoseNode(self, loginbase, rec_node):
+		# TODO: change the format of the hostname in this
+		# 	record to something more natural.
+		nodename = rec_node['nodename']
+		buckets = rec_node['bucket']
+		diag_record = None
+
+		# xyz as determined by monitor
+		# down as determined by comon
+		if rec_node['stage'] == "stage_rt_working":
+			# err, this can be used as a counter of some kind..
+			# but otherwise, no diagnosis is necessary; returning None implies
+			# that the node gets skipped.
+			print "DIAG: %20s : %-40s ticket %s" % \
+					(loginbase, nodename, rec_node['ticket_id'])
+
+		elif "down" in buckets:
+			diag_record = {}
+			diag_record.update(rec_node)
+			diag_record['nodename'] = nodename
+			diag_record['message'] = emailTxt.mailtxt.newdown
+			diag_record['args'] = {'nodename': nodename}
+			s_daysdown = self.__getStrDaysDown(nodename)
+			diag_record['info'] = (nodename, s_daysdown, "")
+			diag_record['bucket'] = ["down"]
+			diag_record['log'] = "DOWN: %20s : %-40s == %20s" % \
+					(loginbase, nodename, diag_record['info']),
+
+		elif "dbg" in buckets:
+			# V2 boot cds as determined by monitor
+			s_daysdown = self.__getStrDaysDown(nodename)
+			s_cdversion = self.__getCDVersion(nodename)
+			diag_record = {}
+			diag_record.update(rec_node)
+			diag_record['nodename'] = nodename
+			diag_record['info'] = (nodename, s_daysdown, s_cdversion)
+
+			if nodename in self.bootcds and "v2" in self.bootcds[nodename]:
+				diag_record['log'] = "BTCD: %20s : %-40s == %20s" % \
+						(loginbase, nodename, self.bootcds[nodename]),
+				diag_record['message'] = emailTxt.mailtxt.newbootcd
+				diag_record['args'] = {'nodename': nodename}
+				# TODO: figure out a better 'bucket' scheme, for merge()
+				#diag_record['bucket'] = ["monitor"]
+			else:
+				print "DEBG: %20s : %-40s" % \
+						(loginbase, nodename)
+				return None
+
+				# NOTE: unreachable while the 'return None' above is in place.
+				msg = ("dbg mode",
+					   "Comon reports the node in debug mode, %s" % \
+					   "but monitor does not know what to do yet.")
+				# TODO: replace with a real action
+				diag_record['message'] = [msg, msg, msg]
+				diag_record['bucket'] = ["dbg"]
+				diag_record['args'] = {'nodename': nodename}
+		elif "ssh" in buckets:
+			pass
+		elif "clock_drift" in buckets:
+			pass
+		elif "dns" in buckets:
+			pass
+		elif "filerw" in buckets:
+			pass
+		else:
+			print "Unknown buckets!!!! %s" % buckets
+			sys.exit(1)
+
+		return diag_record

 	def __actOnFilerw(self, node):
@@ -130,7 +540,7 @@ class Policy(Thread):
 		"""

-	def __policy(self, node, loginbase, bkt):
+	def __policy(self, node, loginbase, bucket):
 		# ...and spam 'em
 		target = [TECHEMAIL % loginbase]
 		tmp = emailTxt.mailtxt.down
@@ -139,108 +549,6 @@ class Policy(Thread):
 		mailer.email(sbj, msg, target)


-
-
-	def actOnSick(self):
-		"""
-		Acts on sick nodes.
-		"""
-		global TECHEMAIL, PIEMAIL
-
-		# Princeton Backdoor
-		if loginbase == "princeton": return
-
-		# Send appropriate message for node if in appropriate bucket.
-		# If we know where to send a message
-		if not loginbase:
-			logger.info("POLICY: loginbase for %s not found" %node)
-		# And we didn't email already.
-		else:
-			# If first email, send to Tech
-			target = [TECHEMAIL % loginbase]
-
-			# If disk is foobarred, PLC should check it.
-			if (node in self.cmn.filerw) and \
-			(node not in self.emailed.keys()):
-				self.__actOnFilerw(node)
-				return
-
-			# If in dbg, set to rins, then reboot.  Inform PLC.
-			if (node in self.cmn.dbg):
-				self.__actOnDebug(node)
-
-			if (node in self.emailed.keys()) and \
-			(node not in self.cmn.filerw) and \
-			(node not in self.cmn.clock_drift):
-				# If we emailed before, how long ago?
-				delta = time.time() - self.emailed[node][1]
-				if delta < SPERDAY:
-					logger.info("POLICY: already acted on %s today." % node)
-					return
-
-				logger.info("POLICY: acted %s on %s days ago" % (node,
-				delta // SPERDAY))
-
-				# If no luck with tech, email PI
-				if (delta >= SPERDAY):
-					target.append(PIEMAIL % loginbase)
-
-				if (delta >= 7 * SPERDAY):
-					#remove slice creation if enough nodes arent up
-					if not self.enoughUp(loginbase):
-						slices = plc.slices(loginbase)
-						if len(slices) >= 1:
-							for slice in slices:
-								target.append(SLICEMAIL % slice)
-						logger.info("POLICY: Removing slice creation from %s" % loginbase)
-						tmp = emailTxt.mailtxt.removedSliceCreation
-						sbj = tmp[0]
-						msg = tmp[1] % {'loginbase': loginbase}
-						plc.removeSliceCreation(node)
-						mailer.email(sbj, msg, target)
-						self.squeezed[loginbase] = (time.time(), "creation")
-						self.emailed[node] = ("creation", time.time())
-						logger.info("POLICY: Emailing (%s) %s - %s"\
-							%("creation", node, target))
-						return
-
-				if (delta >= 14 * SPERDAY):
-					target.append(PIEMAIL % loginbase)
-					# Email slices at site.
-					slices = plc.slices([loginbase])
-					if len(slices) >= 1:
-						for slice in slices:
-							target.append(SLICEMAIL % slice)
-					# If not enough up, freeze slices and email everyone.
-					if not self.enoughUp(loginbase):
-						logger.info("POLICY: Suspending %s slices." % loginbase)
-						tmp = emailTxt.mailtxt.suspendSlices
-						sbj = tmp[0]
-						msg = tmp[1] % {'loginbase': loginbase}
-						plc.suspendSlices([node])
-						self.squeezed[loginbase] = (time.time(), "freeze")
-						mailer.email(sbj, msg, target)
-						self.emailed[node] = ("freeze", time.time())
-						logger.info("POLICY: Emailing (%s) %s - %s"\
-							%("freeze", node, target))
-
-				return
-
-			# Find the bucket the node is in and send appropriate email
-			# to approriate list of people.
-			for bkt in self.cmn.comonbkts.keys():
-				if (node in getattr(self.cmn, bkt)):
-					# Send predefined message for that bucket.
-					logger.info("POLICY: Emailing (%s) %s - %s"\
-						%(bkt, node, target))
-					tmp = getattr(emailTxt.mailtxt, bkt)
-					sbj = tmp[0] % {'hostname': node}
-					msg = tmp[1] % {'hostname': node}
-					mailer.email(sbj, msg, target)
-					self.emailed[node] = (bkt , time.time())
-					return
-
-
 	"""
 	Prints, logs, and emails status of up nodes, down nodes, and buckets.
 	"""
@@ -289,8 +597,8 @@ class Policy(Thread):
 		numnodes = len(allsitenodes)
 		sicknodes = []
 		# Get all sick nodes from comon
-		for bucket in self.cmn.comonbkts.keys():
-			for host in getattr(self.cmn, bucket):
+		for bucket in self.comon.comon_buckets.keys():
+			for host in getattr(self.comon, bucket):
 				sicknodes.append(host)
 		# Diff.
for node in allsitenodes: @@ -303,16 +611,46 @@ class Policy(Thread): return False else: return True - - - + + def print_stats(self, key, stats): + print "%20s : %d" % (key, stats[key]) def run(self): self.accumSickSites() - #self.actOnSick() - #self.emailedStore("WRITE") - print self.sickdb - + print "merge" + self.mergePreviousActions() + print "Accumulated %d sick sites" % len(self.sickdb.keys()) + logger.debug("Accumulated %d sick sites" % len(self.sickdb.keys())) + + #l1_before = len(self.act_1week.keys()) + #l2_before = len(self.act_2weeks.keys()) + #lwf_before = len(self.act_waitforever.keys()) + + print "analyse" + stats = self.analyseSites() + print "DONE" + + self.print_stats("sites", stats) + self.print_stats("sites_diagnosed", stats) + self.print_stats("nodes_diagnosed", stats) + self.print_stats("sites_emailed", stats) + self.print_stats("nodes_actedon", stats) + print string.join(stats['allsites'], ",") + + #l1 = len(self.act_1week.keys()) + #l2 = len(self.act_2weeks.keys()) + #lwf = len(self.act_waitforever.keys()) + #print "act_1week: %d diff: %d" % (l1, abs(l1-l1_before)) + #print "act_2weeks: %d diff: %d" % (l2, abs(l2-l2_before)) + #print "act_waitforever: %d diff: %d" % (lwf, abs(lwf-lwf_before)) + + #self.__actOnDown() + + if config.policysavedb: + print "Saving Databases... act_all" + #soltesz.dbDump("policy.eventlog", self.eventlog) + soltesz.dbDump("act_all", self.act_all) + def main():
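
The heart of this patch is the three-stage escalation implemented in __actOnNode() above. The stand-alone sketch below is not part of the patch: apply_policy is a hypothetical helper name, and the record layout ({'stage': ..., 'time': ...}) simply mirrors the diag_node records used in the diff, with SPERDAY and the role bits matching the constants defined near the top of policy.py.

# Minimal sketch of the escalation stage machine from __actOnNode().
import time

SPERDAY = 60*60*24
TECH = 1
PI   = 2
USER = 4

def apply_policy(record, now=None):
	"""Return (email_roles, action, next_stage) for a sick-node record.

	record['time'] is when the problem was first recorded; record['stage']
	is whatever stage the previous pass assigned ("" for a new problem).
	"""
	if now is None:
		now = time.time()
	delta = now - record['time']

	if 'waitforever' in record['stage']:
		# terminal state: no mail, no action
		return (0, 'noop', record['stage'])
	elif 'actintwoweeks' in record['stage'] or delta >= 14 * SPERDAY:
		return (TECH | PI | USER, 'suspendslices', 'stage_waitforever')
	elif 'actinoneweek' in record['stage'] or delta >= 7 * SPERDAY:
		return (TECH | PI, 'nocreate', 'stage_actintwoweeks')
	else:
		# first sighting: warn the tech contact only
		return (TECH, 'noop', 'stage_actinoneweek')

if __name__ == '__main__':
	# a problem first seen 8 days ago, never escalated before
	rec = {'stage': '', 'time': time.time() - 8 * SPERDAY}
	print apply_policy(rec)		# -> (3, 'nocreate', 'stage_actintwoweeks')

Note that a record already stamped 'stage_actintwoweeks' escalates on the next pass even if delta has not reached 14 days: the stage string, not the clock alone, is what ratchets the ladder forward.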