2 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
4 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # $Id: policy.py,v 1.17 2007/08/29 17:26:50 soltesz Exp $
10 #from monitor import *
11 from threading import *
23 from www.printbadnodes import cmpCategoryVal
24 from config import config
30 logger = logging.getLogger("monitor")
32 # Time to enforce policy
35 # Where to email the summary
36 SUMTO = "soltesz@cs.princeton.edu"
37 TECHEMAIL="tech-%s@sites.planet-lab.org"
38 PIEMAIL="pi-%s@sites.planet-lab.org"
39 SLICEMAIL="%s@slices.planet-lab.org"
40 PLCEMAIL="support@planet-lab.org"
46 PITHRESH = 7 * SPERDAY
47 SLICETHRESH = 7 * SPERDAY
48 # Days before attempting rins again
49 RINSTHRESH = 5 * SPERDAY
51 # Days before calling the node dead.
52 DEADTHRESH = 30 * SPERDAY
53 # Minimum number of nodes up before squeezing
64 # DNS, kinda down (sick)
65 # clock, kinda down (sick)
66 # Full disk, going to be down
70 # suspend slice creation
72 def array_to_priority_map(array):
73 """ Create a mapping where each entry of array is given a priority equal
74 to its position in the array. This is useful for subsequent use in the
# Print one named counter from a stats dict, silently skipping absent keys.
# Module-level twin of Action.print_stats (which does not guard the key).
86 def print_stats(key, stats):
87 	if key in stats: print "%20s : %d" % (key, stats[key])
# Constructor for the merge stage: takes the list of hostnames to process
# (l_merge) and the queue (toRT) used to hand merged records to the next
# pipeline stage.  NOTE: runs as a Thread (Thread.__init__ below).
90 def __init__(self, l_merge, toRT):
92 		self.merge_list = l_merge
93 		# the hostname to loginbase mapping
94 		self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
96 		# Previous actions taken on nodes.
97 		self.act_all = soltesz.if_cached_else(1, "act_all", lambda : {})
98 		self.findbad = soltesz.if_cached_else(1, "findbad", lambda : {})
# cache_all is deliberately loaded from the same "act_all" cache: it serves
# as a working copy whose entries are deleted as findbad matches them, so
# whatever remains afterwards is the "in act_all but not in findbad" set
# (case 3 in mergeActionsAndBadDB).  -- presumably; confirm against caller.
100 		self.cache_all = soltesz.if_cached_else(1, "act_all", lambda : {})
103 		Thread.__init__(self)
107 self.accumSickSites()
108 # read data from findbad and act_all
109 self.mergeActionsAndBadDB()
110 # pass node_records to RT
# Build self.sickdb from the findbad results, keyed by site then node.
113 	def accumSickSites(self):
115 		Take all nodes, from l_diagnose, look them up in the act_all database,
116 		and insert them into sickdb[] as:
118 			sickdb[loginbase][nodename] = fb_record
120 		# look at all problems reported by findbad
121 		l_nodes = self.findbad['nodes'].keys()
123 		for nodename in l_nodes:
124 			if nodename not in self.merge_list:
125 				continue			# skip this node, since it's not wanted
128 			loginbase = self.plcdb_hn2lb[nodename]
129 			values = self.findbad['nodes'][nodename]['values']
# Flatten the findbad entry into a fresh per-node record; the 'stage'
# marker identifies this as a brand-new observation for Diagnose().
# NOTE(review): fb_record appears to be initialized on a line not shown
# here -- confirm it is a new dict per node.
132 			fb_record['nodename'] = nodename
133 			fb_record['category'] = values['category']
134 			fb_record['state'] = values['state']
135 			fb_record['comonstats'] = values['comonstats']
136 			fb_record['plcnode'] = values['plcnode']
137 			fb_record['kernel'] = self.getKernel(values['kernel'])
138 			fb_record['stage'] = "findbad"
139 			fb_record['message'] = None
140 			fb_record['bootcd'] = values['bootcd']
141 			fb_record['args'] = None
142 			fb_record['info'] = None
# Both timestamps start now; 'time' is later reset when a record changes
# stage, while 'date_created' stays fixed.
143 			fb_record['time'] = time.time()
144 			fb_record['date_created'] = time.time()
146 			if loginbase not in self.sickdb:
147 				self.sickdb[loginbase] = {}
149 			self.sickdb[loginbase][nodename] = fb_record
151 		print "Found %d nodes" % count
153 def getKernel(self, unamestr):
160 	def mergeActionsAndBadDB(self): 
162 		- Look at the sick node_records as reported in findbad, 
163 		- Then look at the node_records in act_all.
165 		There are four cases:
166 		1) Problem in findbad, no problem in act_all
167 			this ok, b/c it just means it's a new problem
168 		2) Problem in findbad, problem in act_all
169 			-Did the problem get better or worse?  
170 			-If Same, or Worse, then continue looking for open tickets.
171 			-If Better, or No problem, then "back-off" penalties.
172 				This judgement may need to wait until 'Diagnose()'
174 		3) No problem in findbad, problem in act_all
175 			Then the node is operational again according to Findbad()
177 		4) No problem in findbad, no problem in act_all
178 			There won't be a record in either db, so there's no code.
181 		sorted_sites = self.sickdb.keys()
183 		# look at all problems reported by findbad
184 		for loginbase in sorted_sites:
185 			d_fb_nodes = self.sickdb[loginbase]
186 			sorted_nodes = d_fb_nodes.keys()
188 			for nodename in sorted_nodes:
# NOTE(review): below, 'x' is used as the findbad record for this node;
# it is presumably bound from fb_record on an elided line -- confirm.
189 				fb_record = self.sickdb[loginbase][nodename]
191 				if loginbase not in self.mergedb:
192 					self.mergedb[loginbase] = {}
194 				# We must compare findbad state with act_all state
195 				if nodename not in self.act_all:
196 					# 1) ok, b/c it's a new problem. set ticket_id to null
197 					self.mergedb[loginbase][nodename] = {}
198 					self.mergedb[loginbase][nodename].update(x)
199 					self.mergedb[loginbase][nodename]['ticket_id'] = ""
200 					self.mergedb[loginbase][nodename]['prev_category'] = None
# Guard against an empty action history for a known node.
202 					if len(self.act_all[nodename]) == 0:
203 						print "len(act_all[%s]) == 0, skipping %s %s" % (nodename, loginbase, nodename)
# y is the most recent action record for this node (index 0 = newest).
206 					y = self.act_all[nodename][0]
# A closed-out history ("monitor-end-record") is treated the same as
# case 1: the previous problem finished, so start a fresh record.
209 					if 'stage' in y and "monitor-end-record" in y['stage']:
210 						# 1) ok, b/c it's a new problem. set ticket_id to null
211 						self.mergedb[loginbase][nodename] = {}
212 						self.mergedb[loginbase][nodename].update(x)
213 						self.mergedb[loginbase][nodename]['ticket_id'] = ""
214 						self.mergedb[loginbase][nodename]['prev_category'] = None
217 						## for legacy actions
218 						#if 'bucket' in y and y['bucket'][0] == 'dbg':
219 						#	# Only bootcd debugs made it to the act_all db.
220 						#	y['prev_category'] = "OLDBOOTCD"
221 						#elif 'bucket' in y and y['bucket'][0] == 'down':
222 						#	y['prev_category'] = "ERROR"
223 						#elif 'bucket' not in y:
224 						#	# for all other actions, just carry over the
225 						#	# previous category
226 						#	y['prev_category'] = y['category']
228 						#	print "UNKNOWN state for record: %s" % y
231 					# determine through translation, if the buckets match
232 					#if 'category' in y and x['category'] == y['category']:
234 					#elif x['category'] == "OLDBOOTCD" and y['bucket'][0] == 'dbg':
236 					#elif x['category'] == "ERROR" and y['bucket'][0] == 'down':
242 					#	# 2b) ok, b/c they agree that there's still a problem..
243 					#	# 2b) Comon & Monitor still agree; RT ticket?
244 					#	y['prev_category'] = y['category']
246 					#	# 2a) mismatch, need a policy for how to resolve
247 					#	#     resolution will be handled in __diagnoseNode()
248 					#	# for now just record the two categories.
249 					#	#if x['category'] == "PROD" and x['state'] == "BOOT" and \
250 					#	#	( y['bucket'][0] == 'down' or y['bucket'][0] == 'dbg'):
251 					#	print "FINDBAD and MONITOR have a mismatch: %s vs %s" % \
252 					#			(x['category'], y['bucket'])
# Case 2: carry the previous action record forward, refreshed with the
# current findbad observations.
255 					self.mergedb[loginbase][nodename] = {}
256 					self.mergedb[loginbase][nodename].update(y)
257 					self.mergedb[loginbase][nodename]['comonstats'] = x['comonstats']
258 					self.mergedb[loginbase][nodename]['category'] = x['category']
259 					self.mergedb[loginbase][nodename]['state'] = x['state']
260 					self.mergedb[loginbase][nodename]['kernel']=x['kernel']
261 					self.mergedb[loginbase][nodename]['bootcd']=x['bootcd']
262 					self.mergedb[loginbase][nodename]['plcnode']=x['plcnode']
263 				# delete the entry from cache_all to keep it out of case 3)
264 				del self.cache_all[nodename]
266 		# 3) nodes that remain in cache_all were not identified by findbad.
267 		#    Do we keep them or not?
268 		#   NOTE: i think that since the categories are performed before this
269 		# 		step now, and by a monitor-controlled agent.
271 		# TODO: This does not work correctly.  Do we need this? 
272 		#for hn in self.cache_all.keys():
273 		#	y = self.act_all[hn][0]
274 		#	if 'monitor' in y['bucket']:
275 		#		loginbase = self.plcdb_hn2lb[hn] 
276 		#		if loginbase not in self.sickdb:
277 		#			self.sickdb[loginbase] = {}
278 		#		self.sickdb[loginbase][hn] = y
280 		#		del self.cache_all[hn]
282 		print "len of cache_all: %d" % len(self.cache_all.keys())
286 sorted_sites = self.mergedb.keys()
288 # look at all problems reported by merge
289 for loginbase in sorted_sites:
290 d_merge_nodes = self.mergedb[loginbase]
291 for nodename in d_merge_nodes.keys():
292 record = self.mergedb[loginbase][nodename]
293 self.toRT.put(record)
295 # send signal to stop reading
# Pipeline stage 2: consumes node records from the RT-check queue (fromRT),
# classifies each by category/stage, and writes action recommendations to
# the diagnose_out database for the Action stage.
299 class Diagnose(Thread):
300 	def __init__(self, fromRT):
302 		self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
303 		self.findbad = soltesz.if_cached_else(1, "findbad", lambda : {})
# diagnose_in:  records received from the queue, grouped by loginbase.
# diagnose_out: per-site diagnosis results, persisted at the end of run.
305 		self.diagnose_in = {}
306 		self.diagnose_out = {}
307 		Thread.__init__(self)
311 self.accumSickSites()
313 print "Accumulated %d sick sites" % len(self.diagnose_in.keys())
314 logger.debug("Accumulated %d sick sites" % len(self.diagnose_in.keys()))
317 stats = self.diagnoseAll()
318 except Exception, err:
319 print "----------------"
321 print traceback.print_exc()
323 #if config.policysavedb:
326 print_stats("sites_observed", stats)
327 print_stats("sites_diagnosed", stats)
328 print_stats("nodes_diagnosed", stats)
330 if config.policysavedb:
331 print "Saving Databases... diagnose_out"
332 soltesz.dbDump("diagnose_out", self.diagnose_out)
# Drain the fromRT queue into self.diagnose_in, grouped by site.
# Blocks until the producer signals completion (a None sentinel).
334 	def accumSickSites(self):
336 		Take all nodes, from l_diagnose, look them up in the diagnose_out database, 
337 		and insert them into diagnose_in[] as:
339 		diagnose_in[loginbase] = [diag_node1, diag_node2, ...]
342 			node_record = self.fromRT.get(block = True)
# A None record is the end-of-stream sentinel from the upstream stage.
343 			if node_record == None:
346 			nodename = node_record['nodename']
347 			loginbase = self.plcdb_hn2lb[nodename]
349 			if loginbase not in self.diagnose_in:
350 				self.diagnose_in[loginbase] = {}
352 			self.diagnose_in[loginbase][nodename] = node_record
# Diagnose every accumulated site and return summary counters.
# Returns a dict of counts: sites observed/diagnosed, nodes diagnosed,
# plus the full list of loginbases visited ('allsites').
356 	def diagnoseAll(self):
358 		i_sites_diagnosed = 0
359 		i_nodes_diagnosed = 0
364 		sorted_sites = self.diagnose_in.keys()
366 		self.diagnose_out= {}
367 		for loginbase in sorted_sites:
368 			l_allsites += [loginbase]
370 			d_diag_nodes = self.diagnose_in[loginbase]
371 			d_act_records = self.__diagnoseSite(loginbase, d_diag_nodes)
372 			# store records in diagnose_out, for saving later.
373 			self.diagnose_out.update(d_act_records)
# Only count a site as diagnosed when at least one node produced a record.
375 			if len(d_act_records[loginbase]['nodes'].keys()) > 0:
376 				i_nodes_diagnosed += (len(d_act_records[loginbase]['nodes'].keys()))
377 				i_sites_diagnosed += 1
378 			i_sites_observed += 1
380 		return {'sites_observed': i_sites_observed, 
381 				'sites_diagnosed': i_sites_diagnosed, 
382 				'nodes_diagnosed': i_nodes_diagnosed, 
383 				'allsites':l_allsites}
# Estimate how many whole days a node has been down, preferring comon's
# sshstatus, then lastcotop (both in seconds), then PLC's last_contact
# timestamp.  "null" marks an absent comon measurement.
387 	def __getDaysDown(self, diag_record, nodename):
389 		if diag_record['comonstats']['sshstatus'] != "null":
390 			daysdown = int(diag_record['comonstats']['sshstatus']) // (60*60*24)
391 		elif diag_record['comonstats']['lastcotop'] != "null":
392 			daysdown = int(diag_record['comonstats']['lastcotop']) // (60*60*24)
395 			last_contact = diag_record['plcnode']['last_contact']
396 			if last_contact == None:
397 				# the node has never been up, so give it a break
400 				diff = now - last_contact
401 				daysdown = diff // (60*60*24)
# Human-readable form of __getDaysDown; falls back to a fixed string when
# the day count is not meaningful (e.g. never-contacted nodes).
404 	def __getStrDaysDown(self, diag_record, nodename):
405 		daysdown = self.__getDaysDown(diag_record, nodename)
407 			return "(%d days down)"%daysdown
409 			return "Unknown number of days"
# Return a boot-CD version string for the node.
# NOTE(review): despite the name, this reads the 'kernel' field of the
# record -- presumably the kernel version implies the CD version; confirm.
411 	def __getCDVersion(self, diag_record, nodename):
413 		#print "Getting kernel for: %s" % diag_record['nodename']
414 		cdversion = diag_record['kernel']
# Diagnose every node of one site and decide the site-level knobs
# (whether to email the site, whether to squeeze/penalize it).
# Returns {loginbase: {'config': {...}, 'nodes': {nodename: diag_record}}}.
417 	def __diagnoseSite(self, loginbase, d_diag_nodes):
419 		d_diag_nodes are diagnose_in entries.
421 		d_diag_site = {loginbase : { 'config' : 
428 		sorted_nodes = d_diag_nodes.keys()
430 		for nodename in sorted_nodes:
431 			node_record = d_diag_nodes[nodename]
432 			diag_record = self.__diagnoseNode(loginbase, node_record)
434 			if diag_record != None:
435 				d_diag_site[loginbase]['nodes'][nodename] = diag_record
437 				# NOTE: improvement means, we need to act/squeeze and email.
438 				#print "DIAG_RECORD", diag_record
# A node that finished its record or needs an NM reset re-arms the
# site-level squeeze/email flags.
439 				if 'monitor-end-record' in diag_record['stage'] or \
440 				   'nmreset' in diag_record['stage']:
441 	#				print "resetting loginbase!" 
442 					d_diag_site[loginbase]['config']['squeeze'] = True
443 					d_diag_site[loginbase]['config']['email'] = True
445 				#print "NO IMPROVEMENT!!!!"
447 				pass # there is nothing to do for this node.
449 		# NOTE: these settings can be overridden by command line arguments,
450 		#       or the state of a record, i.e. if already in RT's Support Queue.
451 		nodes_up = self.getUpAtSite(loginbase, d_diag_site)
453 			d_diag_site[loginbase]['config']['squeeze'] = True
455 		max_slices = self.getMaxSlices(loginbase)
456 		num_nodes = self.getNumNodes(loginbase)
457 		# NOTE: when max_slices == 0, this is either a new site (the old way)
458 		#       or an old disabled site from previous monitor (before site['enabled'])
459 		if nodes_up < num_nodes and max_slices != 0:
460 			d_diag_site[loginbase]['config']['email'] = True
462 		if len(d_diag_site[loginbase]['nodes'].keys()) > 0:
463 			print "SITE: %20s : %d nodes up, at most" % (loginbase, nodes_up)
# Turn a merged node record into a diagnosis record based on its findbad
# category (ERROR/OLDBOOTCD/PROD) or state (BOOT).  Fills in the email
# template ('message'), template args, a summary tuple ('info'), and a
# one-line 'log'.  Returns None for categories that need no action.
# NOTE(review): diag_record is created on elided lines inside each branch
# (presumably `diag_record = {}` before each .update()) -- confirm.
467 	def diagRecordByCategory(self, node_record):
468 		nodename = node_record['nodename']
469 		category = node_record['category']
470 		state    = node_record['state']
471 		loginbase = self.plcdb_hn2lb[nodename]
# --- Node is down ("ERROR") ------------------------------------------
474 		if  "ERROR" in category:	# i.e. "DOWN"
476 			diag_record.update(node_record)
477 			daysdown = self.__getDaysDown(diag_record, nodename) 
# Short outages are left alone (threshold comparison on an elided line).
479 				format = "DIAG: %20s : %-40s Down only %s days NOTHING DONE"
480 				print format % (loginbase, nodename, daysdown)
483 			s_daysdown = self.__getStrDaysDown(diag_record, nodename)
484 			diag_record['message'] = emailTxt.mailtxt.newdown
485 			diag_record['args'] = {'nodename': nodename}
486 			diag_record['info'] = (nodename, s_daysdown, "")
# The log line carries the found RT ticket when we have no ticket of our
# own yet, otherwise our ticket id.
487 			if diag_record['ticket_id'] == "":
488 				diag_record['log'] = "DOWN: %20s : %-40s == %20s %s" % \
489 					(loginbase, nodename, diag_record['info'][1:], diag_record['found_rt_ticket'])
491 				diag_record['log'] = "DOWN: %20s : %-40s == %20s %s" % \
492 					(loginbase, nodename, diag_record['info'][1:], diag_record['ticket_id'])
# --- Node runs an obsolete boot CD -----------------------------------
494 		elif "OLDBOOTCD" in category:
495 			# V2 boot cds as determined by findbad
496 			s_daysdown = self.__getStrDaysDown(node_record, nodename)
497 			s_cdversion = self.__getCDVersion(node_record, nodename)
499 			diag_record.update(node_record)
500 			#if "2.4" in diag_record['kernel'] or "v2" in diag_record['bootcd']:
501 			diag_record['message'] = emailTxt.mailtxt.newbootcd
502 			diag_record['args'] = {'nodename': nodename}
503 			diag_record['info'] = (nodename, s_daysdown, s_cdversion)
504 			if diag_record['ticket_id'] == "":
505 				diag_record['log'] = "BTCD: %20s : %-40s == %20s %20s %s" % \
506 					(loginbase, nodename, diag_record['kernel'], 
507 					 diag_record['bootcd'], diag_record['found_rt_ticket'])
509 				diag_record['log'] = "BTCD: %20s : %-40s == %20s %20s %s" % \
510 					(loginbase, nodename, diag_record['kernel'], 
511 					 diag_record['bootcd'], diag_record['ticket_id'])
# --- Production node: nothing to diagnose ----------------------------
513 		elif "PROD" in category:
515 				# Not sure what to do with these yet.  Probably need to
517 				print "DEBG: %20s : %-40s NOTHING DONE" % (loginbase, nodename)
# --- Node is booted: either it improved, or NM may be wedged ---------
519 		elif "BOOT" in state:
521 			# TODO: remove penalties, if any are applied.
523 			last_contact = node_record['plcnode']['last_contact']
524 			if last_contact == None:
527 				time_diff = now - last_contact;
529 			if 'improvement' in node_record['stage']:
530 				# then we need to pass this on to 'action'
532 				diag_record.update(node_record)
533 				diag_record['message'] = emailTxt.mailtxt.newthankyou
534 				diag_record['args'] = {'nodename': nodename}
535 				diag_record['info'] = (nodename, node_record['prev_category'], 
536 									   node_record['category'])
537 				if diag_record['ticket_id'] == "":
538 					diag_record['log'] = "IMPR: %20s : %-40s == %20s %20s %s %s" % \
539 								(loginbase, nodename, diag_record['stage'], 
540 								 state, category, diag_record['found_rt_ticket'])
542 					diag_record['log'] = "IMPR: %20s : %-40s == %20s %20s %s %s" % \
543 								(loginbase, nodename, diag_record['stage'], 
544 								 state, category, diag_record['ticket_id'])
# Stale PLC heartbeat while booted => NodeManager is likely stuck.
# NOTE(review): comment below says 30 min but the check is 6 hours.
546 			elif time_diff >= 6*SPERHOUR:
547 				# heartbeat is older than 30 min.
549 				#print "Possible NM problem!!  %s - %s = %s" % (now, last_contact, time_diff)
551 				diag_record.update(node_record)
552 				diag_record['message'] = emailTxt.mailtxt.NMReset
553 				diag_record['args'] = {'nodename': nodename}
554 				diag_record['stage'] = "nmreset"
555 				diag_record['info'] = (nodename, 
556 										node_record['prev_category'], 
557 										node_record['category'])
558 				if diag_record['ticket_id'] == "":
559 					diag_record['log'] = "NM  : %20s : %-40s == %20s %20s %s %s" % \
560 								(loginbase, nodename, diag_record['stage'], 
561 								 state, category, diag_record['found_rt_ticket'])
563 					diag_record['log'] = "NM  : %20s : %-40s == %20s" % \
564 								(loginbase, nodename, diag_record['stage'])
# --- Remaining categories are recognized but not yet handled ---------
572 		elif "ALPHA"    in category:
574 		elif "clock_drift" in category:
576 		elif "dns"    in category:
578 		elif "filerw"    in category:
581 			print "Unknown category!!!! %s" % category
# Per-node diagnosis: compare current vs previous category, assign or
# advance the escalation stage, and produce the action record that the
# Action stage will consume.  Returns None when no action is due yet.
586 	def __diagnoseNode(self, loginbase, node_record):
587 		# TODO: change the format of the hostname in this 
588 		#       record to something more natural.
589 		nodename 		= node_record['nodename']
590 		category 		= node_record['category']
591 		prev_category 	= node_record['prev_category']
592 		state 			= node_record['state']
# Compare severity of current vs previous category (www.printbadnodes).
594 		val = cmpCategoryVal(category, prev_category)
596 			# current category is worse than previous, carry on
599 			# current category is better than previous
600 			# TODO: too generous for now, but will be handled correctly
601 			# TODO: if stage is currently ticket_waitforever, 
602 			if 'ticket_id' not in node_record:
603 				print "ignoring: ", node_record['nodename']
# An improved node with no open ticket can be closed out immediately;
# with a ticket, mark it 'improvement' so Action thanks the site and
# backs off penalties.
606 				if node_record['ticket_id'] == "" or \
607 				   node_record['ticket_id'] == None:
608 					print "closing: ", node_record['nodename']
609 					node_record['action'] = ['close_rt']
610 					node_record['message'] = None
611 					node_record['stage'] = 'monitor-end-record'
615 					node_record['stage'] = 'improvement'
617 			#values are equal, carry on.
620 		#### COMPARE category and prev_category
622 		# then assign a stage based on relative priorities
624 		# then check category for stats.
625 		diag_record = self.diagRecordByCategory(node_record)
626 		if diag_record == None:
630 		# TODO: need to record time found, and maybe add a stage for acting on it...
631 		if 'found_rt_ticket' in diag_record and \
632 			diag_record['found_rt_ticket'] is not None:
# NOTE(review): `is not 'improvement'` is an *identity* comparison with a
# string literal -- it only behaves like != because CPython interns these
# literals.  Should be `!=` (same issue in __actOnNode).
633 			if diag_record['stage'] is not 'improvement':
634 				diag_record['stage'] = 'ticket_waitforever'
636 		current_time = time.time()
637 		# take off four days, for the delay that database caused.
638 		# TODO: generalize delays at PLC, and prevent enforcement when 
639 		# 	there have been no emails.
640 		# NOTE: 7*SPERDAY exists to offset the 'bad week'
641 		#delta = current_time - diag_record['time'] - 7*SPERDAY
642 		delta = current_time - diag_record['time']
644 		message = diag_record['message']
646 		act_record.update(diag_record)
# --- Escalation state machine, keyed on diag_record['stage'] ---------
# New problem: notify techs, wait a week before escalating.
649 		if 'findbad' in diag_record['stage']:
650 			# The node is bad, and there's no previous record of it.
651 			act_record['email'] = TECH
652 			act_record['action'] = ['noop']
653 			act_record['message'] = message[0]
654 			act_record['stage'] = 'stage_actinoneweek'
656 		elif 'nmreset' in diag_record['stage']:
657 			act_record['email'] = ADMIN 
658 			act_record['action'] = ['reset_nodemanager']
659 			act_record['message'] = message[0]
660 			act_record['stage'] = 'nmreset'
663 		elif 'improvement' in diag_record['stage']:
664 			# - backoff previous squeeze actions (slice suspend, nocreate)
665 			# TODO: add a backoff_squeeze section... Needs to runthrough
666 			act_record['action'] = ['close_rt']
667 			act_record['message'] = message[0]
668 			act_record['stage'] = 'monitor-end-record'
# Week 1 elapsed: add PIs, disable slice creation, restart the clock.
670 		elif 'actinoneweek' in diag_record['stage']:
671 			if delta >= 7 * SPERDAY: 
672 				act_record['email'] = TECH | PI
673 				act_record['stage'] = 'stage_actintwoweeks'
674 				act_record['message'] = message[1]
675 				act_record['action'] = ['nocreate' ]
676 				act_record['time'] = current_time		# reset clock for waitforever
677 			elif delta >= 3* SPERDAY and not 'second-mail-at-oneweek' in act_record:
678 				act_record['email'] = TECH 
679 				act_record['message'] = message[0]
680 				act_record['action'] = ['sendmailagain-waitforoneweekaction' ]
681 				act_record['second-mail-at-oneweek'] = True
683 				act_record['message'] = None
684 				act_record['action'] = ['waitforoneweekaction' ]
685 				return None 			# don't send if there's no action
# Week 2 elapsed: add slice users, suspend slices, then wait forever.
687 		elif 'actintwoweeks' in diag_record['stage']:
688 			if delta >= 7 * SPERDAY: 
689 				act_record['email'] = TECH | PI | USER
690 				act_record['stage'] = 'stage_waitforever'
691 				act_record['message'] = message[2]
692 				act_record['action'] = ['suspendslices']
693 				act_record['time'] = current_time		# reset clock for waitforever
694 			elif delta >= 3* SPERDAY and not 'second-mail-at-twoweeks' in act_record:
695 				act_record['email'] = TECH | PI
696 				act_record['message'] = message[1]
697 				act_record['action'] = ['sendmailagain-waitfortwoweeksaction' ]
698 				act_record['second-mail-at-twoweeks'] = True
700 				act_record['message'] = None
701 				act_record['action'] = ['waitfortwoweeksaction']
702 				return None 			# don't send if there's no action
# A pre-existing RT ticket exists: hold here, re-arming weekly.
704 		elif 'ticket_waitforever' in diag_record['stage']:
705 			act_record['email'] = TECH
706 			if 'first-found' not in act_record:
707 				act_record['first-found'] = True
708 				act_record['log'] += " firstfound"
709 				act_record['action'] = ['ticket_waitforever']
710 				act_record['message'] = None
711 				act_record['time'] = current_time
713 				if delta >= 7*SPERDAY:
714 					act_record['action'] = ['ticket_waitforever']
715 					act_record['message'] = None
716 					act_record['time'] = current_time		# reset clock
718 					act_record['action'] = ['ticket_waitforever']
719 					act_record['message'] = None
# Terminal stage: re-mail the site every 3 days until it recovers.
722 		elif 'waitforever' in diag_record['stage']:
723 			# more than 3 days since last action
724 			# TODO: send only on weekdays.
725 			# NOTE: expects that 'time' has been reset before entering waitforever stage
726 			if delta >= 3*SPERDAY:
727 				act_record['action'] = ['email-againwaitforever']
728 				act_record['message'] = message[2]
729 				act_record['time'] = current_time		# reset clock 
731 				act_record['action'] = ['waitforever']
732 				act_record['message'] = None
733 				return None 			# don't send if there's no action
736 			# There is no action to be taken, possibly b/c the stage has
737 			# already been performed, but diagnose picked it up again.
739 			# 	1. stage is unknown, or 
740 			# 	2. delta is not big enough to bump it to the next stage.
741 			# TODO: figure out which. for now assume 2.
742 			print "UNKNOWN!?!? %s" % nodename
743 			act_record['action'] = ['unknown']
744 			act_record['message'] = message[0]
748 		print "%s" % act_record['log'],
749 		print "%15s" % act_record['action']
# Return the site's max_slices, read from any one of its nodes' findbad
# plcsite records (all nodes of a site share the same site stats).
752 	def getMaxSlices(self, loginbase):
753 		# if sickdb has a loginbase, then it will have at least one node.
756 		for nodename in self.diagnose_in[loginbase].keys():
757 			if nodename in self.findbad['nodes']:
758 				site_stats = self.findbad['nodes'][nodename]['values']['plcsite']
# No node of this site appears in findbad: cannot answer.
761 		if site_stats == None:
762 			raise Exception, "loginbase with no nodes in findbad"
764 			return site_stats['max_slices']
# Return the site's num_nodes, read the same way as getMaxSlices (from the
# first diagnose_in node that findbad knows about).
766 	def getNumNodes(self, loginbase):
767 		# if sickdb has a loginbase, then it will have at least one node.
770 		for nodename in self.diagnose_in[loginbase].keys():
771 			if nodename in self.findbad['nodes']:
772 				site_stats = self.findbad['nodes'][nodename]['values']['plcsite']
775 		if site_stats == None:
776 			raise Exception, "loginbase with no nodes in findbad"
778 			return site_stats['num_nodes']
781 	Returns number of up nodes as the total number *NOT* in act_all with a
782 	stage other than 'steady-state' .
784 	def getUpAtSite(self, loginbase, d_diag_site):
785 		# TODO: THIS DOESN'T WORK!!! it misses all the 'debug' state nodes
786 		# 		that aren't recorded yet.
788 		numnodes = self.getNumNodes(loginbase)
789 		# NOTE: assume nodes we have no record of are ok. (too conservative)
790 		# TODO: make the 'up' value more representative
# Start from the PLC node count and subtract nodes with an open (not yet
# ended) diagnosis record; the decrement happens on an elided line.
792 		for nodename in d_diag_site[loginbase]['nodes'].keys():
794 			rec = d_diag_site[loginbase]['nodes'][nodename]
795 			if rec['stage'] != 'monitor-end-record':
798 				pass # the node is assumed to be up.
801 		#	print "ERROR: %s total nodes up and down != %d" % (loginbase, numnodes)
# Base class for site-level actions: validates required args, then
# delegates to the subclass's _run().
# NOTE(review): the default parameter list is a *mutable default
# argument*; harmless here since it is only read, but a tuple or the
# None-sentinel pattern would be safer.
807 	def __init__(self, parameter_names=['hostname', 'ticket_id']):
808 		self.parameter_names = parameter_names
# Raise if any required parameter name is missing from args.
809 	def checkParam(self, args):
810 		for param in self.parameter_names:
811 			if param not in args:
812 				raise Exception("Parameter %s not provided in args"%param)
814 		self.checkParam(args)
815 		return self._run(args)
# Subclass hook: perform the actual action.
816 	def _run(self, args):
class SuspendAction(SiteAction):
    """Site action: suspend every slice belonging to the target host's site."""
    def _run(self, args):
        hostname = args['hostname']
        return plc.suspendSlices(hostname)
class RemoveSliceCreation(SiteAction):
    """Site action: disable creation of new slices for the target host's site."""
    def _run(self, args):
        target = args['hostname']
        return plc.removeSliceCreation(target)
class BackoffActions(SiteAction):
    """Site action: undo penalties by re-enabling slices and slice creation."""
    def _run(self, args):
        hostname = args['hostname']
        # Re-enable in the same order as the penalties are reversed elsewhere:
        # running slices first, then the ability to create new ones.
        plc.enableSlices(hostname)
        plc.enableSliceCreation(hostname)
833 # TODO: create class for each action below,
834 # allow for lists of actions to be performed...
# Close the RT ticket named in args (when present and non-empty/None) and
# lift squeeze penalties from the site owning args['hostname'].
836 def close_rt_backoff(args):
837 	if 'ticket_id' in args and (args['ticket_id'] != "" and args['ticket_id'] != None):
838 		mailer.closeTicketViaRT(args['ticket_id'], 
839 								"Ticket CLOSED automatically by SiteAssist.")
840 		plc.enableSlices(args['hostname'])
841 		plc.enableSliceCreation(args['hostname'])
def reset_nodemanager(args):
    """Restart the NodeManager service on the node named in args.

    Bug fix: the original body interpolated an undefined global
    ``nodename`` and would raise NameError whenever invoked; use
    ``args['hostname']`` like the sibling action helpers
    (close_rt_backoff, the Action.actions lambdas).
    """
    # NOTE(review): the hostname is interpolated into a shell command via
    # os.system; values are assumed to come from trusted PLC data, not
    # untrusted input.
    os.system("ssh root@%s /sbin/service nm restart" % args['hostname'])
# Pipeline stage 3: given the list of hostnames to act on, pulls their
# diagnosis records from diagnose_out, emails sites, applies penalties,
# and persists the action history (act_all).
848 class Action(Thread):
849 	def __init__(self, l_action):
850 		self.l_action = l_action
852 		# the hostname to loginbase mapping
853 		self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
856 		self.diagnose_db = soltesz.if_cached_else(1, "diagnose_out", lambda : {})
858 		self.act_all = soltesz.if_cached_else(1, "act_all", lambda : {})
860 		# A dict of actions to specific functions. PICKLE doesn't like lambdas.
# Real side-effecting actions first; the rest are deliberate no-ops that
# only mark a wait state in the record.
862 		self.actions['suspendslices'] = lambda args: plc.suspendSlices(args['hostname'])
863 		self.actions['nocreate'] = lambda args: plc.removeSliceCreation(args['hostname'])
864 		self.actions['close_rt'] = lambda args: close_rt_backoff(args)
865 		self.actions['rins'] = lambda args: plc.nodeBootState(args['hostname'], "rins")	
866 		self.actions['noop'] = lambda args: args
867 		self.actions['reset_nodemanager'] = lambda args: args # reset_nodemanager(args)
869 		self.actions['ticket_waitforever'] = lambda args: args
870 		self.actions['waitforever'] = lambda args: args
871 		self.actions['unknown'] = lambda args: args
872 		self.actions['waitforoneweekaction'] = lambda args: args
873 		self.actions['waitfortwoweeksaction'] = lambda args: args
874 		self.actions['sendmailagain-waitforoneweekaction'] = lambda args: args
875 		self.actions['sendmailagain-waitfortwoweeksaction'] = lambda args: args
876 		self.actions['email-againwaitforever'] = lambda args: args
877 		self.actions['email-againticket_waitforever'] = lambda args: args
881 		Thread.__init__(self)
885 print "Accumulated %d sick sites" % len(self.sickdb.keys())
886 logger.debug("Accumulated %d sick sites" % len(self.sickdb.keys()))
889 stats = self.analyseSites()
890 except Exception, err:
891 print "----------------"
893 print traceback.print_exc()
895 if config.policysavedb:
896 print "Saving Databases... act_all"
897 soltesz.dbDump("act_all", self.act_all)
900 print_stats("sites_observed", stats)
901 print_stats("sites_diagnosed", stats)
902 print_stats("nodes_diagnosed", stats)
903 print_stats("sites_emailed", stats)
904 print_stats("nodes_actedon", stats)
905 print string.join(stats['allsites'], ",")
907 if config.policysavedb:
908 print "Saving Databases... act_all"
909 #soltesz.dbDump("policy.eventlog", self.eventlog)
910 # TODO: remove 'diagnose_out',
911 # or at least the entries that were acted on.
912 soltesz.dbDump("act_all", self.act_all)
# Build self.sickdb from diagnose_db, restricted to the hostnames in
# l_action, so only explicitly requested nodes are acted upon.
914 	def accumSites(self):
916 		Take all nodes, from l_action, look them up in the diagnose_db database, 
917 		and insert them into sickdb[] as:
919 		This way only the given l_action nodes will be acted on regardless
920 		of how many from diagnose_db are available.
922 			sickdb[loginbase][nodename] = diag_record
924 		# TODO: what if l_action == None ?
925 		for nodename in self.l_action:
927 			loginbase = self.plcdb_hn2lb[nodename]
929 			if loginbase in self.diagnose_db and \
930 				nodename in self.diagnose_db[loginbase]['nodes']:
932 				diag_record = self.diagnose_db[loginbase]['nodes'][nodename]
934 				if loginbase not in self.sickdb:
935 					self.sickdb[loginbase] = {'nodes' : {}}
937 				# NOTE: don't copy all node records, since not all will be in l_action
938 				self.sickdb[loginbase]['nodes'][nodename] = diag_record
939 				# NOTE: but, we want to get the loginbase config settings, 
940 				#       this is the easiest way.
941 				self.sickdb[loginbase]['config'] = self.diagnose_db[loginbase]['config']
943 				#print "%s not in diagnose_db!!" % loginbase
# Compose and send the notification email for one site, choosing the
# recipient aliases from the roles bitmask (TECH/PI/USER) and routing via
# RT when mailing for real.  Returns the (possibly new) ticket id --
# presumably on an elided return line; confirm.
946 	def __emailSite(self, loginbase, roles, message, args):
948 		loginbase is the unique site abbreviation, prepended to slice names.
949 		roles contains TECH, PI, USER roles, and derive email aliases.
950 		record contains {'message': [<subj>,<body>], 'args': {...}} 
953 		args.update({'loginbase':loginbase})
# Recipient selection depends on config.mail/config.debug/config.bcc;
# the branch bodies are partially elided here.
955 		if not config.mail and not config.debug and config.bcc:
957 		if config.mail and config.debug:
963 			contacts += [config.email]
965 			contacts += [TECHEMAIL % loginbase]
967 			contacts += [PIEMAIL % loginbase]
969 			slices = plc.slices(loginbase)
972 					contacts += [SLICEMAIL % slice]
973 				print "SLIC: %20s : %d slices" % (loginbase, len(slices))
975 				print "SLIC: %20s : 0 slices" % loginbase
# The message templates are %-format pairs: [subject, body].
978 			subject = message[0] % args
979 			body = message[1] % args
982 				if 'ticket_id' in args:
983 					subj = "Re: [PL #%s] %s" % (args['ticket_id'], subject)
985 					subj = "Re: [PL noticket] %s" % subject
986 				mailer.email(subj, body, contacts)
987 				ticket_id = args['ticket_id']
# When mailing via RT, a new or updated ticket id comes back.
989 				ticket_id = mailer.emailViaRT(subject, body, contacts, args['ticket_id'])
990 		except Exception, err:
991 			print "exception on message:"
993 			print traceback.print_exc()
# Render one node's 'info' tuple as the per-host line used in the email
# body: "went from X to Y" for finished records, "host version - daysdown"
# otherwise.
999 	def _format_diaginfo(self, diag_node):
1000 		info = diag_node['info']
1001 		if diag_node['stage'] == 'monitor-end-record':
1002 			hlist = "    %s went from '%s' to '%s'\n" % (info[0], info[1], info[2]) 
1004 			hlist = "    %s %s - %s\n" % (info[0], info[2], info[1]) #(node,ver,daysdn)
# Collapse a list of action records (one issue) into the args dict for
# __emailSite: concatenated per-host lines, plus the last record's
# hostname and (when present) ticket_id.
1008 	def get_email_args(self, act_recordlist):
1011 		email_args['hostname_list'] = ""
1013 		for act_record in act_recordlist:
1014 			email_args['hostname_list'] += act_record['msg_format']
1015 			email_args['hostname'] = act_record['nodename']
1016 			if 'ticket_id' in act_record:
1017 				email_args['ticket_id'] = act_record['ticket_id']
# Group action records by their primary action key so each distinct issue
# produces at most one email per site.
1021 	def get_unique_issues(self, act_recordlist):
1022 		# NOTE: only send one email per site, per problem...
1024 		for act_record in act_recordlist:
1025 			act_key = act_record['action'][0]
1026 			if act_key not in unique_issues:
1027 				unique_issues[act_key] = []
1029 			unique_issues[act_key] += [act_record]
1031 		return unique_issues
# Act on one site: run __actOnNode for each node, then per unique issue
# send the email (recording the RT ticket id back into act_all) and, when
# squeezing is enabled, execute the penalty actions.  Returns the tuple
# (nodes acted on, nodes emailed).
1034 	def __actOnSite(self, loginbase, site_record):
1040 		for nodename in site_record['nodes'].keys():
1041 			diag_record = site_record['nodes'][nodename]
1042 			act_record = self.__actOnNode(diag_record)
1043 			#print "nodename: %s %s" % (nodename, act_record)
1044 			act_recordlist += [act_record]
1046 		unique_issues = self.get_unique_issues(act_recordlist)
1048 		for issue in unique_issues.keys():
1049 			print "\tworking on issue: %s" % issue
1050 			issue_record_list = unique_issues[issue]
1051 			email_args = self.get_email_args(issue_record_list)
# Use the first record of the issue as representative for the email.
1053 			act_record = issue_record_list[0]
1054 			# send message before squeezing
1055 			print "\t\tconfig.email: %s and %s" % (act_record['message'] != None, 
1056 												   site_record['config']['email'])
1057 			if act_record['message'] != None and site_record['config']['email']:
1058 				ticket_id = self.__emailSite(loginbase, act_record['email'], 
1059 											 act_record['message'], email_args)
# NOTE(review): ticket_id is only bound inside this branch; its use below
# relies on the email having been sent (or an elided default) -- confirm.
1061 				# Add ticket_id to ALL nodenames
1062 				for act_record in issue_record_list:
1063 					nodename = act_record['nodename']
1064 					# update node record with RT ticket_id
1065 					if nodename in self.act_all:
1066 						self.act_all[nodename][0]['ticket_id'] = "%s" % ticket_id
1067 						if config.mail: i_nodes_emailed += 1
1069 			print "\t\tconfig.squeeze: %s and %s" % (config.squeeze,
1070 													 site_record['config']['squeeze'])
# NOTE(review): after the loop above, act_record is the *last* record of
# the issue list, so the squeeze below executes that record's actions.
1071 			if config.squeeze and site_record['config']['squeeze']:
1072 				for act_key in act_record['action']:
1073 					self.actions[act_key](email_args)
1074 				i_nodes_actedon += 1
1076 		if config.policysavedb:
1077 			print "Saving Databases... act_all, diagnose_out"
1078 			soltesz.dbDump("act_all", self.act_all)
1079 			# remove site record from diagnose_out, it's in act_all as done.
1080 			del self.diagnose_db[loginbase]
1081 			soltesz.dbDump("diagnose_out", self.diagnose_db)
1083 		print "sleeping for 1 sec"
1085 		#print "Hit enter to continue..."
1087 		#line = sys.stdin.readline()
1089 		return (i_nodes_actedon, i_nodes_emailed)
# Prepare the final action record for one node (formatted email line
# included) and prepend it to the node's act_all history, unless the
# record is already finished or an nmreset.
1091 	def __actOnNode(self, diag_record):
1092 		nodename = diag_record['nodename']
1093 		message = diag_record['message']
1096 		act_record.update(diag_record)
1097 		act_record['nodename'] = nodename
1098 		act_record['msg_format'] = self._format_diaginfo(diag_record)
1100 		print "%s" % act_record['log'],
1101 		print "%15s" % act_record['action']
# NOTE(review): `is not` here is an *identity* test against string
# literals -- it only works like != via CPython interning.  Should be !=
# (same issue in __diagnoseNode).
1103 		if act_record['stage'] is not 'monitor-end-record' and \
1104 		   act_record['stage'] is not 'nmreset':
1105 			if nodename not in self.act_all: 
1106 				self.act_all[nodename] = []
1108 			self.act_all[nodename].insert(0,act_record)
1110 			print "Not recording %s in act_all" % nodename
# Walk every sick site, act on it, and return the summary counters used
# by run() for the final report.
1114 	def analyseSites(self):
1115 		i_sites_observed = 0
1116 		i_sites_diagnosed = 0
1117 		i_nodes_diagnosed = 0
1122 		sorted_sites = self.sickdb.keys()
1124 		for loginbase in sorted_sites:
1125 			site_record = self.sickdb[loginbase]
1126 			print "sites: %s" % loginbase
# NOTE(review): site_record has keys 'nodes' and 'config', so this counts
# those two keys, not the number of nodes; len(site_record['nodes']) is
# probably what was intended -- confirm before changing.
1128 			i_nodes_diagnosed += len(site_record.keys())
1129 			i_sites_diagnosed += 1
1131 			(na,ne) = self.__actOnSite(loginbase, site_record)
1133 			i_sites_observed += 1
1134 			i_nodes_actedon += na
1135 			i_sites_emailed += ne
1137 			l_allsites += [loginbase]
1139 		return {'sites_observed': i_sites_observed, 
1140 				'sites_diagnosed': i_sites_diagnosed, 
1141 				'nodes_diagnosed': i_nodes_diagnosed, 
1142 				'sites_emailed': i_sites_emailed, 
1143 				'nodes_actedon': i_nodes_actedon, 
1144 				'allsites':l_allsites}
# Print one named counter from a stats dict.  Unlike the module-level
# print_stats, this assumes the key is present (KeyError otherwise).
1146 	def print_stats(self, key, stats):
1147 		print "%20s : %d" % (key, stats[key])
1152 #Prints, logs, and emails status of up nodes, down nodes, and buckets.
1155 # sub = "Monitor Summary"
1156 # msg = "\nThe following nodes were acted upon: \n\n"
1157 # for (node, (type, date)) in self.emailed.items():
1158 # # Print only things acted on today.
1159 # if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
1160 # msg +="%s\t(%s)\t%s\n" %(node, type, time.ctime(date))
1161 # msg +="\n\nThe following sites have been 'squeezed':\n\n"
1162 # for (loginbase, (date, type)) in self.squeezed.items():
1163 # # Print only things acted on today.
1164 # if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
1165 # msg +="%s\t(%s)\t%s\n" %(loginbase, type, time.ctime(date))
1166 # mailer.email(sub, msg, [SUMTO])
1171 #Store/Load state of emails. When, where, what.
1173 #def emailedStore(self, action):
1175 # if action == "LOAD":
1176 # f = open(DAT, "r+")
1177 # logger.info("POLICY: Found and reading " + DAT)
1178 # self.emailed.update(pickle.load(f))
1179 # if action == "WRITE":
1180 # f = open(DAT, "w")
1181 # #logger.debug("Writing " + DAT)
1182 # pickle.dump(self.emailed, f)
1184 # except Exception, err:
1185 # logger.info("POLICY: Problem with DAT, %s" %err)
1188 #class Policy(Thread):
1191 print "policy.py is a module, not a script for running directly."
1193 if __name__ == '__main__':
1198 except KeyboardInterrupt:
1199 print "Killed. Exitting."
1200 logger.info('Monitor Killed')