import logging
import mailer
import emailTxt
+import pickle
+import Queue
+import plc
+import reboot
+import config
+
+DAT="./monitor.dat"
logger = logging.getLogger("monitor")
# Time to enforce policy
POLSLEEP = 7200
+# Where to email the summary
+SUMTO = "pupadm@lists.planet-lab.org"
+TECHEMAIL="tech-%s@sites.planet-lab.org"
+PIEMAIL="pi-%s@sites.planet-lab.org"
+SLICEMAIL="%s@slices.planet-lab.org"
+PLCEMAIL="support@planet-lab.org"
+
+#Thresholds (DAYS)
+SPERDAY = 86400
+PITHRESH = 1 * SPERDAY
+SLICETHRESH = 5 * SPERDAY
+# Days before attempting rins again
+RINSTHRESH = 5 * SPERDAY
+
+# Minimum number of nodes up before squeezing
+MINUP = 2
+
# IF:
# no SSH, down.
# bad disk, down
# suspend slice creation
# kill slices
class Policy(Thread):
- def __init__(self, comonthread, tickets):
+ def __init__(self, comonthread, sickNoTicket, emailed):
self.cmn = comonthread
- self.tickets = tickets
- # host - > time of email
- self.emailed = {}
- # all sick nodes w/ tickets
- self.cursickw = tickets
+ # host - > (time of email, type of email)
+ self.emailed = emailed
# all sick nodes w/o tickets
- self.cursick = []
+ self.sickNoTicket = sickNoTicket
+ # Sitess we've Squeezed.
+ self.squeezed = {}
Thread.__init__(self)
+
+
'''
- Gets all nodes without tickets and puts them in self.cursick
+ What to do when node is in dbg (as reported by CoMon).
'''
- def getAllSick(self):
- self.cursick = []
- for bucket in self.cmn.comonbkts.keys():
- for host in getattr(self.cmn, bucket):
- if host not in self.cursickw.keys():
- if host not in self.cursick:
- self.cursick.append(host)
- logger.debug("Nodes sick wo tickets %s " % len(self.cursick))
+ def __actOnDebug(self, node):
+ # Check to see if we've done this before
+ if (node in self.emailed.keys()):
+ if (self.emailed[node][0] == "dbg"):
+ delta = time.time() - self.emailed[node][1]
+ if (delta <= RINSTHRESH ):
+ # Don't mess with node if under Thresh.
+ # Return, move on.
+ logger.info("POLICY: %s in dbg, but acted on %s days ago" % (node, delta // SPERDAY))
+ return
+ logger.info("POLICY: Node in dbg - " + node)
+ plc.nodeBootState(node, "rins")
+ # If it has a PCU
+ return reboot.reboot(node)
+
+ '''
+ What to do when node is in dbg (as reported by CoMon).
+ '''
+ def __actOnFilerw(self, node):
+ target = [PLCEMAIL]
+ logger.info("POLICY: Emailing PLC for " + node)
+ tmp = emailTxt.mailtxt.filerw
+ sbj = tmp[0] % {'hostname': node}
+ msg = tmp[1] % {'hostname': node}
+ mailer.email(sbj, msg, target)
+ self.emailed[node] = ("filerw", time.time())
+
'''
Acts on sick nodes.
'''
- def emailSick(self):
- for node in self.cmn.ssh:
- if node in self.cursick:
- if node not in self.emailed.keys():
- logger.debug("Emailing " + node)
- try:
- self.emailed[node] = "ssh"
- mailer.email('DISREGARD',
- emailTxt.mailtxt.STANDARD % {'hostname': node},
- "tech-" + mailer.siteId(node) + "@sites.planet-lab.org")
- except Exception, err:
- logger.info(err)
+ def actOnSick(self):
+ # Get list of nodes in debug from PLC
+ #dbgNodes = NodesDebug()
+ global TECHEMAIL, PIEMAIL
+ # Grab a node from the queue (pushed by rt thread).
+ node = self.sickNoTicket.get(block = True)
+ # Get the login base
+ loginbase = plc.siteId(node)
+
+ # Send appropriate message for node if in appropriate bucket.
+ # If we know where to send a message
+ if not loginbase:
+ logger.info("POLICY: loginbase for %s not found" %node)
+ # And we didn't email already.
+ else:
+ # If first email, send to Tech
+ target = [TECHEMAIL % loginbase]
+
+ # If disk is foobarred, PLC should check it.
+ if (node in self.cmn.filerw) and \
+ (node not in self.emailed.keys()):
+ self.__actOnFilerw(node)
+ return
+
+ # If in dbg, set to rins, then reboot. Inform PLC.
+ if (node in self.cmn.dbg):
+ # If reboot failure via PCU, POD and send email
+ # if contacted PCU, return
+ if self.__actOnDebug(node): return
+
+ if (node in self.emailed.keys()) and \
+ (node not in self.cmn.filerw) and \
+ (node not in self.cmn.clock_drift):
+ # If we emailed before, how long ago?
+ delta = time.time() - self.emailed[node][1]
+ if delta < SPERDAY:
+ logger.info("POLICY: already acted on %s today." % node)
+ return
+
+ logger.info("POLICY: acted %s on %s days ago" % (node,
+ delta // SPERDAY))
+
+ # If more than PI thresh, but less than slicethresh
+ if (delta >= PITHRESH) and (delta < SLICETHRESH):
+ target.append(PIEMAIL % loginbase)
+ #remove slice creation if enough nodes arent up
+ if not self.enoughUp(loginbase):
+ logger.info("POLICY: Removing slice creation from %s" % loginbase)
+ plc.removeSliceCreation(node)
+ self.squeezed[loginbase] = (time.time(), "creation")
+ # If more than PI thresh and slicethresh
+ if (delta >= PITHRESH) and (delta > SLICETHRESH):
+ # Email slices at site.
+ slices = plc.slices(loginbase)
+ if len(slices) >= 1:
+ for slice in slices:
+ target.append(SLICEMAIL % slice)
+ if not self.enoughUp(loginbase):
+ plc.suspendSlices(node)
+ self.squeezed[loginbase] = (time.time(),
+ "freeze")
+
+ # Find the bucket the node is in and send appropriate email
+ # to approriate list of people.
+ for bkt in self.cmn.comonbkts.keys():
+ if (node in getattr(self.cmn, bkt)):
+ # Send predefined message for that bucket.
+ logger.info("POLICY: Emailing (%s) %s - %s"\
+ %(bkt, node, target))
+ tmp = getattr(emailTxt.mailtxt, bkt)
+ sbj = tmp[0] % {'hostname': node}
+ msg = tmp[1] % {'hostname': node}
+ mailer.email(sbj, msg, target)
+ self.emailed[node] = (bkt , time.time())
+ return
+
'''
Prints, logs, and emails status of up nodes, down nodes, and buckets.
'''
def status(self):
- return 0
+ sub = "Monitor Summary"
+ msg = "\nThe following nodes were acted upon: \n\n"
+ for (node, (type, date)) in self.emailed.items():
+ # Print only things acted on today.
+ if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
+ msg +="%s\t(%s)\t%s\n" %(node, type, time.ctime(date))
+ msg +="\n\nThe following sites have been 'squeezed':\n\n"
+ for (loginbase, (date, type)) in self.squeezed.items():
+ # Print only things acted on today.
+ if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
+ msg +="%s\t(%s)\t%s\n" %(loginbase, type, time.ctime(date))
+ mailer.email(sub, msg, [SUMTO])
+ logger.info(msg)
+ return
+
+ '''
+ Store/Load state of emails. When, where, what.
+ '''
+ def emailedStore(self, action):
+ try:
+ if action == "LOAD":
+ f = open(DAT, "r+")
+ logger.info("POLICY: Found and reading " + DAT)
+ self.emailed.update(pickle.load(f))
+ if action == "WRITE":
+ f = open(DAT, "w")
+ #logger.debug("Writing " + DAT)
+ pickle.dump(self.emailed, f)
+ f.close()
+ except Exception, err:
+ logger.info("POLICY: Problem with DAT, %s" %err)
+
+ '''
+ Returns True if more than MINUP nodes are up at a site.
+ '''
+ def enoughUp(self, loginbase):
+ allsitenodes = plc.getSiteNodes(loginbase)
+ if len(allsitenodes) == 0:
+ logger.info("Node not in db")
+ return
+
+ numnodes = len(allsitenodes)
+ sicknodes = []
+ # Get all sick nodes from comon
+ for bucket in self.cmn.comonbkts.keys():
+ for host in getattr(self.cmn, bucket):
+ sicknodes.append(host)
+ # Diff.
+ for node in allsitenodes:
+ if node in sicknodes:
+ numnodes -= 1
+
+ if numnodes < MINUP:
+ logger.info(\
+"POLICY: site with %s has nodes %s up." %(loginbase, numnodes))
+ return False
+ else:
+ return True
+
+
+
def run(self):
- #while 1:
- self.getAllSick()
- self.emailSick()
+ while 1:
+ self.actOnSick()
+ self.emailedStore("WRITE")
+
+
+def main():
+ logger.setLevel(logging.DEBUG)
+ ch = logging.StreamHandler()
+ ch.setLevel(logging.DEBUG)
+ formatter = logging.Formatter('%(message)s')
+ ch.setFormatter(formatter)
+ logger.addHandler(ch)
+
+ #print NodesDebug()
+ #tmp = Queue.Queue()
+ #a = Policy(None, tmp)
+ #a.emailedStore("LOAD")
+ #print a.emailed
+
+ print plc.slices(plc.siteId("alice.cs.princeton.edu"))
+ os._exit(0)
+if __name__ == '__main__':
+ import os
+ import plc
+ try:
+ main()
+ except KeyboardInterrupt:
+ print "Killed. Exitting."
+ logger.info('Monitor Killed')
+ os._exit(0)