2 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
4 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # $Id: policy.py,v 1.14 2007/06/29 12:42:22 soltesz Exp $
10 #from monitor import *
11 from threading import *
23 from config import config
28 logger = logging.getLogger("monitor")
# NOTE(review): this listing is an excerpt; several original lines (including
# the definition of SPERDAY, presumably seconds-per-day) are elided here.
30 # Time to enforce policy
33 # Where to email the summary
# Destination address for the periodic summary mail (see the summary method).
34 SUMTO = "soltesz@cs.princeton.edu"
# Mail-alias templates; '%s' is filled with a site loginbase, except
# SLICEMAIL where it is filled with a slice name.
35 TECHEMAIL="tech-%s@sites.planet-lab.org"
36 PIEMAIL="pi-%s@sites.planet-lab.org"
37 SLICEMAIL="%s@slices.planet-lab.org"
38 PLCEMAIL="support@planet-lab.org"
# Escalation thresholds, expressed in seconds via SPERDAY (defined outside
# this excerpt -- TODO confirm).
42 PITHRESH = 7 * SPERDAY
43 SLICETHRESH = 7 * SPERDAY
44 # Days before attempting rins again
45 RINSTHRESH = 5 * SPERDAY
47 # Days before calling the node dead.
48 DEADTHRESH = 30 * SPERDAY
49 # Minimum number of nodes up before squeezing
22 # DNS, kinda down (sick)
23 # clock, kinda down (sick)
24 # Full disk, going to be down
25 # suspend slice creation
# Build the policy object's working state: the comon data source, persisted
# action history, the action dispatch table, and the sick-node work queue.
# NOTE(review): several lines (e.g. the `self.actions = {}` initializer and
# the sickdb initializer) are elided from this excerpt.
71 def __init__(self, comonthread, sickNoTicket, emailed):
72 self.comon = comonthread
74 # the hostname to loginbase mapping
75 self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
77 # Actions taken on nodes.
# NOTE(review): cache_all and act_all are both loaded from the same "act_all"
# cache key -- presumably two independent copies, with cache_all consumed
# (entries deleted) during mergePreviousActions while act_all is kept and
# re-saved; confirm this is intentional and not a copy/paste slip.
78 self.cache_all = soltesz.if_cached_else(1, "act_all", lambda : {})
79 self.act_all= soltesz.if_cached_else(1, "act_all", lambda : {})
81 # A dict of actions to specific functions. PICKLE doesnt' like lambdas.
83 self.actions['suspendslices'] = lambda hn: plc.suspendSlices(hn)
84 self.actions['nocreate'] = lambda hn: plc.removeSliceCreation(hn);
85 self.actions['rins'] = lambda hn: plc.nodeBootState(hn, "rins")
# 'noop' intentionally does nothing (identity on the hostname).
86 self.actions['noop'] = lambda hn: hn
88 self.bootcds = soltesz.dbLoad("bootcds")
89 self.emailed = emailed # host - > (time of email, type of email)
91 # all sick nodes w/o tickets
# Queue (has .get(block=...)) of diagnosed nodes without RT tickets.
93 self.sickNoTicket = sickNoTicket
96 # sick nodes with no tickets
97 # sickdb{loginbase: [{hostname1: [buckets]}, {...}]}
101 def mergePreviousActions(self):
103 look at the sick node_records as reported by comon, and then look at the
104 node_records in act_all. There are four cases:
105 1) problem in comon but not in act_all
106 this ok, b/c it just means it's a new problem
107 2) problem in comon and in act_all
108 we need to figure out the mis-match. Did the problem get better
109 or worse? Reset the stage clock to 'initial', if it's better,
110 continue if it's gotten worse. Hard to make this judgement here, though.
111 3) no problem in comon, problem in act_all
112 this may mean that the node is operational again, or that monitor
113 knows how to define a problem that comon does not. For now, if
114 comon does not report a problem, monitor obeys. Ultimately,
115 however, we want to catch problems that comon can't see.
116 4) no problem in comon, no problem in act_all
117 there won't be a record in either db, so there's no code.
119 TODO: this is where back-offs will be acknowledged. If the nodes get
120 better, it should be possible to 're-enable' the site, or slice, etc.
# NOTE(review): lines are elided from this excerpt (e.g. the docstring
# delimiters and, below, the binding of `hn` -- presumably `hn = nodename`).
122 sorted_sites = self.sickdb.keys()
124 # look at all problems reported by comon
125 for loginbase in sorted_sites:
126 rec_nodedict = self.sickdb[loginbase]
127 sorted_nodes = rec_nodedict.keys()
129 #for rec_node in rec_nodelist:
130 for nodename in sorted_nodes:
131 rec_node = rec_nodedict[nodename]
# `hn` is used below though the loop variable is `nodename`; the assignment
# appears to be on an elided line -- TODO confirm.
133 x = self.sickdb[loginbase][hn]
134 if hn in self.act_all:
# act_all[hn] is a history list; [0] is the most recent record
# (see __actOnNode, which insert(0)s new records).
135 y = self.act_all[hn][0]
136 if x['bucket'][0] != y['bucket'][0]:
137 # 2a) mismatch, need a policy for how to resolve
138 print "COMON and MONITOR have a mismatch: %s vs %s" % \
139 (x['bucket'], y['bucket'])
141 # 2b) ok, b/c they agree that there's still a problem..
144 # for now, overwrite the comon entry for the one in act_all
145 self.sickdb[loginbase][hn] = y
146 # delete the entry from cache_all to keep it out of case 3)
147 del self.cache_all[hn]
149 # 1) ok, b/c it's a new problem.
152 # 3) nodes that remin in cache_all were not identified by comon as
153 # down. Do we keep them or not?
154 for hn in self.cache_all.keys():
155 y = self.act_all[hn][0]
# Keep monitor-private problems (bucket tagged 'monitor') alive in sickdb
# even though comon no longer reports them.
156 if 'monitor' in y['bucket']:
157 loginbase = self.plcdb_hn2lb[hn]
158 if loginbase not in self.sickdb:
159 self.sickdb[loginbase] = {}
160 self.sickdb[loginbase][hn] = y
162 del self.cache_all[hn]
164 print "len of cache_all: %d" % len(self.cache_all.keys())
168 def accumSickSites(self):
170 Take all sick nodes, find their sites, and put in
171 sickdb[loginbase] = [diag_node1, diag_node2, ...]
# NOTE(review): the surrounding loop and the "None" sentinel's break are on
# elided lines; diag_node == "None" presumably marks end-of-queue.
174 diag_node = self.sickNoTicket.get(block = True)
175 if diag_node == "None":
178 #for bucket in self.comon.comon_buckets.keys():
179 # if (hostname in getattr(self.comon, bucket)):
180 # buckets_per_node.append(bucket)
182 #########################################################
183 # TODO: this will break with more than one comon bucket!!
184 nodename = diag_node['nodename']
# Map hostname -> site loginbase via the preloaded plc db.
185 loginbase = self.plcdb_hn2lb[nodename] # plc.siteId(node)
187 if loginbase not in self.sickdb:
188 self.sickdb[loginbase] = {}
189 #self.sickdb[loginbase][nodename] = []
191 #if nodename not in self.sickdb[loginbase]:
192 # self.sickdb[loginbase][nodename] = []
194 #self.sickdb[loginbase][nodename].append(diag_node)
# One record per node (later arrivals overwrite earlier ones).
195 self.sickdb[loginbase][nodename] = diag_node
196 # TODO: this will break with more than one comon bucket!!
197 #########################################################
200 def __actOnDebug(self, node):
202 If in debug, set the node to rins, reboot via PCU/POD
# sshstatus is presumably seconds since last ssh contact -- TODO confirm;
# floor-divided into whole days.
204 daysdown = self.comon.codata[node]['sshstatus'] // (60*60*24)
205 logger.info("POLICY: Node %s in dbg. down for %s" %(node,daysdown))
# Unconditionally flag the node for reinstall.
206 plc.nodeBootState(node, "rins")
207 # TODO: only reboot if BootCD > 3.0
208 # if bootcd[node] > 3.0:
209 # if NODE_KEY in planet.cnf:
210 # plc.nodeBootState(node, "rins")
211 # reboot.reboot(node)
213 # email to update planet.cnf file
218 # email upgrade bootcd message, and treat as down.
# Record the action with days-down and a timestamp for the summary report.
220 self.actionlogdb[node] = ['rins', daysdown, time.time()]
222 def __emailSite(self, loginbase, roles, message, args):
224 loginbase is the unique site abbreviation, prepended to slice names.
225 roles contains TECH, PI, USER roles, and derive email aliases.
226 record contains {'message': [<subj>,<body>], 'args': {...}}
# NOTE(review): the role-bitmask tests guarding each `contacts +=` line are
# elided from this excerpt, as is the contacts initializer.
228 args.update({'loginbase':loginbase})
232 contacts += [TECHEMAIL % loginbase]
234 contacts += [PIEMAIL % loginbase]
236 slices = plc.slices(loginbase)
239 contacts += [SLICEMAIL % slice]
241 print "Received no slices for site: %s" % loginbase
# Fill subject/body templates from args, then send through the RT mailer.
244 subject = message[0] % args
245 body = message[1] % args
246 mailer.emailViaRT(subject, body, contacts)
247 except Exception, err:
248 print "exception on message:"
# Render one diagnosed node's info tuple as a single summary line
# (the `return hlist` is on an elided line).
253 def format_diaginfo(self, diag_node):
254 info = diag_node['info']
255 hlist = " %s %s %s\n" % (info[0], info[2], info[1]) # (node, version, daysdown)
# Act on all diagnosed nodes of one site: run per-node actions, then send at
# most one email summarizing them. Returns (nodes acted on, emails sent).
# NOTE(review): initializers for action_argslist / email_args / the counters
# are on elided lines.
258 def __actOnSite(self, loginbase, rec_diaglist):
# b_squeeze gates whether penalties are actually applied -- TODO confirm;
# its use is elided from this excerpt.
261 b_squeeze = config.squeeze
264 for diag_node in rec_diaglist:
265 #print "calling actOnNode(%s)" % diag_node['nodename']
266 action_args = self.__actOnNode(diag_node)
267 action_argslist += [action_args]
269 #print "getSiteNodes(%s)" % loginbase
270 nodelist = plc.getSiteNodes(loginbase)
# Warn when fewer than 2 healthy nodes would remain at the site.
271 if len(nodelist) - len(action_argslist) < 2:
272 print "SITE: %20s : < 2 nodes !!" % loginbase
273 # TODO: check how long this has occurred.
274 # then plc.removeSliceCreation(nodename)
275 # There may be a similar act_1,act_2,wait db for sites?
277 #print "SITE: goodNodesUp(%s) > 2 && %d bad" % \
278 # (loginbase, len(action_argslist))
281 # create 'args' for email
282 #print "Create email args..."
284 email_args['hostname_list'] = ""
285 for action_args in action_argslist:
286 email_args['hostname_list'] += action_args['msg_format']
# 'hostname' ends up as the LAST node in the list; the action below is
# applied to that hostname only -- TODO confirm this is intended.
287 email_args['hostname'] = action_args['nodename']
289 # Send email, perform node action
290 # TODO: only send one email per site for a given problem...
291 if len(action_argslist) > 0:
# Only the first node's action/message template is used for the site.
292 action_args = action_argslist[0]
293 #for action_args in action_argslist:
294 # TODO: perform the most severe action?
296 act_key = action_args['action']
297 self.actions[act_key](email_args['hostname'])
299 #print "Send email..."
300 if action_args['message'] != None:
301 self.__emailSite(loginbase, action_args['email'],
302 action_args['message'], email_args)
303 if config.mail: i_nodes_emailed += 1
305 return (i_nodes_actedon, i_nodes_emailed)
# Decide the escalation stage for one diagnosed node and build the action
# args (action key, email roles, message template, next stage). The updated
# record is pushed onto the front of act_all[nodename]'s history.
# NOTE(review): the `args = {}` initializer and some else-branches are on
# elided lines in this excerpt.
307 def __actOnNode(self, diag_node):
308 nodename = diag_node['nodename']
309 message = diag_node['message']
310 info = diag_node['info']
313 # TODO: a node should only be in one category, right?
314 # - This is a constraint that should be enforced. It may be possible
315 # for a node to fall into the wrong set.
316 # - Also, it is necessary to remove a node from an action set, if it
317 # comes back up, or enters another state between checks.
318 # TODO: check that the reason a node ends up in a 'bad' state has or
319 # hasn't changed. If it's changed, then probably the process should
320 # start over, or at leat be acknowledged. I'm not sure that this is
321 # the right place for this operation.
323 args['nodename'] = nodename
324 args['msg_format'] = self.format_diaginfo(diag_node)
325 current_time = time.time()
327 #k1 = self.act_1week.keys()
328 #k2 = self.act_2weeks.keys()
329 #k3 = self.act_waitforever.keys()
330 #print "lengths: %d %d %d" % (len(k1), len(k2), len(k3))
# Seconds since the record's original timestamp drives stage promotion.
332 delta = current_time - diag_node['time']
334 if 'waitforever' in diag_node['stage']:
335 # TODO: define what to do in the 'forever' state
336 # TODO: there should probably be a periodic email sent after this,
337 # to the site, or to us...
338 args['action'] = 'noop'
339 args['message'] = None
# Stage 3: two weeks elapsed -> suspend slices, mail TECH+PI+USER.
341 elif 'actintwoweeks' in diag_node['stage'] or delta >= 14 * SPERDAY:
342 #nodename in self.act_2weeks:
343 args['email'] = TECH | PI | USER
344 args['action'] = 'suspendslices'
345 args['message'] = message[2]
346 args['stage'] = 'stage_waitforever'
347 # TODO: This will lose original 'time'
348 diag_node.update(args)
# Stage 2: one week elapsed -> block slice creation, mail TECH+PI.
350 elif 'actinoneweek' in diag_node['stage'] or delta >= 7 * SPERDAY:
351 # nodename in self.act_1week:
352 args['email'] = TECH | PI
354 args['action'] = 'nocreate'
355 # args['action'] = 'rins'
356 args['message'] = message[1]
357 args['stage'] = 'stage_actintwoweeks'
358 diag_node.update(args)
# Stage 1 (else-branch header elided): first sighting -> notify only.
361 # the node is bad, but there's no previous record of it.
363 args['action'] = 'noop'
364 args['message'] = message[0]
365 args['stage'] = 'stage_actinoneweek'
366 diag_node.update(args)
368 print "%s" % diag_node['log'],
369 print "%15s" % args['action']
# Prepend to the per-node history; [0] is read by mergePreviousActions.
371 if nodename not in self.act_all: self.act_all[nodename] = []
372 self.act_all[nodename].insert(0,diag_node)
# Append `element` to `list` only if not already present (the append
# statement itself is on an elided line). Note: shadows the builtin `list`.
376 def lappend_once(list, element):
377 if element not in list:
# Append `element` to a separator-joined string if absent; returns a
# (new_string, 1) tuple on change (the unchanged branch is elided).
379 def sappend_once(string, element, separator=','):
380 if element not in string:
381 return ("%s%c%s" % (string, separator, element),1)
# Walk every sick site: diagnose its nodes, act on the diagnoses, and
# return a stats dict for reporting.
# NOTE(review): initializers for i_sites / i_sites_emailed / i_nodes_actedon
# / l_allsites are on elided lines.
385 def analyseSites(self):
387 i_sites_diagnosed = 0
388 i_nodes_diagnosed = 0
393 sorted_sites = self.sickdb.keys()
395 for loginbase in sorted_sites:
396 rec_nodedict = self.sickdb[loginbase]
397 #print "calling diagnoseSite(%s)" % loginbase
398 rec_diaglist = self.__diagnoseSite(loginbase, rec_nodedict)
399 l_allsites += [loginbase]
402 if len(rec_diaglist) > 0:
403 i_nodes_diagnosed += len(rec_diaglist)
404 i_sites_diagnosed += 1
406 #print "calling actOnSite(%s)" % loginbase
407 (na,ne) = self.__actOnSite(loginbase, rec_diaglist)
# NOTE(review): `ne` (emails) is accumulated into i_sites_emailed though
# __actOnSite returns nodes-emailed -- naming mismatch to confirm.
410 i_nodes_actedon += na
411 i_sites_emailed += ne
413 return {'sites': i_sites,
414 'sites_diagnosed': i_sites_diagnosed,
415 'nodes_diagnosed': i_nodes_diagnosed,
416 'sites_emailed': i_sites_emailed,
417 'nodes_actedon': i_nodes_actedon,
418 'allsites':l_allsites}
# Diagnose every node record of one site; collect the non-None results.
# (diag_list initializer and `return diag_list` are on elided lines.)
421 def __diagnoseSite(self, loginbase, rec_nodedict):
423 rec_sitelist is a sickdb entry:
426 sorted_nodes = rec_nodedict.keys()
428 for nodename in sorted_nodes:
429 rec_node = rec_nodedict[nodename]
430 diag_node = self.__diagnoseNode(loginbase, rec_node)
# None means "no action needed" (e.g. ticket already being worked).
431 if diag_node != None:
432 diag_list += [ diag_node ]
# Whole days the node has been ssh-unreachable per comon; "null" means
# unknown (default/return lines are elided from this excerpt).
435 def __getDaysDown(self, nodename):
437 if self.comon.codata[nodename]['sshstatus'] != "null":
438 daysdown = int(self.comon.codata[nodename]['sshstatus']) // (60*60*24)
# Human-readable "(N days down)" string for email/log text
# (the branch for an unknown day count is elided).
441 def __getStrDaysDown(self, nodename):
442 daysdown = self.__getDaysDown(nodename)
444 return "(%d days down)"%daysdown
# Look up the node's BootCD version string from the preloaded bootcds db
# (default value and return are on elided lines).
448 def __getCDVersion(self, nodename):
450 if nodename in self.bootcds:
451 cdversion = self.bootcds[nodename]
# Classify one sick-node record by its comon bucket and build a diagnosis
# record (message templates, info tuple, log line) for __actOnNode.
# Returns None when no action is needed.
# NOTE(review): several lines are elided (diag_record initializers, returns,
# the `msg = (...)` header around the debug message, the ssh/clock/dns/filerw
# branch bodies).
454 def __diagnoseNode(self, loginbase, rec_node):
455 # TODO: change the format of the hostname in this
456 # record to something more natural.
457 nodename = rec_node['nodename']
458 buckets = rec_node['bucket']
461 # xyz as determined by monitor
462 # down as determined by comon
# A ticket is already being worked on: log it and do nothing further.
463 if rec_node['stage'] == "stage_rt_working":
464 # err, this can be used as a counter of some kind..
465 # but otherwise, no diagnosis is necessary, return None, implies that
467 print "DIAG: %20s : %-40s ticket %s" % \
468 (loginbase, nodename, rec_node['ticket_id'])
470 elif "down" in buckets:
472 diag_record.update(rec_node)
473 diag_record['nodename'] = nodename
474 diag_record['message'] = emailTxt.mailtxt.newdown
475 diag_record['args'] = {'nodename': nodename}
476 s_daysdown = self.__getStrDaysDown(nodename)
477 diag_record['info'] = (nodename, s_daysdown, "")
478 diag_record['bucket'] = ["down"]
# Trailing comma makes 'log' a 1-tuple -- presumably deliberate (it is
# printed with a trailing-comma print in __actOnNode); TODO confirm.
479 diag_record['log'] = "DOWN: %20s : %-40s == %20s" % \
480 (loginbase, nodename, diag_record['info']),
482 elif "dbg" in buckets:
483 # V2 boot cds as determined by monitor
484 s_daysdown = self.__getStrDaysDown(nodename)
485 s_cdversion = self.__getCDVersion(nodename)
487 diag_record.update(rec_node)
488 diag_record['nodename'] = nodename
489 diag_record['info'] = (nodename, s_daysdown, s_cdversion)
# Old v2 BootCD: ask the site to upgrade the CD.
491 if nodename in self.bootcds and "v2" in self.bootcds[nodename]:
492 diag_record['log'] = "BTCD: %20s : %-40s == %20s" % \
493 (loginbase, nodename, self.bootcds[nodename]),
494 diag_record['message'] = emailTxt.mailtxt.newbootcd
495 diag_record['args'] = {'nodename': nodename}
496 # TODO: figure a better 'bucket' scheme, for merge()
497 #diag_record['bucket'] = ["monitor"]
# Debug state with a modern CD: no real handler yet, placeholder message.
499 print "DEBG: %20s : %-40s" % \
500 (loginbase, nodename)
504 "Comon reports the node in debug mode, %s" % \
505 "but monitor does not know what to do yet.")
506 # TODO: replace with a real action
507 diag_record['message'] = [msg, msg, msg]
508 diag_record['bucket'] = ["dbg"]
509 diag_record['args'] = {'nodename': nodename}
510 elif "ssh" in buckets:
512 elif "clock_drift" in buckets:
514 elif "dns" in buckets:
516 elif "filerw" in buckets:
519 print "Unknown buckets!!!! %s" % buckets
525 def __actOnFilerw(self, node):
527 Report to PLC when node needs disk checked.
# `target` is presumably set to [PLCEMAIL] on an elided line -- TODO confirm.
530 logger.info("POLICY: Emailing PLC for " + node)
531 tmp = emailTxt.mailtxt.filerw
532 sbj = tmp[0] % {'hostname': node}
533 msg = tmp[1] % {'hostname': node}
534 mailer.email(sbj, msg, target)
# Log the action; None = no days-down figure for disk problems.
535 self.actionlogdb[node] = ["filerw", None, time.time()]
538 def __actOnDNS(self, node):
# Email the site's tech contact the standard "down" message for a node.
# NOTE(review): `daysdown` is used but not assigned in the visible lines.
543 def __policy(self, node, loginbase, bucket):
545 target = [TECHEMAIL % loginbase]
546 tmp = emailTxt.mailtxt.down
547 sbj = tmp[0] % {'hostname': node}
548 msg = tmp[1] % {'hostname': node, 'days': daysdown}
549 mailer.email(sbj, msg, target)
# NOTE(review): the `def` line of this summary method is elided from this
# excerpt; the visible body builds and sends the daily summary mail.
553 Prints, logs, and emails status of up nodes, down nodes, and buckets.
556 sub = "Monitor Summary"
557 msg = "\nThe following nodes were acted upon: \n\n"
558 for (node, (type, date)) in self.emailed.items():
559 # Print only things acted on today.
# Same day-of-month comparison; can mis-match across month boundaries --
# acceptable for a daily digest, presumably.
560 if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
561 msg +="%s\t(%s)\t%s\n" %(node, type, time.ctime(date))
562 msg +="\n\nThe following sites have been 'squeezed':\n\n"
# NOTE(review): squeezed stores (date, type) while emailed stores
# (type, date) -- opposite tuple orders; confirm both are correct.
563 for (loginbase, (date, type)) in self.squeezed.items():
564 # Print only things acted on today.
565 if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
566 msg +="%s\t(%s)\t%s\n" %(loginbase, type, time.ctime(date))
567 mailer.email(sub, msg, [SUMTO])
572 Store/Load state of emails. When, where, what.
# Persist or restore self.emailed via pickle in the DAT file; the file-open
# lines and the "LOAD" branch header are elided from this excerpt.
574 def emailedStore(self, action):
578 logger.info("POLICY: Found and reading " + DAT)
# pickle.load on the local state file -- trusted data only; never point
# DAT at untrusted input.
579 self.emailed.update(pickle.load(f))
580 if action == "WRITE":
582 #logger.debug("Writing " + DAT)
583 pickle.dump(self.emailed, f)
# Best-effort: any problem with the DAT file is logged, not raised.
585 except Exception, err:
586 logger.info("POLICY: Problem with DAT, %s" %err)
589 Returns True if more than MINUP nodes are up at a site.
# NOTE(review): several lines are elided (sicknodes initializer, the
# numnodes decrement for sick nodes, and the return statements).
591 def enoughUp(self, loginbase):
592 allsitenodes = plc.getSiteNodes([loginbase])
593 if len(allsitenodes) == 0:
594 logger.info("Node not in db")
597 numnodes = len(allsitenodes)
599 # Get all sick nodes from comon
# Union of every comon bucket's membership list.
600 for bucket in self.comon.comon_buckets.keys():
601 for host in getattr(self.comon, bucket):
602 sicknodes.append(host)
604 for node in allsitenodes:
605 if node in sicknodes:
610 "POLICY: site with %s has nodes %s up." %(loginbase, numnodes))
# Print one key/value pair from the analyseSites() stats dict.
615 def print_stats(self, key, stats):
616 print "%20s : %d" % (key, stats[key])
# NOTE(review): the enclosing method's `def` line is elided from this
# excerpt; this is the main policy pass: accumulate -> merge -> analyse/act
# -> report -> persist.
619 self.accumSickSites()
621 self.mergePreviousActions()
622 print "Accumulated %d sick sites" % len(self.sickdb.keys())
623 logger.debug("Accumulated %d sick sites" % len(self.sickdb.keys()))
625 #l1_before = len(self.act_1week.keys())
626 #l2_before = len(self.act_2weeks.keys())
627 #lwf_before = len(self.act_waitforever.keys())
630 stats = self.analyseSites()
633 self.print_stats("sites", stats)
634 self.print_stats("sites_diagnosed", stats)
635 self.print_stats("nodes_diagnosed", stats)
636 self.print_stats("sites_emailed", stats)
637 self.print_stats("nodes_actedon", stats)
638 print string.join(stats['allsites'], ",")
640 #l1 = len(self.act_1week.keys())
641 #l2 = len(self.act_2weeks.keys())
642 #lwf = len(self.act_waitforever.keys())
643 #print "act_1week: %d diff: %d" % (l1, abs(l1-l1_before))
644 #print "act_2weeks: %d diff: %d" % (l2, abs(l2-l2_before))
645 #print "act_waitforever: %d diff: %d" % (lwf, abs(lwf-lwf_before))
# Persist the action history so the next run can escalate stages.
649 if config.policysavedb:
650 print "Saving Databases... act_all"
651 #soltesz.dbDump("policy.eventlog", self.eventlog)
652 soltesz.dbDump("act_all", self.act_all)
# Module-level logging setup: DEBUG-level stream handler with bare-message
# formatting on the shared "monitor" logger.
657 logger.setLevel(logging.DEBUG)
658 ch = logging.StreamHandler()
659 ch.setLevel(logging.DEBUG)
660 formatter = logging.Formatter('%(message)s')
661 ch.setFormatter(formatter)
662 logger.addHandler(ch)
666 #a = Policy(None, tmp)
667 #a.emailedStore("LOAD")
670 #print plc.slices([plc.siteId(["alice.cs.princeton.edu"])])
# Script entry point; the try-block body (the actual main call) is elided
# from this excerpt. Ctrl-C exits cleanly with a log message.
672 if __name__ == '__main__':
677 except KeyboardInterrupt:
678 print "Killed. Exitting."
679 logger.info('Monitor Killed')