+ diagnose.py: added --refresh option so that cached values can be refreshed, and either
[monitor.git] / policy.py
1 #
2 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
3 #
4 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
5 #
6 # $Id: policy.py,v 1.16 2007/08/08 13:30:42 soltesz Exp $
7 #
8 # Policy Engine.
9
10 #from monitor import *
11 from threading import *
12 import time
13 import logging
14 import mailer
15 import emailTxt
16 import pickle
17 import Queue
18 import plc
19 import sys
20 import reboot
21 import soltesz
22 import string
23 from printbadbysite import cmpCategoryVal
24 from config import config
# Announce that this module is being loaded (existing log output, kept as-is).
print("policy")
# NOTE: this rebinds the imported `config` class name to one shared instance;
# the rest of the module reads settings through it (e.g. config.debug,
# config.policysavedb).
config = config()

DAT = "./monitor.dat"

logger = logging.getLogger("monitor")

# Time to enforce policy.  NOTE(review): presumably seconds (2 hours) — confirm.
POLSLEEP = 2 * 60 * 60

# Mail destinations.  The "%s" in the templates is filled in by callers
# (presumably a site loginbase or slice name — confirm at the call sites).
SUMTO = "soltesz@cs.princeton.edu"          # where to email the summary
TECHEMAIL = "tech-%s@sites.planet-lab.org"
PIEMAIL = "pi-%s@sites.planet-lab.org"
SLICEMAIL = "%s@slices.planet-lab.org"
PLCEMAIL = "support@planet-lab.org"

# Time thresholds.  All of these are expressed in SECONDS (multiples of
# SPERDAY), even though they are thought of as a number of days.
SPERDAY = 24 * 60 * 60
PITHRESH = 7 * SPERDAY
SLICETHRESH = 7 * SPERDAY
# How long to wait before attempting RINS again (RINS presumably means a
# reinstall — confirm).
RINSTHRESH = 5 * SPERDAY

# How long a node must be down before it is considered dead.
DEADTHRESH = 30 * SPERDAY
# A site with fewer than this many nodes up is marked for squeezing.
MINUP = 2

# Recipient bit flags for notification emails; combined with "|"
# (e.g. TECH | PI).
TECH = 1 << 0
PI = 1 << 1
USER = 1 << 2
ADMIN = 1 << 3

# IF:
#  no SSH, down.
#  bad disk, down
#  DNS, kinda down (sick)
#  clock, kinda down (sick)
#  Full disk, going to be down

# Actions:
#  Email
#  suspend slice creation
#  kill slices
def array_to_priority_map(array):
        """Map each element of `array` to its position in `array`.

        The position serves as a priority for later comparisons (the original
        design note refers to a cmpMap() helper).  If an element occurs more
        than once, its LAST position wins.  Any iterable of hashable items is
        accepted; an empty one yields an empty dict.  The input is not
        modified.
        """
        # enumerate() replaces the hand-maintained counter, and building the
        # dict from (element, index) pairs avoids shadowing the builtin `map`.
        return dict((item, position) for position, item in enumerate(array))
80
def getdebug():
        """Return the `debug` setting of the module-level config instance."""
        debug_enabled = config.debug
        return debug_enabled
83
def print_stats(key, stats):
        """Print one "<key> : <value>" line for `key` if `stats` contains it.

        The key is right-aligned in a 20-character column and the value is
        formatted with %d.  A missing key prints nothing.
        """
        if key not in stats:
                return
        line = "%20s : %d" % (key, stats[key])
        print(line)
86
class Merge(Thread):
        """
        Producer thread: merge findbad results with previously recorded actions.

        For each node in l_merge that findbad reported, build a record from the
        current findbad values, combine it with the node's act_all record (if
        any), and put every merged record on the toRT queue, followed by a
        final None that the consumer treats as end-of-stream.
        """
        def __init__(self, l_merge, toRT):
                self.toRT = toRT
                self.merge_list = l_merge
                # the hostname to loginbase mapping
                self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")

                # Previous actions taken on nodes.
                self.act_all = soltesz.if_cached_else(1, "act_all", lambda : {})
                self.findbad = soltesz.if_cached_else(1, "findbad", lambda : {})

                # Loaded from the same "act_all" database as self.act_all.
                # Nodes matched against findbad are removed from it, so what
                # remains are nodes findbad did not report (case 3 in
                # mergeActionsAndBadDB).  NOTE(review): whether this is a copy
                # independent of self.act_all depends on if_cached_else.
                self.cache_all = soltesz.if_cached_else(1, "act_all", lambda : {})
                # sickdb[loginbase][nodename]  = findbad record (accumSickSites)
                # mergedb[loginbase][nodename] = merged record (mergeActionsAndBadDB)
                self.sickdb = {}
                self.mergedb = {}
                Thread.__init__(self)

        def run(self):
                """Thread body: collect findbad records, merge them, hand off to RT."""
                # NOTE(review): if either of the first two steps raises (or hits
                # the sys.exit() in mergeActionsAndBadDB), sendToRT() never queues
                # the terminating None, and a consumer blocked on toRT waits
                # forever.  sys.exit() inside a thread ends only this thread.
                # populate sickdb
                self.accumSickSites()
                # read data from findbad and act_all
                self.mergeActionsAndBadDB()
                # pass node_records to RT
                self.sendToRT()

        def accumSickSites(self):
                """
                Collect findbad records for the nodes listed in self.merge_list.

                Every node reported by findbad that is also in merge_list is
                turned into an fb_record (stage "findbad") and stored as:

                        sickdb[loginbase][nodename] = fb_record

                Prints the number of nodes collected.  Raises KeyError if a
                selected node has no entry in plcdb_hn2lb.
                """
                # Build the membership set once instead of scanning merge_list
                # linearly for every node findbad reported.
                wanted = set(self.merge_list)
                d_nodes = self.findbad['nodes']
                count = 0
                for nodename in d_nodes.keys():
                        if nodename not in wanted:
                                continue                # skip this node, since it's not wanted

                        count += 1
                        loginbase = self.plcdb_hn2lb[nodename]
                        values = d_nodes[nodename]['values']
                        # One timestamp, so 'time' and 'date_created' agree.
                        now = time.time()

                        fb_record = {
                                'nodename':     nodename,
                                'category':     values['category'],
                                'state':        values['state'],
                                'comonstats':   values['comonstats'],
                                'plcnode':      values['plcnode'],
                                'kernel':       self.getKernel(values['kernel']),
                                'stage':        "findbad",
                                'message':      None,
                                'bootcd':       values['bootcd'],
                                'args':         None,
                                'info':         None,
                                'time':         now,
                                'date_created': now,
                        }
                        self.sickdb.setdefault(loginbase, {})[nodename] = fb_record

                print("Found %d nodes" % count)

        def getKernel(self, unamestr):
                """
                Return the third whitespace-separated field of unamestr (the
                kernel release, if unamestr is `uname -a`-style output as the
                name suggests), or "" when there are fewer than three fields or
                unamestr is empty/None.
                """
                fields = (unamestr or "").split()
                if len(fields) > 2:
                        return fields[2]
                return ""

        def mergeActionsAndBadDB(self):
                """
                - Look at the sick node_records as reported in findbad,
                - Then look at the node_records in act_all.

                There are four cases:
                1) Problem in findbad, no problem in act_all
                        This is ok, b/c it just means it's a new problem.
                2) Problem in findbad, problem in act_all
                        -Did the problem get better or worse?
                                -If Same, or Worse, then continue looking for open tickets.
                                -If Better, or No problem, then "back-off" penalties.
                                        This judgement may need to wait until 'Diagnose()'

                3) No problem in findbad, problem in act_all
                        The node is operational again according to Findbad()

                4) No problem in findbad, no problem in act_all
                        There won't be a record in either db, so there's no code.

                Results go to mergedb[loginbase][nodename].  Nodes whose act_all
                record is at stage "monitor-end-record" are skipped.
                """
                # look at all problems reported by findbad
                for loginbase in sorted(self.sickdb.keys()):
                        d_fb_nodes = self.sickdb[loginbase]
                        for nodename in sorted(d_fb_nodes.keys()):
                                x = d_fb_nodes[nodename]
                                site_merge = self.mergedb.setdefault(loginbase, {})

                                if nodename not in self.act_all:
                                        # 1) ok, b/c it's a new problem. set ticket_id to null
                                        record = dict(x)
                                        record['ticket_id'] = ""
                                        record['prev_category'] = None
                                        site_merge[nodename] = record
                                        continue

                                # 2) the node already has an act_all entry; use
                                #    the first record in its list.
                                y = self.act_all[nodename][0]

                                # skip if end-stage
                                if 'stage' in y and "monitor-end-record" in y['stage']:
                                        continue

                                # Derive prev_category.  Legacy records carry a
                                # 'bucket' instead of a category.
                                if 'bucket' in y and y['bucket'][0] == 'dbg':
                                        # Only bootcd debugs made it to the act_all db.
                                        y['prev_category'] = "OLDBOOTCD"
                                elif 'bucket' in y and y['bucket'][0] == 'down':
                                        y['prev_category'] = "ERROR"
                                elif 'bucket' not in y:
                                        # for all other actions, just carry over the
                                        # previous category
                                        y['prev_category'] = y['category']
                                else:
                                        print("UNKNOWN state for record: %s" % y)
                                        # NOTE(review): in a thread this ends only
                                        # this thread, not the process.
                                        sys.exit(1)

                                # Reconciling the new category with prev_category
                                # is left to the Diagnose stage.  Keep the previous
                                # action record, refreshed with what findbad
                                # measured this round.
                                record = dict(y)
                                for field in ('comonstats', 'category', 'state',
                                              'kernel', 'bootcd', 'plcnode'):
                                        record[field] = x[field]
                                site_merge[nodename] = record
                                # remove the entry from cache_all to keep it out of case 3)
                                self.cache_all.pop(nodename, None)

                # 3) nodes that remain in cache_all were not identified by findbad.
                #    They are not merged; an earlier attempt to carry over records
                #    whose bucket contained 'monitor' was disabled as incorrect.
                print("len of cache_all: %d" % len(self.cache_all))

        def sendToRT(self):
                """
                Put every merged record on the toRT queue (sites in sorted
                order), then None as the end-of-stream marker.
                """
                # look at all problems reported by merge
                for loginbase in sorted(self.mergedb.keys()):
                        for record in self.mergedb[loginbase].values():
                                self.toRT.put(record)

                # send signal to stop reading
                self.toRT.put(None)
286
287 class Diagnose(Thread):
288         def __init__(self, fromRT):
289                 self.fromRT = fromRT
290                 self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
291                 self.findbad = soltesz.if_cached_else(1, "findbad", lambda : {})
292
293                 self.diagnose_in = {}
294                 self.diagnose_out = {}
295                 Thread.__init__(self)
296
297
298         def run(self):
299                 self.accumSickSites()
300
301                 print "Accumulated %d sick sites" % len(self.diagnose_in.keys())
302                 logger.debug("Accumulated %d sick sites" % len(self.diagnose_in.keys()))
303
304                 try:
305                         stats = self.diagnoseAll()
306                 except Exception, err:
307                         print "----------------"
308                         import traceback
309                         print traceback.print_exc()
310                         print err
311                         #if config.policysavedb:
312                         sys.exit(1)
313
314                 print_stats("sites_observed", stats)
315                 print_stats("sites_diagnosed", stats)
316                 print_stats("nodes_diagnosed", stats)
317
318                 if config.policysavedb:
319                         print "Saving Databases... diagnose_out"
320                         soltesz.dbDump("diagnose_out", self.diagnose_out)
321
322         def accumSickSites(self):
323                 """
324                 Take all nodes, from l_diagnose, look them up in the diagnose_out database, 
325                 and insert them into diagnose_in[] as:
326
327                         diagnose_in[loginbase] = [diag_node1, diag_node2, ...]
328                 """
329                 while 1:
330                         node_record = self.fromRT.get(block = True)
331                         if node_record == None:
332                                 break;
333
334                         nodename = node_record['nodename']
335                         loginbase = self.plcdb_hn2lb[nodename]
336
337                         if loginbase not in self.diagnose_in:
338                                 self.diagnose_in[loginbase] = {}
339
340                         self.diagnose_in[loginbase][nodename] = node_record
341
342                 return
343
344         def diagnoseAll(self):
345                 i_sites_observed = 0
346                 i_sites_diagnosed = 0
347                 i_nodes_diagnosed = 0
348                 i_nodes_actedon = 0
349                 i_sites_emailed = 0
350                 l_allsites = []
351
352                 sorted_sites = self.diagnose_in.keys()
353                 sorted_sites.sort()
354                 self.diagnose_out= {}
355                 for loginbase in sorted_sites:
356                         l_allsites += [loginbase]
357
358                         d_diag_nodes = self.diagnose_in[loginbase]
359                         d_act_records = self.__diagnoseSite(loginbase, d_diag_nodes)
360                         # store records in diagnose_out, for saving later.
361                         self.diagnose_out.update(d_act_records)
362                         
363                         if len(d_act_records[loginbase]['nodes'].keys()) > 0:
364                                 i_nodes_diagnosed += (len(d_act_records[loginbase]['nodes'].keys()))
365                                 i_sites_diagnosed += 1
366                         i_sites_observed += 1
367
368                 return {'sites_observed': i_sites_observed, 
369                                 'sites_diagnosed': i_sites_diagnosed, 
370                                 'nodes_diagnosed': i_nodes_diagnosed, 
371                                 'allsites':l_allsites}
372
373                 pass
374                 
375         def __getDaysDown(self, diag_record, nodename):
376                 daysdown = -1
377                 if diag_record['comonstats']['sshstatus'] != "null":
378                         daysdown = int(diag_record['comonstats']['sshstatus']) // (60*60*24)
379                 elif diag_record['comonstats']['lastcotop'] != "null":
380                         daysdown = int(diag_record['comonstats']['lastcotop']) // (60*60*24)
381                 else:
382                         now = time.time()
383                         last_contact = diag_record['plcnode']['last_contact']
384                         if last_contact == None:
385                                 # the node has never been up, so give it a break
386                                 daysdown = -1
387                         else:
388                                 diff = now - last_contact
389                                 daysdown = diff // (60*60*24)
390                 return daysdown
391
392         def __getStrDaysDown(self, diag_record, nodename):
393                 daysdown = self.__getDaysDown(diag_record, nodename)
394                 if daysdown > 0:
395                         return "(%d days down)"%daysdown
396                 else:
397                         return "Unknown number of days"
398
399         def __getCDVersion(self, diag_record, nodename):
400                 cdversion = ""
401                 #print "Getting kernel for: %s" % diag_record['nodename']
402                 cdversion = diag_record['kernel']
403                 return cdversion
404
405         def __diagnoseSite(self, loginbase, d_diag_nodes):
406                 """
407                 d_diag_nodes are diagnose_in entries.
408                 """
409                 d_diag_site = {loginbase : { 'config' : 
410                                                                                                 {'squeeze': False, 
411                                                                                                  'email': False
412                                                                                                 }, 
413                                                                         'nodes': {}
414                                                                         }
415                                            }
416                 sorted_nodes = d_diag_nodes.keys()
417                 sorted_nodes.sort()
418                 for nodename in sorted_nodes:
419                         node_record = d_diag_nodes[nodename]
420                         diag_record = self.__diagnoseNode(loginbase, node_record)
421
422                         if diag_record != None:
423                                 d_diag_site[loginbase]['nodes'][nodename] = diag_record
424                         else:
425                                 pass # there is nothing to do for this node.
426
427                 # NOTE: these settings can be overridden by command line arguments,
428                 #       or the state of a record, i.e. if already in RT's Support Queue.
429                 nodes_up = self.getUpAtSite(loginbase, d_diag_site)
430                 if nodes_up < MINUP:
431                         d_diag_site[loginbase]['config']['squeeze'] = True
432
433                 max_slices = self.getMaxSlices(loginbase)
434                 num_nodes = self.getNumNodes(loginbase)
435                 # NOTE: when max_slices == 0, this is either a new site (the old way)
436                 #       or an old disabled site from previous monitor (before site['enabled'])
437                 if nodes_up < num_nodes and max_slices != 0:
438                         d_diag_site[loginbase]['config']['email'] = True
439
440                 if len(d_diag_site[loginbase]['nodes'].keys()) > 0:
441                         print "SITE: %20s : %d nodes up, at most" % (loginbase, nodes_up)
442
443                 return d_diag_site
444
445         def diagRecordByCategory(self, node_record):
446                 nodename = node_record['nodename']
447                 category = node_record['category']
448                 state    = node_record['state']
449                 loginbase = self.plcdb_hn2lb[nodename]
450                 diag_record = None
451
452                 if  "ERROR" in category:        # i.e. "DOWN"
453                         diag_record = {}
454                         diag_record.update(node_record)
455                         daysdown = self.__getDaysDown(diag_record, nodename) 
456                         if daysdown < 7:
457                                 format = "DIAG: %20s : %-40s Down only %s days  NOTHING DONE"
458                                 print format % (loginbase, nodename, daysdown)
459                                 return None
460
461                         s_daysdown = self.__getStrDaysDown(diag_record, nodename)
462                         diag_record['message'] = emailTxt.mailtxt.newdown
463                         diag_record['args'] = {'nodename': nodename}
464                         diag_record['info'] = (nodename, s_daysdown, "")
465                         if diag_record['ticket_id'] == "":
466                                 diag_record['log'] = "DOWN: %20s : %-40s == %20s %s" % \
467                                         (loginbase, nodename, diag_record['info'][1:], diag_record['found_rt_ticket'])
468                         else:
469                                 diag_record['log'] = "DOWN: %20s : %-40s == %20s %s" % \
470                                         (loginbase, nodename, diag_record['info'][1:], diag_record['ticket_id'])
471
472                 elif "OLDBOOTCD" in category:
473                         # V2 boot cds as determined by findbad
474                         s_daysdown = self.__getStrDaysDown(node_record, nodename)
475                         s_cdversion = self.__getCDVersion(node_record, nodename)
476                         diag_record = {}
477                         diag_record.update(node_record)
478                         #if "2.4" in diag_record['kernel'] or "v2" in diag_record['bootcd']:
479                         diag_record['message'] = emailTxt.mailtxt.newbootcd
480                         diag_record['args'] = {'nodename': nodename}
481                         diag_record['info'] = (nodename, s_daysdown, s_cdversion)
482                         if diag_record['ticket_id'] == "":
483                                 diag_record['log'] = "BTCD: %20s : %-40s == %20s %20s %s" % \
484                                                                         (loginbase, nodename, diag_record['kernel'], 
485                                                                          diag_record['bootcd'], diag_record['found_rt_ticket'])
486                         else:
487                                 diag_record['log'] = "BTCD: %20s : %-40s == %20s %20s %s" % \
488                                                                         (loginbase, nodename, diag_record['kernel'], 
489                                                                          diag_record['bootcd'], diag_record['ticket_id'])
490
491                 elif "PROD" in category:
492                         if "DEBUG" in state:
493                                 # Not sure what to do with these yet.  Probably need to
494                                 # reboot, and email.
495                                 print "DEBG: %20s : %-40s  NOTHING DONE" % (loginbase, nodename)
496                                 return None
497                         elif "BOOT" in state:
498                                 # no action needed.
499                                 # TODO: remove penalties, if any are applied.
500                                 if 'improvement' in node_record['stage']:
501                                         # then we need to pass this on to 'action'
502                                         diag_record = {}
503                                         diag_record.update(node_record)
504                                         diag_record['message'] = emailTxt.mailtxt.newthankyou
505                                         diag_record['args'] = {'nodename': nodename}
506                                         diag_record['info'] = (nodename, node_record['prev_category'], 
507                                                                                                          node_record['category'])
508                                         if diag_record['ticket_id'] == "":
509                                                 diag_record['log'] = "IMPR: %20s : %-40s == %20s %20s %s %s" % \
510                                                                         (loginbase, nodename, diag_record['stage'], 
511                                                                          state, category, diag_record['found_rt_ticket'])
512                                         else:
513                                                 diag_record['log'] = "IMPR: %20s : %-40s == %20s %20s %s %s" % \
514                                                                         (loginbase, nodename, diag_record['stage'], 
515                                                                          state, category, diag_record['ticket_id'])
516                                         return diag_record
517                                 else:
518                                         return None
519                         else:
520                                 # unknown
521                                 pass
522                 elif "ALPHA"    in category:
523                         pass
524                 elif "clock_drift" in category:
525                         pass
526                 elif "dns"    in category:
527                         pass
528                 elif "filerw"    in category:
529                         pass
530                 else:
531                         print "Unknown category!!!! %s" % category
532                         sys.exit(1)
533
534                 return diag_record
535
536         def __diagnoseNode(self, loginbase, node_record):
537                 # TODO: change the format of the hostname in this 
538                 #               record to something more natural.
539                 nodename          = node_record['nodename']
540                 category          = node_record['category']
541                 prev_category = node_record['prev_category']
542                 state             = node_record['state']
543
544                 val = cmpCategoryVal(category, prev_category)
545                 if val == -1:
546                         # current category is worse than previous, carry on
547                         pass
548                 elif val == 1:
549                         # current category is better than previous
550                         # TODO: too generous for now, but will be handled correctly
551                         # TODO: if stage is currently ticket_waitforever, 
552                         if 'ticket_id' not in node_record:
553                                 print "ignoring: ", node_record['nodename']
554                                 return None
555                         else:
556                                 if node_record['ticket_id'] == "" or \
557                                    node_record['ticket_id'] == None:
558                                         print "closing: ", node_record['nodename']
559                                         node_record['action'] = ['close_rt']
560                                         node_record['message'] = None
561                                         node_record['stage'] = 'monitor-end-record'
562                                         return node_record
563                                         #return None
564                                 else:
565                                         node_record['stage'] = 'improvement'
566                 else:
567                         #values are equal, carry on.
568                         pass
569                         
570                 #### COMPARE category and prev_category
571                 # if not_equal
572                 #       then assign a stage based on relative priorities
573                 # else equal
574                 #       then check category for stats.
575                 diag_record = self.diagRecordByCategory(node_record)
576                 if diag_record == None:
577                         return None
578
579                 #### found_RT_ticket
580                 # TODO: need to record time found, and maybe add a stage for acting on it...
581                 if 'found_rt_ticket' in diag_record and \
582                         diag_record['found_rt_ticket'] is not None:
583                         if diag_record['stage'] is not 'improvement':
584                                 diag_record['stage'] = 'ticket_waitforever'
585                                 
586                 current_time = time.time()
587                 # take off four days, for the delay that database caused.
588                 # TODO: generalize delays at PLC, and prevent enforcement when there
589                 #               have been no emails.
590                 # NOTE: 7*SPERDAY exists to offset the 'bad week'
591                 delta = current_time - diag_record['time'] - 7*SPERDAY
592
593                 message = diag_record['message']
594                 act_record = {}
595                 act_record.update(diag_record)
596
597                 #### DIAGNOSE STAGES 
598                 #print "%s has stage %s" % (nodename, diag_record['stage'])
599                 if   'findbad' in diag_record['stage']:
600                         # The node is bad, and there's no previous record of it.
601                         act_record['email'] = TECH              # addative emails
602                         act_record['action'] = ['noop']
603                         act_record['message'] = message[0]
604                         act_record['stage'] = 'stage_actinoneweek'
605
606                 elif 'improvement' in diag_record['stage']:
607                         # - backoff previous squeeze actions (slice suspend, nocreate)
608                         # TODO: add a backoff_squeeze section... Needs to runthrough
609                         act_record['action'] = ['close_rt']
610                         act_record['message'] = message[0]
611                         act_record['stage'] = 'monitor-end-record'
612
613                 elif 'actinoneweek' in diag_record['stage']:
614                         if delta >= 7 * SPERDAY: 
615                                 act_record['email'] = TECH | PI
616                                 act_record['stage'] = 'stage_actintwoweeks'
617                                 act_record['message'] = message[1]
618                                 act_record['action'] = ['nocreate' ]
619                         elif delta >= 3* SPERDAY and not 'second-mail-at-oneweek' in act_record:
620                                 act_record['email'] = TECH 
621                                 act_record['message'] = message[0]
622                                 act_record['action'] = ['sendmailagain-waitforoneweekaction' ]
623                                 act_record['second-mail-at-oneweek'] = True
624                         else:
625                                 act_record['message'] = None
626                                 act_record['action'] = ['waitforoneweekaction' ]
627                                 return None                     # don't send if there's no action
628
629                 elif 'actintwoweeks' in diag_record['stage']:
630                         if delta >= 14 * SPERDAY:
631                                 act_record['email'] = TECH | PI | USER
632                                 act_record['stage'] = 'stage_waitforever'
633                                 act_record['message'] = message[2]
634                                 act_record['action'] = ['suspendslices']
635                                 act_record['time'] = current_time               # reset clock for waitforever
636                         elif delta >= 10* SPERDAY and not 'second-mail-at-twoweeks' in act_record:
637                                 act_record['email'] = TECH | PI
638                                 act_record['message'] = message[1]
639                                 act_record['action'] = ['sendmailagain-waitfortwoweeksaction' ]
640                                 act_record['second-mail-at-twoweeks'] = True
641                         else:
642                                 act_record['message'] = None
643                                 act_record['action'] = ['waitfortwoweeksaction']
644                                 return None                     # don't send if there's no action
645
646                 elif 'ticket_waitforever' in diag_record['stage']:
647                         act_record['email'] = TECH
648                         if 'first-found' not in act_record:
649                                 act_record['first-found'] = True
650                                 act_record['log'] += " firstfound"
651                                 act_record['action'] = ['ticket_waitforever']
652                                 act_record['message'] = None
653                                 act_record['time'] = current_time
654                         else:
655                                 if delta >= 7*SPERDAY:
656                                         act_record['action'] = ['ticket_waitforever']
657                                         act_record['message'] = None
658                                         act_record['time'] = current_time               # reset clock
659                                 else:
660                                         act_record['action'] = ['ticket_waitforever']
661                                         act_record['message'] = None
662                                         return None
663
664                 elif 'waitforever' in diag_record['stage']:
665                         # more than 3 days since last action
666                         # TODO: send only on weekdays.
667                         # NOTE: expects that 'time' has been reset before entering waitforever stage
668                         if delta >= 3*SPERDAY:
669                                 act_record['action'] = ['email-againwaitforever']
670                                 act_record['message'] = message[2]
671                                 act_record['time'] = current_time               # reset clock
672                         else:
673                                 act_record['action'] = ['waitforever']
674                                 act_record['message'] = None
675                                 return None                     # don't send if there's no action
676
677                 else:
678                         # There is no action to be taken, possibly b/c the stage has
679                         # already been performed, but diagnose picked it up again.
680                         # two cases, 
681                         #       1. stage is unknown, or 
682                         #       2. delta is not big enough to bump it to the next stage.
683                         # TODO: figure out which. for now assume 2.
684                         print "UNKNOWN!!? %s" % nodename
685                         act_record['action'] = ['unknown']
686                         act_record['message'] = message[0]
687                         print "Exiting..."
688                         sys.exit(1)
689
690                 print "%s" % act_record['log'],
691                 print "%15s" % act_record['action']
692                 return act_record
693
694         def getMaxSlices(self, loginbase):
695                 # if sickdb has a loginbase, then it will have at least one node.
696                 site_stats = None
697
698                 for nodename in self.diagnose_in[loginbase].keys():
699                         if nodename in self.findbad['nodes']:
700                                 site_stats = self.findbad['nodes'][nodename]['values']['plcsite']
701                                 break
702
703                 if site_stats == None:
704                         raise Exception, "loginbase with no nodes in findbad"
705                 else:
706                         return site_stats['max_slices']
707
708         def getNumNodes(self, loginbase):
709                 # if sickdb has a loginbase, then it will have at least one node.
710                 site_stats = None
711
712                 for nodename in self.diagnose_in[loginbase].keys():
713                         if nodename in self.findbad['nodes']:
714                                 site_stats = self.findbad['nodes'][nodename]['values']['plcsite']
715                                 break
716
717                 if site_stats == None:
718                         raise Exception, "loginbase with no nodes in findbad"
719                 else:
720                         return site_stats['num_nodes']
721
722         """
723         Returns number of up nodes as the total number *NOT* in act_all with a
724         stage other than 'steady-state' .
725         """
726         def getUpAtSite(self, loginbase, d_diag_site):
727                 # TODO: THIS DOESN"T WORK!!! it misses all the 'debug' state nodes
728                 #               that aren't recorded yet.
729
730                 numnodes = self.getNumNodes(loginbase)
731                 # NOTE: assume nodes we have no record of are ok. (too conservative)
732                 # TODO: make the 'up' value more representative
733                 up = numnodes
734                 for nodename in d_diag_site[loginbase]['nodes'].keys():
735
736                         rec = d_diag_site[loginbase]['nodes'][nodename]
737                         if rec['stage'] != 'monitor-end-record':
738                                 up -= 1
739                         else:
740                                 pass # the node is assumed to be up.
741
742                 #if up != numnodes:
743                 #       print "ERROR: %s total nodes up and down != %d" % (loginbase, numnodes)
744
745                 return up
746
747
class SiteAction:
        """
        Base class for a site-level policy action.

        Subclasses implement _run(args).  run() first checks that every name in
        self.parameter_names is a key of args, then returns _run(args).
        """
        def __init__(self, parameter_names=None):
                # Build the default per call: a shared mutable default list would be
                # aliased by every SiteAction created without explicit names.
                if parameter_names is None:
                        parameter_names = ['hostname', 'ticket_id']
                self.parameter_names = parameter_names

        def checkParam(self, args):
                """Raise Exception naming the first required parameter missing from args."""
                for param in self.parameter_names:
                        if param not in args:
                                raise Exception("Parameter %s not provided in args"%param)

        def run(self, args):
                """Validate args, then return the result of the subclass's _run(args)."""
                self.checkParam(args)
                return self._run(args)

        def _run(self, args):
                """Hook for subclasses; the base implementation does nothing and returns None."""
                pass
760
class SuspendAction(SiteAction):
        """Call plc.suspendSlices for args['hostname'] and return its result."""
        def _run(self, args):
                hostname = args['hostname']
                return plc.suspendSlices(hostname)
764
class RemoveSliceCreation(SiteAction):
        """Call plc.removeSliceCreation for args['hostname'] and return its result."""
        def _run(self, args):
                hostname = args['hostname']
                return plc.removeSliceCreation(hostname)
768
class BackoffActions(SiteAction):
        """
        Back off squeeze actions: call plc.enableSlices, then
        plc.enableSliceCreation, for args['hostname'].  Always returns True.
        """
        def _run(self, args):
                hostname = args['hostname']
                plc.enableSlices(hostname)
                plc.enableSliceCreation(hostname)
                return True
774
775 # TODO: create class for each action below, 
776 #               allow for lists of actions to be performed...
777
def close_rt_backoff(args):
        """
        Close the site's RT ticket and re-enable its slices.

        Does nothing unless args holds a 'ticket_id' that is neither "" nor
        None.  Otherwise it closes that ticket via mailer.closeTicketViaRT and
        calls plc.enableSlices / plc.enableSliceCreation for args['hostname'].
        Always returns None.
        """
        ticket_id = args.get('ticket_id')
        if ticket_id in ("", None):
                # NOTE(review): without a ticket the slices are not re-enabled either,
                # even if the site was squeezed -- confirm that is intended.
                return
        mailer.closeTicketViaRT(ticket_id,
                                "Ticket CLOSED automatically by SiteAssist.")
        plc.enableSlices(args['hostname'])
        plc.enableSliceCreation(args['hostname'])
785
class Action(Thread):
        """
        Policy-enforcement thread.

        For the nodes listed in l_action, applies the actions chosen by the
        diagnose step: emails the site contacts and, when squeezing is enabled,
        runs the matching PLC action.  Each act record is logged in the
        'act_all' database.
        """
        def __init__(self, l_action):
                """
                l_action is the iterable of hostnames to act on (None is treated as
                empty by accumSites).  Loads the hostname->loginbase map and the
                cached 'diagnose_out' and 'act_all' databases through soltesz.
                """
                self.l_action = l_action

                # the hostname to loginbase mapping
                self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")

                # Actions to take (the diagnose step's output), keyed by loginbase.
                self.diagnose_db = soltesz.if_cached_else(1, "diagnose_out", lambda : {})
                # Actions taken: nodename -> list of act records, newest first.
                self.act_all   = soltesz.if_cached_else(1, "act_all", lambda : {})

                # Map each action name to a callable taking the email_args dict.
                # NOTE: pickle cannot serialize these lambdas; keep them out of dumped dbs.
                self.actions = {}
                self.actions['suspendslices'] = lambda args: plc.suspendSlices(args['hostname'])
                self.actions['nocreate'] = lambda args: plc.removeSliceCreation(args['hostname'])
                self.actions['close_rt'] = lambda args: close_rt_backoff(args)
                self.actions['rins'] = lambda args: plc.nodeBootState(args['hostname'], "rins")
                # These action names need no PLC call; each maps to the identity function.
                for name in ['noop', 'ticket_waitforever', 'waitforever', 'unknown',
                             'waitforoneweekaction', 'waitfortwoweeksaction',
                             'sendmailagain-waitforoneweekaction',
                             'sendmailagain-waitfortwoweeksaction',
                             'email-againwaitforever', 'email-againticket_waitforever']:
                        self.actions[name] = lambda args: args

                # sickdb[loginbase] = {'nodes': {nodename: diag_record}, 'config': ...}
                self.sickdb = {}
                Thread.__init__(self)

        def run(self):
                """
                Thread entry point.

                Collect the sick sites, act on each one, print summary statistics,
                and dump act_all when config.policysavedb is set.  If analyseSites()
                raises, print the traceback, dump act_all (if policysavedb) and call
                sys.exit(1).
                """
                self.accumSites()
                print("Accumulated %d sick sites" % len(self.sickdb.keys()))
                logger.debug("Accumulated %d sick sites" % len(self.sickdb.keys()))

                try:
                        stats = self.analyseSites()
                except Exception:
                        err = sys.exc_info()[1]
                        print("----------------")
                        import traceback
                        # print_exc() writes the traceback itself and returns None.
                        traceback.print_exc()
                        print(err)
                        if config.policysavedb:
                                print("Saving Databases... act_all")
                                soltesz.dbDump("act_all", self.act_all)
                        sys.exit(1)

                # print_stats is a method of this class, so it is reached through self.
                for key in ["sites_observed", "sites_diagnosed", "nodes_diagnosed",
                            "sites_emailed", "nodes_actedon"]:
                        self.print_stats(key, stats)
                print(",".join(stats['allsites']))

                if config.policysavedb:
                        print("Saving Databases... act_all")
                        # TODO: remove 'diagnose_out',
                        #       or at least the entries that were acted on.
                        soltesz.dbDump("act_all", self.act_all)

        def accumSites(self):
                """
                Copy the diagnose_db records of the nodes in l_action into sickdb:

                        sickdb[loginbase]['nodes'][nodename] = diag_record
                        sickdb[loginbase]['config'] = diagnose_db[loginbase]['config']

                This way only the given l_action nodes are acted on, regardless of
                how many records diagnose_db holds.  Nodes without a diagnose_db
                record are skipped.  A nodename missing from plcdb_hn2lb raises
                KeyError.  An l_action of None is treated as empty.
                """
                for nodename in (self.l_action or []):
                        loginbase = self.plcdb_hn2lb[nodename]

                        if loginbase not in self.diagnose_db or \
                           nodename not in self.diagnose_db[loginbase]['nodes']:
                                continue

                        site_diag = self.diagnose_db[loginbase]
                        site_sick = self.sickdb.setdefault(loginbase, {'nodes' : {}})
                        # NOTE: don't copy all node records, since not all will be in l_action
                        site_sick['nodes'][nodename] = site_diag['nodes'][nodename]
                        # NOTE: but we do want the loginbase config settings.
                        site_sick['config'] = site_diag['config']

        def __emailSite(self, loginbase, roles, message, args):
                """
                Email the contacts of a site and return a ticket id.

                loginbase is the unique site abbreviation, prepended to slice names.
                roles is a bitmask of ADMIN/TECH/PI/USER used to derive the email
                aliases.  It is forced to ADMIN for some config.mail/debug/bcc
                combinations.
                message is a [<subject>, <body>] pair of %-templates expanded with
                args.  args is updated in place with 'loginbase'.

                Returns args['ticket_id'] for admin-only mail, otherwise the value
                returned by mailer.emailViaRT.  Returns 0 when no ticket id was
                obtained (e.g. the send raised; the exception is printed, not
                propagated).
                """
                ticket_id = 0
                args.update({'loginbase':loginbase})

                if not config.mail and not config.debug and config.bcc:
                        roles = ADMIN
                if config.mail and config.debug:
                        roles = ADMIN

                # build targets
                contacts = []
                if ADMIN & roles:
                        contacts.append(config.email)
                if TECH & roles:
                        contacts.append(TECHEMAIL % loginbase)
                if PI & roles:
                        contacts.append(PIEMAIL % loginbase)
                if USER & roles:
                        slices = plc.slices(loginbase)
                        if len(slices) >= 1:
                                for slice_name in slices:
                                        contacts.append(SLICEMAIL % slice_name)
                                print("SLIC: %20s : %d slices" % (loginbase, len(slices)))
                        else:
                                print("SLIC: %20s : 0 slices" % loginbase)

                try:
                        subject = message[0] % args
                        body = message[1] % args
                        if ADMIN & roles:
                                # send only to admin
                                if 'ticket_id' in args:
                                        subj = "Re: [PL #%s] %s" % (args['ticket_id'], subject)
                                else:
                                        subj = "Re: [PL noticket] %s" % subject
                                mailer.email(subj, body, contacts)
                                # Adopt an existing ticket id only; without one keep the
                                # default instead of raising KeyError after the mail went out.
                                ticket_id = args.get('ticket_id', ticket_id)
                        else:
                                # NOTE(review): assumes args carries 'ticket_id' (possibly
                                # empty); if missing, KeyError is raised and no mail is sent.
                                ticket_id = mailer.emailViaRT(subject, body, contacts, args['ticket_id'])
                except Exception:
                        print("exception on message:")
                        import traceback
                        traceback.print_exc()
                        print(message)

                return ticket_id

        def _format_diaginfo(self, diag_node):
                """
                Return one indented line describing diag_node for an email body.

                For 'monitor-end-record' the line reads "<info0> went from '<info1>'
                to '<info2>'"; otherwise "<info0> <info2> - <info1>".
                """
                info = diag_node['info']
                if diag_node['stage'] == 'monitor-end-record':
                        return "    %s went from '%s' to '%s'\n" % (info[0], info[1], info[2])
                return "    %s %s - %s\n" % (info[0], info[2], info[1]) #(node,ver,daysdn)

        def get_email_args(self, act_recordlist):
                """
                Build the %-template arguments for one issue's email.

                'hostname_list' concatenates every record's 'msg_format'.  'hostname'
                is the last record's nodename.  'ticket_id' is taken from the last
                record that has one, and is absent if none has it.
                """
                email_args = {'hostname_list': "".join([rec['msg_format'] for rec in act_recordlist])}
                for rec in act_recordlist:
                        email_args['hostname'] = rec['nodename']
                        if 'ticket_id' in rec:
                                email_args['ticket_id'] = rec['ticket_id']
                return email_args

        def get_unique_issues(self, act_recordlist):
                """
                Group act records by their first action name, preserving order.

                NOTE: only send one email per site, per problem.
                """
                unique_issues = {}
                for act_record in act_recordlist:
                        unique_issues.setdefault(act_record['action'][0], []).append(act_record)
                return unique_issues

        def __actOnSite(self, loginbase, site_record):
                """
                Act on every node of one site.  Returns (i_nodes_actedon, i_nodes_emailed).

                The node records are turned into act records (see __actOnNode) and
                grouped by first action.  For each group:
                  - if its first record has a message and the site config allows
                    email, one email is sent and its ticket id is stored (as a
                    string) on every record of the group;
                  - if config.squeeze and the site config allow it, each action of
                    the group's first record is run with the group's email args.
                Then act_all / diagnose_out are dumped when config.policysavedb is
                set (removing this site from diagnose_out).  Finally the method
                waits for the operator to press enter.
                """
                i_nodes_actedon = 0
                i_nodes_emailed = 0

                act_recordlist = []
                for nodename in site_record['nodes'].keys():
                        act_recordlist.append(self.__actOnNode(site_record['nodes'][nodename]))

                unique_issues = self.get_unique_issues(act_recordlist)

                for issue in unique_issues.keys():
                        print("\tworking on issue: %s" % issue)
                        issue_record_list = unique_issues[issue]
                        email_args = self.get_email_args(issue_record_list)

                        act_record = issue_record_list[0]
                        # send message before squeezing
                        print("\t\tconfig.email: %s and %s" % (act_record['message'] is not None,
                                                               site_record['config']['email']))
                        if act_record['message'] is not None and site_record['config']['email']:
                                ticket_id = self.__emailSite(loginbase, act_record['email'],
                                                             act_record['message'], email_args)

                                # Add ticket_id to ALL records of this issue.  A distinct loop
                                # variable keeps act_record bound to the group's first record.
                                # When __actOnNode recorded a node, its record is the very dict
                                # at act_all[nodename][0], so act_all sees the update too.
                                for issue_record in issue_record_list:
                                        issue_record['ticket_id'] = "%s" % ticket_id
                                        if config.mail:
                                                i_nodes_emailed += 1

                        print("\t\tconfig.squeeze: %s and %s" % (config.squeeze,
                                                                 site_record['config']['squeeze']))
                        if config.squeeze and site_record['config']['squeeze']:
                                for act_key in act_record['action']:
                                        self.actions[act_key](email_args)
                                        i_nodes_actedon += 1

                if config.policysavedb:
                        print("Saving Databases... act_all, diagnose_out")
                        soltesz.dbDump("act_all", self.act_all)
                        # remove site record from diagnose_out, it's in act_all as done.
                        del self.diagnose_db[loginbase]
                        soltesz.dbDump("diagnose_out", self.diagnose_db)

                print("Hit enter to continue...")
                sys.stdout.flush()
                # Blocks until the operator presses enter, once per site.
                sys.stdin.readline()

                return (i_nodes_actedon, i_nodes_emailed)

        def __actOnNode(self, diag_record):
                """
                Build the act record for one node and log it in act_all.

                The act record is a shallow copy of diag_record plus 'nodename' and
                'msg_format' (see _format_diaginfo).  It is inserted at the front of
                act_all[nodename] and returned.
                """
                nodename = diag_record['nodename']

                act_record = {}
                act_record.update(diag_record)
                act_record['nodename'] = nodename
                act_record['msg_format'] = self._format_diaginfo(diag_record)

                print("%s %15s" % (act_record['log'], act_record['action']))

                # NOTE(review): 'is not' compares identity, not value.  A stage string
                # loaded from the pickled db is never the same object as this literal,
                # so in practice every record (including 'monitor-end-record') is
                # stored.  Switching to '!=' would stop recording recovered nodes;
                # coordinate with the diagnose step before changing it.
                if act_record['stage'] is not 'monitor-end-record':
                        self.act_all.setdefault(nodename, []).insert(0, act_record)
                else:
                        print("Not recording %s in act_all" % nodename)

                return act_record

        def analyseSites(self):
                """
                Act on every site in sickdb, in sorted loginbase order.

                Returns a dict of counters ('sites_observed', 'sites_diagnosed',
                'nodes_diagnosed', 'sites_emailed', 'nodes_actedon') plus
                'allsites', the list of loginbases processed.
                """
                i_sites_observed = 0
                i_sites_diagnosed = 0
                i_nodes_diagnosed = 0
                i_nodes_actedon = 0
                i_sites_emailed = 0
                l_allsites = []

                for loginbase in sorted(self.sickdb.keys()):
                        site_record = self.sickdb[loginbase]
                        print("sites: %s" % loginbase)

                        # Count the site's node records, not the keys of the site record
                        # itself (which are just 'nodes' and 'config').
                        i_nodes_diagnosed += len(site_record['nodes'])
                        i_sites_diagnosed += 1

                        (na, ne) = self.__actOnSite(loginbase, site_record)

                        i_sites_observed += 1
                        i_nodes_actedon += na
                        # ne counts emailed nodes; the stat counts emailed sites.
                        if ne > 0:
                                i_sites_emailed += 1

                        l_allsites.append(loginbase)

                return {'sites_observed': i_sites_observed,
                        'sites_diagnosed': i_sites_diagnosed,
                        'nodes_diagnosed': i_nodes_diagnosed,
                        'sites_emailed': i_sites_emailed,
                        'nodes_actedon': i_nodes_actedon,
                        'allsites':l_allsites}

        def print_stats(self, key, stats):
                """Print one statistic as a right-aligned 'key : value' line."""
                print("%20s : %d" % (key, stats[key]))
1091
1092
1093
1094         #"""
1095         #Prints, logs, and emails status of up nodes, down nodes, and buckets.
1096         #"""
1097         #def status(self):
1098         #       sub = "Monitor Summary"
1099         #       msg = "\nThe following nodes were acted upon:  \n\n"
1100         #       for (node, (type, date)) in self.emailed.items():
1101         #               # Print only things acted on today.
1102         #               if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
1103         #                       msg +="%s\t(%s)\t%s\n" %(node, type, time.ctime(date))
1104         #       msg +="\n\nThe following sites have been 'squeezed':\n\n"
1105         #       for (loginbase, (date, type)) in self.squeezed.items():
1106         #               # Print only things acted on today.
1107         #               if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
1108         #                       msg +="%s\t(%s)\t%s\n" %(loginbase, type, time.ctime(date))
1109         #       mailer.email(sub, msg, [SUMTO])
1110         #       logger.info(msg)
1111         #       return 
1112
1113         #"""
1114         #Store/Load state of emails.  When, where, what.
1115         #"""
1116         #def emailedStore(self, action):
1117         #       try:
1118         #               if action == "LOAD":
1119         #                       f = open(DAT, "r+")
1120         #                       logger.info("POLICY:  Found and reading " + DAT)
1121         #                       self.emailed.update(pickle.load(f))
1122         #               if action == "WRITE":
1123         #                       f = open(DAT, "w")
1124         #                       #logger.debug("Writing " + DAT)
1125         #                       pickle.dump(self.emailed, f)
1126         #               f.close()
1127         #       except Exception, err:
1128         #               logger.info("POLICY:  Problem with DAT, %s" %err)
1129
1130
1131 #class Policy(Thread):
1132
def main():
        """Refuse to run standalone: this file is meant to be imported as a module."""
        notice = "policy.py is a module, not a script for running directly."
        print(notice)
1135
if __name__ == '__main__':
        # Script entry: run main(); on Ctrl-C log the kill and leave at once
        # (os._exit skips interpreter cleanup) with status 0.
        # plc is already imported at the top of the module, so no re-import here.
        import os
        try:
                main()
        except KeyboardInterrupt:
                print("Killed.  Exitting.")
                logger.info('Monitor Killed')
                os._exit(0)