Includes some checks for NM consistency via the 'last_updated' field in PLCdb.
[monitor.git] / policy.py
1 #
2 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
3 #
4 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
5 #
6 # $Id: policy.py,v 1.17 2007/08/29 17:26:50 soltesz Exp $
7 #
8 # Policy Engine.
9
10 #from monitor import *
11 from threading import *
12 import time
13 import logging
14 import mailer
15 import emailTxt
16 import pickle
17 import Queue
18 import plc
19 import sys
20 import reboot
21 import soltesz
22 import string
23 from printbadbysite import cmpCategoryVal
24 from config import config
# Import-time trace output, kept for behavioral parity.
print("policy")
# Rebind `config` from the imported class to one module-wide instance;
# module functions (e.g. getdebug) read their settings from it.
config = config()
27
DAT = "./monitor.dat"

logger = logging.getLogger("monitor")

# NOTE: every duration/threshold below is expressed in SECONDS.

# Time to enforce policy: presumably the pause between policy passes
# (7200 s = 2 hours).
POLSLEEP = 7200

# Where to email the summary
SUMTO = "soltesz@cs.princeton.edu"
# Address templates; the %s placeholder is filled in by callers
# (presumably a site loginbase or a slice name -- confirm at call sites).
TECHEMAIL = "tech-%s@sites.planet-lab.org"
PIEMAIL = "pi-%s@sites.planet-lab.org"
SLICEMAIL = "%s@slices.planet-lab.org"
PLCEMAIL = "support@planet-lab.org"

# Seconds per minute / hour / day.
SPERMIN = 60
SPERHOUR = 60 * SPERMIN
SPERDAY = 24 * SPERHOUR

# Thresholds, in seconds (N * SPERDAY == N days).
PITHRESH = 7 * SPERDAY
SLICETHRESH = 7 * SPERDAY
# Time to wait before attempting 'rins' again (5 days).
RINSTHRESH = 5 * SPERDAY

# Time before calling the node dead (30 days).
DEADTHRESH = 30 * SPERDAY
# Minimum number of nodes up at a site; a site with fewer nodes up is
# flagged for squeezing.
MINUP = 2

# Contact classes.  The values are distinct powers of two, presumably so
# they can be OR'ed together as a bitmask -- confirm against callers.
TECH = 1
PI = 2
USER = 4
ADMIN = 8
61 # IF:
62 #  no SSH, down.
63 #  bad disk, down
64 #  DNS, kinda down (sick)
65 #  clock, kinda down (sick)
66 #  Full disk, going to be down
67
68 # Actions:
69 #  Email
70 #  suspend slice creation
71 #  kill slices
def array_to_priority_map(array):
    """ Create a mapping where each entry of array is given a priority equal
    to its position in the array.  This is useful for subsequent use in the
    cmpMap() function.

    If an entry occurs more than once, its last position wins.  An empty
    array yields an empty dict.
    """
    # enumerate() replaces the manual counter; no local shadows builtin map.
    return dict((item, position) for position, item in enumerate(array))
82
def getdebug():
    """Return the `debug` setting of the module-wide config object."""
    debug_enabled = config.debug
    return debug_enabled
85
def print_stats(key, stats):
    """Print a "key : value" line for `key` if it is present in `stats`.

    The key is right-aligned in 20 columns; the value is formatted with %d.
    Missing keys print nothing.
    """
    if key not in stats:
        return
    print("%20s : %d" % (key, stats[key]))
88
class Merge(Thread):
    """
    Producer thread: combine the findbad results with the previous action
    records (act_all) and feed the merged node records to the toRT queue.

    Only nodes whose hostnames are in l_merge are considered.  After all
    records are queued, None is put on toRT as an end-of-stream marker.
    """

    def __init__(self, l_merge, toRT):
        self.toRT = toRT
        self.merge_list = l_merge
        # the hostname to loginbase mapping
        self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")

        # Previous actions taken on nodes.
        self.act_all = soltesz.if_cached_else(1, "act_all", lambda: {})
        self.findbad = soltesz.if_cached_else(1, "findbad", lambda: {})

        # A second load of act_all.  mergeActionsAndBadDB() removes every
        # node it accounts for, leaving only act_all-only nodes (case 3).
        # NOTE(review): assumes if_cached_else returns an independent copy;
        # if it returns a shared object, those removals also affect act_all.
        self.cache_all = soltesz.if_cached_else(1, "act_all", lambda: {})
        self.sickdb = {}
        self.mergedb = {}
        Thread.__init__(self)

    def run(self):
        """Thread body: build sickdb, merge it with act_all, queue the result."""
        # NOTE(review): if any step raises (including the sys.exit() in
        # mergeActionsAndBadDB, which inside a thread ends only this
        # thread), sendToRT() never queues the None marker, and a consumer
        # blocked on toRT will wait forever.
        # populate sickdb
        self.accumSickSites()
        # read data from findbad and act_all
        self.mergeActionsAndBadDB()
        # pass node_records to RT
        self.sendToRT()

    def accumSickSites(self):
        """
        Copy the findbad entry of every node listed in merge_list into
        sickdb, grouped by site:

                sickdb[loginbase][nodename] = fb_record

        Nodes reported by findbad but absent from merge_list are skipped.
        A findbad database without a 'nodes' table (e.g. the empty default
        used when nothing is cached) simply yields no records.
        """
        # Build the membership set once instead of scanning a list per node.
        wanted = set(self.merge_list)
        fb_nodes = self.findbad.get('nodes', {})
        count = 0
        for nodename in fb_nodes.keys():
            if nodename not in wanted:
                continue                # skip this node, since it's not wanted

            count += 1
            loginbase = self.plcdb_hn2lb[nodename]
            values = fb_nodes[nodename]['values']
            # A single timestamp, so 'time' and 'date_created' agree exactly.
            now = time.time()

            fb_record = {
                'nodename': nodename,
                'category': values['category'],
                'state': values['state'],
                'comonstats': values['comonstats'],
                'plcnode': values['plcnode'],
                'kernel': self.getKernel(values['kernel']),
                'stage': "findbad",
                'message': None,
                'bootcd': values['bootcd'],
                'args': None,
                'info': None,
                'time': now,
                'date_created': now,
            }

            self.sickdb.setdefault(loginbase, {})[nodename] = fb_record

        print("Found %d nodes" % count)

    def getKernel(self, unamestr):
        """
        Return the third whitespace-separated field of unamestr (presumably
        `uname -a` output, where that field is the kernel release).

        Returns "" when unamestr is empty/None or has fewer than 3 fields.
        """
        if not unamestr:
            return ""
        fields = unamestr.split()
        if len(fields) > 2:
            return fields[2]
        return ""

    def mergeActionsAndBadDB(self):
        """
        Merge the sick node records in sickdb (from findbad) with the most
        recent action record of each node in act_all, storing the result as

                mergedb[loginbase][nodename] = merged_record

        There are four cases:
        1) Problem in findbad, no problem in act_all
                This is ok, b/c it just means it's a new problem: the
                findbad record is used with ticket_id "" and
                prev_category None.
        2) Problem in findbad, problem in act_all
                -Did the problem get better or worse?
                        -If Same, or Worse, then continue looking for open tickets.
                        -If Better, or No problem, then "back-off" penalties.
                                This judgement may need to wait until 'Diagnose()'
                The act_all record is the base and the current findbad
                fields are copied over it.  Nodes whose act_all list is
                empty, or whose latest stage is 'monitor-end-record', are
                skipped.
        3) No problem in findbad, problem in act_all
                The node is operational again according to Findbad().
                Such nodes stay in cache_all and are not handled yet.
        4) No problem in findbad, no problem in act_all
                There won't be a record in either db, so there's no code.
        """
        for loginbase in sorted(self.sickdb.keys()):
            d_fb_nodes = self.sickdb[loginbase]
            for nodename in sorted(d_fb_nodes.keys()):
                x = d_fb_nodes[nodename]
                if loginbase not in self.mergedb:
                    self.mergedb[loginbase] = {}

                # We must compare findbad state with act_all state
                if nodename not in self.act_all:
                    # 1) ok, b/c it's a new problem. set ticket_id to null
                    record = {}
                    record.update(x)
                    record['ticket_id'] = ""
                    record['prev_category'] = None
                    self.mergedb[loginbase][nodename] = record
                    continue

                if len(self.act_all[nodename]) == 0:
                    continue

                y = self.act_all[nodename][0]

                # skip if end-stage
                if 'stage' in y and "monitor-end-record" in y['stage']:
                    continue

                # for legacy actions.  NOTE: this updates the act_all
                # record in place.
                if 'bucket' in y and y['bucket'][0] == 'dbg':
                    # Only bootcd debugs made it to the act_all db.
                    y['prev_category'] = "OLDBOOTCD"
                elif 'bucket' in y and y['bucket'][0] == 'down':
                    y['prev_category'] = "ERROR"
                elif 'bucket' not in y:
                    # for all other actions, just carry over the
                    # previous category
                    y['prev_category'] = y['category']
                else:
                    print("UNKNOWN state for record: %s" % y)
                    sys.exit(1)

                # 2) Start from the previous action record and overlay the
                # current findbad observations.
                record = {}
                record.update(y)
                for field in ('comonstats', 'category', 'state',
                              'kernel', 'bootcd', 'plcnode'):
                    record[field] = x[field]
                self.mergedb[loginbase][nodename] = record
                # Remove the entry from cache_all to keep it out of case 3).
                # pop() (rather than del) tolerates a missing entry.
                self.cache_all.pop(nodename, None)

        # 3) Nodes that remain in cache_all were not identified by findbad.
        #    TODO: decide whether to keep them.  An earlier attempt to re-add
        #    'monitor' bucket nodes to sickdb did not work correctly and was
        #    disabled.

        print("len of cache_all: %d" % len(self.cache_all.keys()))

    def sendToRT(self):
        """Queue every merged record (sites in sorted order), then None."""
        for loginbase in sorted(self.mergedb.keys()):
            for record in self.mergedb[loginbase].values():
                self.toRT.put(record)

        # send signal to stop reading
        self.toRT.put(None)
291
292 class Diagnose(Thread):
293         def __init__(self, fromRT):
294                 self.fromRT = fromRT
295                 self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")
296                 self.findbad = soltesz.if_cached_else(1, "findbad", lambda : {})
297
298                 self.diagnose_in = {}
299                 self.diagnose_out = {}
300                 Thread.__init__(self)
301
302
303         def run(self):
304                 self.accumSickSites()
305
306                 print "Accumulated %d sick sites" % len(self.diagnose_in.keys())
307                 logger.debug("Accumulated %d sick sites" % len(self.diagnose_in.keys()))
308
309                 try:
310                         stats = self.diagnoseAll()
311                 except Exception, err:
312                         print "----------------"
313                         import traceback
314                         print traceback.print_exc()
315                         print err
316                         #if config.policysavedb:
317                         sys.exit(1)
318
319                 print_stats("sites_observed", stats)
320                 print_stats("sites_diagnosed", stats)
321                 print_stats("nodes_diagnosed", stats)
322
323                 if config.policysavedb:
324                         print "Saving Databases... diagnose_out"
325                         soltesz.dbDump("diagnose_out", self.diagnose_out)
326
327         def accumSickSites(self):
328                 """
329                 Take all nodes, from l_diagnose, look them up in the diagnose_out database, 
330                 and insert them into diagnose_in[] as:
331
332                         diagnose_in[loginbase] = [diag_node1, diag_node2, ...]
333                 """
334                 while 1:
335                         node_record = self.fromRT.get(block = True)
336                         if node_record == None:
337                                 break;
338
339                         nodename = node_record['nodename']
340                         loginbase = self.plcdb_hn2lb[nodename]
341
342                         if loginbase not in self.diagnose_in:
343                                 self.diagnose_in[loginbase] = {}
344
345                         self.diagnose_in[loginbase][nodename] = node_record
346
347                 return
348
349         def diagnoseAll(self):
350                 i_sites_observed = 0
351                 i_sites_diagnosed = 0
352                 i_nodes_diagnosed = 0
353                 i_nodes_actedon = 0
354                 i_sites_emailed = 0
355                 l_allsites = []
356
357                 sorted_sites = self.diagnose_in.keys()
358                 sorted_sites.sort()
359                 self.diagnose_out= {}
360                 for loginbase in sorted_sites:
361                         l_allsites += [loginbase]
362
363                         d_diag_nodes = self.diagnose_in[loginbase]
364                         d_act_records = self.__diagnoseSite(loginbase, d_diag_nodes)
365                         # store records in diagnose_out, for saving later.
366                         self.diagnose_out.update(d_act_records)
367                         
368                         if len(d_act_records[loginbase]['nodes'].keys()) > 0:
369                                 i_nodes_diagnosed += (len(d_act_records[loginbase]['nodes'].keys()))
370                                 i_sites_diagnosed += 1
371                         i_sites_observed += 1
372
373                 return {'sites_observed': i_sites_observed, 
374                                 'sites_diagnosed': i_sites_diagnosed, 
375                                 'nodes_diagnosed': i_nodes_diagnosed, 
376                                 'allsites':l_allsites}
377
378                 pass
379                 
380         def __getDaysDown(self, diag_record, nodename):
381                 daysdown = -1
382                 if diag_record['comonstats']['sshstatus'] != "null":
383                         daysdown = int(diag_record['comonstats']['sshstatus']) // (60*60*24)
384                 elif diag_record['comonstats']['lastcotop'] != "null":
385                         daysdown = int(diag_record['comonstats']['lastcotop']) // (60*60*24)
386                 else:
387                         now = time.time()
388                         last_contact = diag_record['plcnode']['last_contact']
389                         if last_contact == None:
390                                 # the node has never been up, so give it a break
391                                 daysdown = -1
392                         else:
393                                 diff = now - last_contact
394                                 daysdown = diff // (60*60*24)
395                 return daysdown
396
397         def __getStrDaysDown(self, diag_record, nodename):
398                 daysdown = self.__getDaysDown(diag_record, nodename)
399                 if daysdown > 0:
400                         return "(%d days down)"%daysdown
401                 else:
402                         return "Unknown number of days"
403
404         def __getCDVersion(self, diag_record, nodename):
405                 cdversion = ""
406                 #print "Getting kernel for: %s" % diag_record['nodename']
407                 cdversion = diag_record['kernel']
408                 return cdversion
409
410         def __diagnoseSite(self, loginbase, d_diag_nodes):
411                 """
412                 d_diag_nodes are diagnose_in entries.
413                 """
414                 d_diag_site = {loginbase : { 'config' : 
415                                                                                                 {'squeeze': False,
416                                                                                                  'email': False
417                                                                                                 }, 
418                                                                         'nodes': {}
419                                                                         }
420                                            }
421                 sorted_nodes = d_diag_nodes.keys()
422                 sorted_nodes.sort()
423                 for nodename in sorted_nodes:
424                         node_record = d_diag_nodes[nodename]
425                         diag_record = self.__diagnoseNode(loginbase, node_record)
426
427                         if diag_record != None:
428                                 d_diag_site[loginbase]['nodes'][nodename] = diag_record
429
430                                 # NOTE: improvement means, we need to act/squeeze and email.
431                                 #print "DIAG_RECORD", diag_record
432                                 if 'monitor-end-record' in diag_record['stage'] or \
433                                    'nmreset' in diag_record['stage']:
434                                 #       print "resetting loginbase!" 
435                                         d_diag_site[loginbase]['config']['squeeze'] = True
436                                         d_diag_site[loginbase]['config']['email'] = True
437                                 #else:
438                                 #       print "NO IMPROVEMENT!!!!"
439                         else:
440                                 pass # there is nothing to do for this node.
441
442                 # NOTE: these settings can be overridden by command line arguments,
443                 #       or the state of a record, i.e. if already in RT's Support Queue.
444                 nodes_up = self.getUpAtSite(loginbase, d_diag_site)
445                 if nodes_up < MINUP:
446                         d_diag_site[loginbase]['config']['squeeze'] = True
447
448                 max_slices = self.getMaxSlices(loginbase)
449                 num_nodes = self.getNumNodes(loginbase)
450                 # NOTE: when max_slices == 0, this is either a new site (the old way)
451                 #       or an old disabled site from previous monitor (before site['enabled'])
452                 if nodes_up < num_nodes and max_slices != 0:
453                         d_diag_site[loginbase]['config']['email'] = True
454
455                 if len(d_diag_site[loginbase]['nodes'].keys()) > 0:
456                         print "SITE: %20s : %d nodes up, at most" % (loginbase, nodes_up)
457
458                 return d_diag_site
459
460         def diagRecordByCategory(self, node_record):
461                 nodename = node_record['nodename']
462                 category = node_record['category']
463                 state    = node_record['state']
464                 loginbase = self.plcdb_hn2lb[nodename]
465                 diag_record = None
466
467                 if  "ERROR" in category:        # i.e. "DOWN"
468                         diag_record = {}
469                         diag_record.update(node_record)
470                         daysdown = self.__getDaysDown(diag_record, nodename) 
471                         if daysdown < 7:
472                                 format = "DIAG: %20s : %-40s Down only %s days  NOTHING DONE"
473                                 print format % (loginbase, nodename, daysdown)
474                                 return None
475
476                         s_daysdown = self.__getStrDaysDown(diag_record, nodename)
477                         diag_record['message'] = emailTxt.mailtxt.newdown
478                         diag_record['args'] = {'nodename': nodename}
479                         diag_record['info'] = (nodename, s_daysdown, "")
480                         if diag_record['ticket_id'] == "":
481                                 diag_record['log'] = "DOWN: %20s : %-40s == %20s %s" % \
482                                         (loginbase, nodename, diag_record['info'][1:], diag_record['found_rt_ticket'])
483                         else:
484                                 diag_record['log'] = "DOWN: %20s : %-40s == %20s %s" % \
485                                         (loginbase, nodename, diag_record['info'][1:], diag_record['ticket_id'])
486
487                 elif "OLDBOOTCD" in category:
488                         # V2 boot cds as determined by findbad
489                         s_daysdown = self.__getStrDaysDown(node_record, nodename)
490                         s_cdversion = self.__getCDVersion(node_record, nodename)
491                         diag_record = {}
492                         diag_record.update(node_record)
493                         #if "2.4" in diag_record['kernel'] or "v2" in diag_record['bootcd']:
494                         diag_record['message'] = emailTxt.mailtxt.newbootcd
495                         diag_record['args'] = {'nodename': nodename}
496                         diag_record['info'] = (nodename, s_daysdown, s_cdversion)
497                         if diag_record['ticket_id'] == "":
498                                 diag_record['log'] = "BTCD: %20s : %-40s == %20s %20s %s" % \
499                                                                         (loginbase, nodename, diag_record['kernel'], 
500                                                                          diag_record['bootcd'], diag_record['found_rt_ticket'])
501                         else:
502                                 diag_record['log'] = "BTCD: %20s : %-40s == %20s %20s %s" % \
503                                                                         (loginbase, nodename, diag_record['kernel'], 
504                                                                          diag_record['bootcd'], diag_record['ticket_id'])
505
506                 elif "PROD" in category:
507                         if "DEBUG" in state:
508                                 # Not sure what to do with these yet.  Probably need to
509                                 # reboot, and email.
510                                 print "DEBG: %20s : %-40s  NOTHING DONE" % (loginbase, nodename)
511                                 return None
512                         elif "BOOT" in state:
513                                 # no action needed.
514                                 # TODO: remove penalties, if any are applied.
515                                 now = time.time()
516                                 last_contact = node_record['plcnode']['last_contact']
517                                 if last_contact == None:
518                                         time_diff = 0
519                                 else:
520                                         time_diff = now - last_contact;
521
522                                 if 'improvement' in node_record['stage']:
523                                         # then we need to pass this on to 'action'
524                                         diag_record = {}
525                                         diag_record.update(node_record)
526                                         diag_record['message'] = emailTxt.mailtxt.newthankyou
527                                         diag_record['args'] = {'nodename': nodename}
528                                         diag_record['info'] = (nodename, node_record['prev_category'], 
529                                                                                                          node_record['category'])
530                                         if diag_record['ticket_id'] == "":
531                                                 diag_record['log'] = "IMPR: %20s : %-40s == %20s %20s %s %s" % \
532                                                                         (loginbase, nodename, diag_record['stage'], 
533                                                                          state, category, diag_record['found_rt_ticket'])
534                                         else:
535                                                 diag_record['log'] = "IMPR: %20s : %-40s == %20s %20s %s %s" % \
536                                                                         (loginbase, nodename, diag_record['stage'], 
537                                                                          state, category, diag_record['ticket_id'])
538                                         return diag_record
539                                 elif time_diff >= 6*SPERHOUR:
540                                         # heartbeat is older than 30 min.
541                                         # then reset NM.
542                                         #print "Possible NM problem!! %s - %s = %s" % (now, last_contact, time_diff)
543                                         diag_record = {}
544                                         diag_record.update(node_record)
545                                         diag_record['message'] = emailTxt.mailtxt.NMReset
546                                         diag_record['args'] = {'nodename': nodename}
547                                         diag_record['stage'] = "nmreset"
548                                         diag_record['info'] = (nodename, 
549                                                                                         node_record['prev_category'], 
550                                                                                         node_record['category'])
551                                         if diag_record['ticket_id'] == "":
552                                                 diag_record['log'] = "NM  : %20s : %-40s == %20s %20s %s %s" % \
553                                                                         (loginbase, nodename, diag_record['stage'], 
554                                                                          state, category, diag_record['found_rt_ticket'])
555                                         else:
556                                                 diag_record['log'] = "NM  : %20s : %-40s == %20s" % \
557                                                                         (loginbase, nodename, diag_record['stage'])
558
559                                         return diag_record
560                                 else:
561                                         return None
562                         else:
563                                 # unknown
564                                 pass
565                 elif "ALPHA"    in category:
566                         pass
567                 elif "clock_drift" in category:
568                         pass
569                 elif "dns"    in category:
570                         pass
571                 elif "filerw"    in category:
572                         pass
573                 else:
574                         print "Unknown category!!!! %s" % category
575                         sys.exit(1)
576
577                 return diag_record
578
579         def __diagnoseNode(self, loginbase, node_record):
580                 # TODO: change the format of the hostname in this 
581                 #               record to something more natural.
582                 nodename          = node_record['nodename']
583                 category          = node_record['category']
584                 prev_category = node_record['prev_category']
585                 state             = node_record['state']
586
587                 val = cmpCategoryVal(category, prev_category)
588                 if val == -1:
589                         # current category is worse than previous, carry on
590                         pass
591                 elif val == 1:
592                         # current category is better than previous
593                         # TODO: too generous for now, but will be handled correctly
594                         # TODO: if stage is currently ticket_waitforever, 
595                         if 'ticket_id' not in node_record:
596                                 print "ignoring: ", node_record['nodename']
597                                 return None
598                         else:
599                                 if node_record['ticket_id'] == "" or \
600                                    node_record['ticket_id'] == None:
601                                         print "closing: ", node_record['nodename']
602                                         node_record['action'] = ['close_rt']
603                                         node_record['message'] = None
604                                         node_record['stage'] = 'monitor-end-record'
605                                         return node_record
606                                         #return None
607                                 else:
608                                         node_record['stage'] = 'improvement'
609                 else:
610                         #values are equal, carry on.
611                         pass
612                         
613                 #### COMPARE category and prev_category
614                 # if not_equal
615                 #       then assign a stage based on relative priorities
616                 # else equal
617                 #       then check category for stats.
618                 diag_record = self.diagRecordByCategory(node_record)
619                 if diag_record == None:
620                         return None
621
622                 #### found_RT_ticket
623                 # TODO: need to record time found, and maybe add a stage for acting on it...
624                 if 'found_rt_ticket' in diag_record and \
625                         diag_record['found_rt_ticket'] is not None:
626                         if diag_record['stage'] is not 'improvement':
627                                 diag_record['stage'] = 'ticket_waitforever'
628                                 
629                 current_time = time.time()
630                 # take off four days, for the delay that database caused.
631                 # TODO: generalize delays at PLC, and prevent enforcement when there
632                 #               have been no emails.
633                 # NOTE: 7*SPERDAY exists to offset the 'bad week'
634                 #delta = current_time - diag_record['time'] - 7*SPERDAY
635                 delta = current_time - diag_record['time']
636
637                 message = diag_record['message']
638                 act_record = {}
639                 act_record.update(diag_record)
640
641                 #### DIAGNOSE STAGES 
642                 if   'findbad' in diag_record['stage']:
643                         # The node is bad, and there's no previous record of it.
644                         act_record['email'] = TECH
645                         act_record['action'] = ['noop']
646                         act_record['message'] = message[0]
647                         act_record['stage'] = 'stage_actinoneweek'
648
649                 elif 'nmreset' in diag_record['stage']:
650                         act_record['email']  = ADMIN 
651                         act_record['action'] = ['reset_nodemanager']
652                         act_record['message'] = message[0]
653                         act_record['stage']  = 'nmreset'
654                         
655                 elif 'improvement' in diag_record['stage']:
656                         # - backoff previous squeeze actions (slice suspend, nocreate)
657                         # TODO: add a backoff_squeeze section... Needs to runthrough
658                         act_record['action'] = ['close_rt']
659                         act_record['message'] = message[0]
660                         act_record['stage'] = 'monitor-end-record'
661
662                 elif 'actinoneweek' in diag_record['stage']:
663                         if delta >= 7 * SPERDAY: 
664                                 act_record['email'] = TECH | PI
665                                 act_record['stage'] = 'stage_actintwoweeks'
666                                 act_record['message'] = message[1]
667                                 act_record['action'] = ['nocreate' ]
668                                 act_record['time'] = current_time               # reset clock for waitforever
669                         elif delta >= 3* SPERDAY and not 'second-mail-at-oneweek' in act_record:
670                                 act_record['email'] = TECH 
671                                 act_record['message'] = message[0]
672                                 act_record['action'] = ['sendmailagain-waitforoneweekaction' ]
673                                 act_record['second-mail-at-oneweek'] = True
674                         else:
675                                 act_record['message'] = None
676                                 act_record['action'] = ['waitforoneweekaction' ]
677                                 return None                     # don't send if there's no action
678
679                 elif 'actintwoweeks' in diag_record['stage']:
680                         if delta >= 7 * SPERDAY:
681                                 act_record['email'] = TECH | PI | USER
682                                 act_record['stage'] = 'stage_waitforever'
683                                 act_record['message'] = message[2]
684                                 act_record['action'] = ['suspendslices']
685                                 act_record['time'] = current_time               # reset clock for waitforever
686                         elif delta >= 3* SPERDAY and not 'second-mail-at-twoweeks' in act_record:
687                                 act_record['email'] = TECH | PI
688                                 act_record['message'] = message[1]
689                                 act_record['action'] = ['sendmailagain-waitfortwoweeksaction' ]
690                                 act_record['second-mail-at-twoweeks'] = True
691                         else:
692                                 act_record['message'] = None
693                                 act_record['action'] = ['waitfortwoweeksaction']
694                                 return None                     # don't send if there's no action
695
696                 elif 'ticket_waitforever' in diag_record['stage']:
697                         act_record['email'] = TECH
698                         if 'first-found' not in act_record:
699                                 act_record['first-found'] = True
700                                 act_record['log'] += " firstfound"
701                                 act_record['action'] = ['ticket_waitforever']
702                                 act_record['message'] = None
703                                 act_record['time'] = current_time
704                         else:
705                                 if delta >= 7*SPERDAY:
706                                         act_record['action'] = ['ticket_waitforever']
707                                         act_record['message'] = None
708                                         act_record['time'] = current_time               # reset clock
709                                 else:
710                                         act_record['action'] = ['ticket_waitforever']
711                                         act_record['message'] = None
712                                         return None
713
714                 elif 'waitforever' in diag_record['stage']:
715                         # more than 3 days since last action
716                         # TODO: send only on weekdays.
717                         # NOTE: expects that 'time' has been reset before entering waitforever stage
718                         if delta >= 3*SPERDAY:
719                                 act_record['action'] = ['email-againwaitforever']
720                                 act_record['message'] = message[2]
721                                 act_record['time'] = current_time               # reset clock
722                         else:
723                                 act_record['action'] = ['waitforever']
724                                 act_record['message'] = None
725                                 return None                     # don't send if there's no action
726
727                 else:
728                         # There is no action to be taken, possibly b/c the stage has
729                         # already been performed, but diagnose picked it up again.
730                         # two cases, 
731                         #       1. stage is unknown, or 
732                         #       2. delta is not big enough to bump it to the next stage.
733                         # TODO: figure out which. for now assume 2.
734                         print "UNKNOWN!?!? %s" % nodename
735                         act_record['action'] = ['unknown']
736                         act_record['message'] = message[0]
737                         print "Exiting..."
738                         sys.exit(1)
739
740                 print "%s" % act_record['log'],
741                 print "%15s" % act_record['action']
742                 return act_record
743
744         def getMaxSlices(self, loginbase):
745                 # if sickdb has a loginbase, then it will have at least one node.
746                 site_stats = None
747
748                 for nodename in self.diagnose_in[loginbase].keys():
749                         if nodename in self.findbad['nodes']:
750                                 site_stats = self.findbad['nodes'][nodename]['values']['plcsite']
751                                 break
752
753                 if site_stats == None:
754                         raise Exception, "loginbase with no nodes in findbad"
755                 else:
756                         return site_stats['max_slices']
757
758         def getNumNodes(self, loginbase):
759                 # if sickdb has a loginbase, then it will have at least one node.
760                 site_stats = None
761
762                 for nodename in self.diagnose_in[loginbase].keys():
763                         if nodename in self.findbad['nodes']:
764                                 site_stats = self.findbad['nodes'][nodename]['values']['plcsite']
765                                 break
766
767                 if site_stats == None:
768                         raise Exception, "loginbase with no nodes in findbad"
769                 else:
770                         return site_stats['num_nodes']
771
772         """
773         Returns number of up nodes as the total number *NOT* in act_all with a
774         stage other than 'steady-state' .
775         """
776         def getUpAtSite(self, loginbase, d_diag_site):
777                 # TODO: THIS DOESN"T WORK!!! it misses all the 'debug' state nodes
778                 #               that aren't recorded yet.
779
780                 numnodes = self.getNumNodes(loginbase)
781                 # NOTE: assume nodes we have no record of are ok. (too conservative)
782                 # TODO: make the 'up' value more representative
783                 up = numnodes
784                 for nodename in d_diag_site[loginbase]['nodes'].keys():
785
786                         rec = d_diag_site[loginbase]['nodes'][nodename]
787                         if rec['stage'] != 'monitor-end-record':
788                                 up -= 1
789                         else:
790                                 pass # the node is assumed to be up.
791
792                 #if up != numnodes:
793                 #       print "ERROR: %s total nodes up and down != %d" % (loginbase, numnodes)
794
795                 return up
796
797
class SiteAction:
        """Base class for an action applied to a node or site.

        Subclasses implement _run(args).  run() first checks that every name
        in self.parameter_names is a key of ``args``, then returns
        self._run(args).
        """
        def __init__(self, parameter_names=None):
                """
                parameter_names -- keys that must be present in the args passed
                                   to run(); defaults to ['hostname', 'ticket_id'].
                """
                # A fresh default list per instance, so instances never share
                # (and accidentally mutate) a single default list object.
                if parameter_names is None:
                        parameter_names = ['hostname', 'ticket_id']
                self.parameter_names = parameter_names

        def checkParam(self, args):
                """Raise Exception naming the first required parameter missing from args."""
                for param in self.parameter_names:
                        if param not in args:
                                raise Exception("Parameter %s not provided in args"%param)

        def run(self, args):
                """Validate args, then return the result of self._run(args)."""
                self.checkParam(args)
                return self._run(args)

        def _run(self, args):
                """Hook for subclasses; the base implementation does nothing."""
                pass
810
class SuspendAction(SiteAction):
        """Site action delegating to plc.suspendSlices() for args['hostname']."""
        def _run(self, args):
                target = args['hostname']
                return plc.suspendSlices(target)
814
class RemoveSliceCreation(SiteAction):
        """Site action delegating to plc.removeSliceCreation() for args['hostname']."""
        def _run(self, args):
                target = args['hostname']
                return plc.removeSliceCreation(target)
818
class BackoffActions(SiteAction):
        """Site action calling plc.enableSlices() then plc.enableSliceCreation().

        Both calls receive args['hostname']; the action always returns True.
        """
        def _run(self, args):
                hostname = args['hostname']
                for undo in (plc.enableSlices, plc.enableSliceCreation):
                        undo(hostname)
                return True
824
825 # TODO: create class for each action below, 
826 #               allow for lists of actions to be performed...
827
def close_rt_backoff(args):
        """Close the RT ticket named in args and re-enable the node's slices.

        Does nothing unless args has a 'ticket_id' that is neither "" nor None.
        Otherwise closes that ticket with mailer.closeTicketViaRT, then calls
        plc.enableSlices and plc.enableSliceCreation for args['hostname'].
        Always returns None.
        """
        ticket = args.get('ticket_id')
        if ticket in ("", None):
                # No ticket to close: leave everything untouched.
                return
        mailer.closeTicketViaRT(ticket,
                                "Ticket CLOSED automatically by SiteAssist.")
        hostname = args['hostname']
        plc.enableSlices(hostname)
        plc.enableSliceCreation(hostname)
835
def reset_nodemanager(args):
        """Run "/sbin/service nm restart" as root on args['hostname'] via ssh.

        args -- dict that must contain 'hostname' (KeyError otherwise).

        Returns the exit status of the ssh command.
        """
        import subprocess
        hostname = args['hostname']
        # Pass an argv list (no shell), so the hostname is never interpreted
        # by a local shell.
        return subprocess.call(["ssh", "root@%s" % hostname,
                                "/sbin/service", "nm", "restart"])
839
class Action(Thread):
        """Thread that carries out the actions chosen for diagnosed nodes.

        Only the hostnames given in l_action are acted on.  Their records are
        taken from the "diagnose_out" database, grouped by site (loginbase)
        and, within a site, by their first action.  For each group at most
        one email is sent and the named actions are run through self.actions.
        Node records are saved in the "act_all" database.
        """

        def __init__(self, l_action):
                """
                l_action -- iterable of hostnames to act on.
                """
                self.l_action = l_action

                # the hostname to loginbase mapping
                self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")

                # Actions to take (output of the diagnose phase).
                self.diagnose_db = soltesz.if_cached_else(1, "diagnose_out", lambda : {})
                # Actions taken.
                self.act_all   = soltesz.if_cached_else(1, "act_all", lambda : {})

                # Map from action name to the function that performs it.  Records
                # carry only the action *name*, presumably because they are saved
                # with soltesz.dbDump (pickle), which cannot serialize lambdas.
                # Every function receives the dict built by get_email_args().
                self.actions = {}
                self.actions['suspendslices'] = lambda args: plc.suspendSlices(args['hostname'])
                self.actions['nocreate'] = lambda args: plc.removeSliceCreation(args['hostname'])
                self.actions['close_rt'] = lambda args: close_rt_backoff(args)
                self.actions['rins'] = lambda args: plc.nodeBootState(args['hostname'], "rins")
                self.actions['noop'] = lambda args: args
                # Deliberately disabled: the real call is reset_nodemanager(args).
                self.actions['reset_nodemanager'] = lambda args: args

                # Wait / re-mail stages: nothing to execute beyond the email.
                self.actions['ticket_waitforever'] = lambda args: args
                self.actions['waitforever'] = lambda args: args
                self.actions['unknown'] = lambda args: args
                self.actions['waitforoneweekaction'] = lambda args: args
                self.actions['waitfortwoweeksaction'] = lambda args: args
                self.actions['sendmailagain-waitforoneweekaction'] = lambda args: args
                self.actions['sendmailagain-waitfortwoweeksaction'] = lambda args: args
                self.actions['email-againwaitforever'] = lambda args: args
                self.actions['email-againticket_waitforever'] = lambda args: args

                # Filled by accumSites():
                #   sickdb[loginbase] = {'nodes': {nodename: diag_record}, 'config': ...}
                self.sickdb = {}
                Thread.__init__(self)

        def run(self):
                """Thread entry point: gather the sick sites, act on them, print stats."""
                self.accumSites()
                print("Accumulated %d sick sites" % len(self.sickdb))
                logger.debug("Accumulated %d sick sites" % len(self.sickdb))

                try:
                        stats = self.analyseSites()
                except Exception:
                        err = sys.exc_info()[1]
                        print("----------------")
                        import traceback
                        traceback.print_exc()
                        print(err)
                        if config.policysavedb:
                                print("Saving Databases... act_all")
                                soltesz.dbDump("act_all", self.act_all)
                        sys.exit(1)

                for key in ("sites_observed", "sites_diagnosed", "nodes_diagnosed",
                            "sites_emailed", "nodes_actedon"):
                        self.print_stats(key, stats)
                print(",".join(stats['allsites']))

                if config.policysavedb:
                        print("Saving Databases... act_all")
                        # TODO: remove 'diagnose_out',
                        #       or at least the entries that were acted on.
                        soltesz.dbDump("act_all", self.act_all)

        def accumSites(self):
                """
                Take all nodes from l_action, look them up in the diagnose_db
                database, and insert them into sickdb as:

                        sickdb[loginbase]['nodes'][nodename] = diag_record
                        sickdb[loginbase]['config'] = diagnose_db[loginbase]['config']

                This way only the given l_action nodes will be acted on,
                regardless of how many records diagnose_db holds.  Hostnames
                without a diagnose record are skipped; a hostname missing from
                plcdb_hn2lb raises KeyError.
                """
                # TODO: what if l_action == None ?
                for nodename in self.l_action:
                        loginbase = self.plcdb_hn2lb[nodename]

                        if loginbase not in self.diagnose_db or \
                           nodename not in self.diagnose_db[loginbase]['nodes']:
                                continue

                        diag_record = self.diagnose_db[loginbase]['nodes'][nodename]
                        if loginbase not in self.sickdb:
                                self.sickdb[loginbase] = {'nodes' : {}}

                        # NOTE: don't copy all node records, since not all will be in l_action
                        self.sickdb[loginbase]['nodes'][nodename] = diag_record
                        # NOTE: but, we want to get the loginbase config settings,
                        #               this is the easiest way.
                        self.sickdb[loginbase]['config'] = self.diagnose_db[loginbase]['config']

        def __emailSite(self, loginbase, roles, message, args):
                """Send ``message`` about ``loginbase`` to the contacts chosen by ``roles``.

                loginbase -- unique site abbreviation, prepended to slice names.
                roles     -- bitmask of TECH, PI, USER and ADMIN; selects the
                             email aliases.
                message   -- [subject, body] format strings, %-interpolated with args.
                args      -- format arguments; 'loginbase' is added to it in
                             place.  'ticket_id', if present, is the ticket to
                             reply to.

                Returns the ticket id: the value returned by mailer.emailViaRT,
                or args['ticket_id'] (0 if absent) for mail sent with the ADMIN
                role.  Returns 0 if building or sending the mail raised.
                """
                ticket_id = 0
                args.update({'loginbase':loginbase})

                # Redirect everything to the admin address when real mail is off
                # but bcc is on, or when mailing in debug mode.
                if not config.mail and not config.debug and config.bcc:
                        roles = ADMIN
                if config.mail and config.debug:
                        roles = ADMIN

                # build targets
                contacts = []
                if ADMIN & roles:
                        contacts += [config.email]
                if TECH & roles:
                        contacts += [TECHEMAIL % loginbase]
                if PI & roles:
                        contacts += [PIEMAIL % loginbase]
                if USER & roles:
                        slices = plc.slices(loginbase)
                        contacts += [SLICEMAIL % name for name in slices]
                        print("SLIC: %20s : %d slices" % (loginbase, len(slices)))

                try:
                        subject = message[0] % args
                        body = message[1] % args
                        if ADMIN & roles:
                                # send only to admin
                                if 'ticket_id' in args:
                                        subj = "Re: [PL #%s] %s" % (args['ticket_id'], subject)
                                else:
                                        subj = "Re: [PL noticket] %s" % subject
                                mailer.email(subj, body, contacts)
                                # No RT interaction here: keep any existing ticket id.
                                ticket_id = args.get('ticket_id', ticket_id)
                        else:
                                # NOTE(review): raises KeyError (caught below, no mail
                                # sent) when args has no 'ticket_id'.
                                ticket_id = mailer.emailViaRT(subject, body, contacts, args['ticket_id'])
                except Exception:
                        print("exception on message:")
                        import traceback
                        traceback.print_exc()
                        print(message)

                return ticket_id

        def _format_diaginfo(self, diag_node):
                """Return one indented, newline-terminated summary line for a node.

                diag_node['info'] is indexed as info[0..2]; the per-stage layout is
                shown below.
                """
                info = diag_node['info']
                if diag_node['stage'] == 'monitor-end-record':
                        hlist = "    %s went from '%s' to '%s'\n" % (info[0], info[1], info[2])
                else:
                        # presumably (node, daysdown, version) -> "node version - daysdown"
                        hlist = "    %s %s - %s\n" % (info[0], info[2], info[1])
                return hlist

        def get_email_args(self, act_recordlist):
                """Build the %-format arguments for one email about several nodes.

                'hostname_list' concatenates every record's 'msg_format';
                'hostname' and 'ticket_id' come from the last record that
                provides them.
                """
                email_args = {}
                email_args['hostname_list'] = ""

                for act_record in act_recordlist:
                        email_args['hostname_list'] += act_record['msg_format']
                        email_args['hostname'] = act_record['nodename']
                        if 'ticket_id' in act_record:
                                email_args['ticket_id'] = act_record['ticket_id']

                return email_args

        def get_unique_issues(self, act_recordlist):
                """Group act records by their first action name.

                NOTE: only send one email per site, per problem...
                Returns {action_name: [act_record, ...]} preserving input order
                within each group.
                """
                unique_issues = {}
                for act_record in act_recordlist:
                        unique_issues.setdefault(act_record['action'][0], []).append(act_record)
                return unique_issues

        def __actOnSite(self, loginbase, site_record):
                """Act on every node of one site; return (actions_run, nodes_emailed).

                actions_run counts issue groups whose actions were executed.
                """
                i_nodes_actedon = 0
                i_nodes_emailed = 0

                act_recordlist = []
                for nodename in site_record['nodes'].keys():
                        diag_record = site_record['nodes'][nodename]
                        act_recordlist.append(self.__actOnNode(diag_record))

                unique_issues = self.get_unique_issues(act_recordlist)

                for issue in unique_issues.keys():
                        print("\tworking on issue: %s" % issue)
                        issue_record_list = unique_issues[issue]
                        email_args = self.get_email_args(issue_record_list)

                        # The first record of the group supplies message, roles
                        # and the action list for the whole group.
                        act_record = issue_record_list[0]
                        # send message before squeezing
                        print("\t\tconfig.email: %s and %s" % (act_record['message'] is not None,
                                                             site_record['config']['email']))
                        if act_record['message'] is not None and site_record['config']['email']:
                                ticket_id = self.__emailSite(loginbase, act_record['email'],
                                                             act_record['message'], email_args)

                                # Add ticket_id to ALL nodenames
                                for issue_record in issue_record_list:
                                        nodename = issue_record['nodename']
                                        # update node record with RT ticket_id
                                        if nodename in self.act_all:
                                                self.act_all[nodename][0]['ticket_id'] = "%s" % ticket_id
                                        if config.mail:
                                                i_nodes_emailed += 1

                        print("\t\tconfig.squeeze: %s and %s" % (config.squeeze,
                                                               site_record['config']['squeeze']))
                        if config.squeeze and site_record['config']['squeeze']:
                                for act_key in act_record['action']:
                                        self.actions[act_key](email_args)
                                i_nodes_actedon += 1

                if config.policysavedb:
                        print("Saving Databases... act_all, diagnose_out")
                        soltesz.dbDump("act_all", self.act_all)
                        # remove site record from diagnose_out, it's in act_all as done.
                        del self.diagnose_db[loginbase]
                        soltesz.dbDump("diagnose_out", self.diagnose_db)

                print("sleeping for 1 sec")
                time.sleep(1)

                return (i_nodes_actedon, i_nodes_emailed)

        def __actOnNode(self, diag_record):
                """Copy diag_record into an act record and log it in act_all.

                Records whose stage is 'monitor-end-record' or 'nmreset' are not
                stored in act_all; all others are prepended to
                act_all[nodename].  Returns the new act record.
                """
                nodename = diag_record['nodename']

                act_record = {}
                act_record.update(diag_record)
                act_record['nodename'] = nodename
                act_record['msg_format'] = self._format_diaginfo(diag_record)

                print("%s %15s" % (act_record['log'], act_record['action']))

                # Compare by value; 'is not' on strings tests identity.
                if act_record['stage'] not in ('monitor-end-record', 'nmreset'):
                        self.act_all.setdefault(nodename, []).insert(0, act_record)
                else:
                        print("Not recording %s in act_all" % nodename)

                return act_record

        def analyseSites(self):
                """Act on every site in sickdb, in sorted loginbase order.

                Returns a dict with the counters 'sites_observed',
                'sites_diagnosed', 'nodes_diagnosed', 'sites_emailed' and
                'nodes_actedon', plus 'allsites', the processed loginbases.
                """
                i_sites_observed = 0
                i_sites_diagnosed = 0
                i_nodes_diagnosed = 0
                i_nodes_actedon = 0
                i_sites_emailed = 0
                l_allsites = []

                for loginbase in sorted(self.sickdb.keys()):
                        site_record = self.sickdb[loginbase]
                        print("sites: %s" % loginbase)

                        # Count the site's node records; the site record's own
                        # keys are just 'nodes' and 'config'.
                        i_nodes_diagnosed += len(site_record['nodes'])
                        i_sites_diagnosed += 1

                        (na, ne) = self.__actOnSite(loginbase, site_record)

                        i_sites_observed += 1
                        i_nodes_actedon += na
                        # NOTE(review): ne counts emailed nodes, not sites.
                        i_sites_emailed += ne

                        l_allsites.append(loginbase)

                return {'sites_observed': i_sites_observed,
                        'sites_diagnosed': i_sites_diagnosed,
                        'nodes_diagnosed': i_nodes_diagnosed,
                        'sites_emailed': i_sites_emailed,
                        'nodes_actedon': i_nodes_actedon,
                        'allsites': l_allsites}

        def print_stats(self, key, stats):
                """Print one counter from the analyseSites() result."""
                print("%20s : %d" % (key, stats[key]))
1140
1141
1142
1143         #"""
1144         #Prints, logs, and emails status of up nodes, down nodes, and buckets.
1145         #"""
1146         #def status(self):
1147         #       sub = "Monitor Summary"
1148         #       msg = "\nThe following nodes were acted upon:  \n\n"
1149         #       for (node, (type, date)) in self.emailed.items():
1150         #               # Print only things acted on today.
1151         #               if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
1152         #                       msg +="%s\t(%s)\t%s\n" %(node, type, time.ctime(date))
1153         #       msg +="\n\nThe following sites have been 'squeezed':\n\n"
1154         #       for (loginbase, (date, type)) in self.squeezed.items():
1155         #               # Print only things acted on today.
1156         #               if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
1157         #                       msg +="%s\t(%s)\t%s\n" %(loginbase, type, time.ctime(date))
1158         #       mailer.email(sub, msg, [SUMTO])
1159         #       logger.info(msg)
1160         #       return 
1161
1162         #"""
1163         #Store/Load state of emails.  When, where, what.
1164         #"""
1165         #def emailedStore(self, action):
1166         #       try:
1167         #               if action == "LOAD":
1168         #                       f = open(DAT, "r+")
1169         #                       logger.info("POLICY:  Found and reading " + DAT)
1170         #                       self.emailed.update(pickle.load(f))
1171         #               if action == "WRITE":
1172         #                       f = open(DAT, "w")
1173         #                       #logger.debug("Writing " + DAT)
1174         #                       pickle.dump(self.emailed, f)
1175         #               f.close()
1176         #       except Exception, err:
1177         #               logger.info("POLICY:  Problem with DAT, %s" %err)
1178
1179
1180 #class Policy(Thread):
1181
def main():
        """Print a notice that policy.py is meant to be imported, not executed."""
        notice = "policy.py is a module, not a script for running directly."
        print(notice)
1184
if __name__ == '__main__':
        # Imports needed only when this file is executed directly.
        import plc
        import os

        try:
                main()
        except KeyboardInterrupt:
                # Ctrl-C: report, log, and terminate immediately.
                print("Killed.  Exitting.")
                logger.info('Monitor Killed')
                os._exit(0)