+ split the policy file into three classes: Merge(), Diagnose(), and Action().
[monitor.git] / policy.py
1 #
2 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
3 #
4 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
5 #
6 # $Id: policy.py,v 1.15 2007/07/03 19:58:34 soltesz Exp $
7 #
8 # Policy Engine.
9
10 #from monitor import *
11 from threading import *
12 import time
13 import logging
14 import mailer
15 import emailTxt
16 import pickle
17 import Queue
18 import plc
19 import sys
20 import reboot
21 import soltesz
22 import string
23 from printbadbysite import cmpCategoryVal
24 from config import config
25 print "policy"
26 config = config()
27
# File name of the monitor data file.
# NOTE(review): presumably a pickled database -- confirm against its users.
DAT="./monitor.dat"

# Shared "monitor" logger used by the policy threads.
logger = logging.getLogger("monitor")

# Time to enforce policy
# NOTE(review): presumably seconds (7200 s = 2 hours) -- confirm.
POLSLEEP = 7200

# Where to email the summary
SUMTO = "soltesz@cs.princeton.edu"
# Address templates; the %s placeholder is filled in by the sender.
# NOTE(review): presumably the site loginbase (or slice name) -- confirm.
TECHEMAIL="tech-%s@sites.planet-lab.org"
PIEMAIL="pi-%s@sites.planet-lab.org"
SLICEMAIL="%s@slices.planet-lab.org"
PLCEMAIL="support@planet-lab.org"

# Thresholds.  Each one is written as a number of days multiplied by
# SPERDAY (seconds per day), so the stored values are in seconds.
SPERDAY = 86400
PITHRESH = 7 * SPERDAY
SLICETHRESH = 7 * SPERDAY
# Days before attempting rins again
RINSTHRESH = 5 * SPERDAY

# Days before calling the node dead.
DEADTHRESH = 30 * SPERDAY
# Minimum number of nodes up before squeezing
MINUP = 2

# Email recipient classes.  They are distinct bits so that several of them
# can be combined with bitwise OR (e.g. TECH | PI) in a record's 'email'
# field.
TECH=1
PI=2
USER=4
ADMIN=8
58
59 # IF:
60 #  no SSH, down.
61 #  bad disk, down
62 #  DNS, kinda down (sick)
63 #  clock, kinda down (sick)
64 #  Full disk, going to be down
65
66 # Actions:
67 #  Email
68 #  suspend slice creation
69 #  kill slices
def array_to_priority_map(array):
        """ Build a dict that maps every entry of array to its position.

        The position acts as the entry's priority, which is useful for
        subsequent use in the cmpMap() function.  When an entry occurs more
        than once, its last position is the one that is kept. """
        return dict((entry, position) for position, entry in enumerate(array))
80
def getdebug():
        """ Return the 'debug' attribute of the module-level config object. """
        return config.debug
83
84 class Merge(Thread):
        def __init__(self, l_merge, toRT):
                """
                Load the databases needed to merge findbad results with the
                recorded actions, and initialise the thread.

                l_merge -- collection of node names to consider; accumSickSites()
                           skips every findbad node that is not in it.
                toRT    -- object with a put() method; sendToRT() pushes every
                           merged record into it, followed by None.
                """
                self.toRT = toRT
                self.merge_list = l_merge
                # the hostname to loginbase mapping
                self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")

                # Previous actions taken on nodes.
                self.act_all = soltesz.if_cached_else(1, "act_all", lambda : {})
                self.findbad = soltesz.if_cached_else(1, "findbad", lambda : {})

                # A second load of "act_all".  mergeActionsAndBadDB() deletes the
                # nodes it merges from this one, so what remains are the nodes
                # that findbad did not report.
                # NOTE(review): this relies on if_cached_else() returning a
                # separate object on every call -- confirm.
                self.cache_all = soltesz.if_cached_else(1, "act_all", lambda : {})
                # sickdb[loginbase][nodename]  = record built from findbad
                self.sickdb = {}
                # mergedb[loginbase][nodename] = record merged with act_all
                self.mergedb = {}
                Thread.__init__(self)
99
        def run(self):
                """
                Thread entry point: run the three merge steps in order.  Each
                step consumes the state left behind by the previous one.
                """
                # populate sickdb
                self.accumSickSites()
                # read data from findbad and act_all
                self.mergeActionsAndBadDB()
                # pass node_records to RT
                self.sendToRT()
107
108         def accumSickSites(self):
109                 """
110                 Take all nodes, from l_diagnose, look them up in the act_all database, 
111                 and insert them into sickdb[] as:
112
113                         sickdb[loginbase][nodename] = fb_record
114                 """
115                 # look at all problems reported by findbad
116                 l_nodes = self.findbad['nodes'].keys()
117                 count = 0
118                 for nodename in l_nodes:
119                         if nodename not in self.merge_list:
120                                 continue                # skip this node, since it's not wanted
121
122                         count += 1
123                         loginbase = self.plcdb_hn2lb[nodename]
124                         values = self.findbad['nodes'][nodename]['values']
125
126                         fb_record = {}
127                         fb_record['nodename'] = nodename
128                         fb_record['category'] = values['category']
129                         fb_record['state'] = values['state']
130                         fb_record['comonstats'] = values['comonstats']
131                         fb_record['kernel'] = self.getKernel(values['kernel'])
132                         fb_record['stage'] = "findbad"
133                         fb_record['message'] = None
134                         fb_record['bootcd'] = values['bootcd']
135                         fb_record['args'] = None
136                         fb_record['info'] = None
137                         fb_record['time'] = time.time()
138                         fb_record['date_created'] = time.time()
139
140                         if loginbase not in self.sickdb:
141                                 self.sickdb[loginbase] = {}
142
143                         self.sickdb[loginbase][nodename] = fb_record
144
145                 print "Found %d nodes" % count
146
147         def getKernel(self, unamestr):
148                 s = unamestr.split()
149                 if len(s) > 2:
150                         return s[2]
151                 else:
152                         return ""
153
154         def mergeActionsAndBadDB(self): 
155                 """
156                 - Look at the sick node_records as reported in findbad, 
157                 - Then look at the node_records in act_all.  
158
159                 There are four cases:
160                 1) Problem in findbad, no problem in act_all
161                         this ok, b/c it just means it's a new problem
162                 2) Problem in findbad, problem in act_all
163                         -Did the problem get better or worse?  
164                                 -If Same, or Worse, then continue looking for open tickets.
165                                 -If Better, or No problem, then "back-off" penalties.
166                                         This judgement may need to wait until 'Diagnose()'
167
168                 3) No problem in findbad, problem in act_all
169                         The the node is operational again according to Findbad()
170
171                 4) No problem in findbad, no problem in act_all
172                         There won't be a record in either db, so there's no code.
173                 """
174
175                 sorted_sites = self.sickdb.keys()
176                 sorted_sites.sort()
177                 # look at all problems reported by findbad
178                 for loginbase in sorted_sites:
179                         d_fb_nodes = self.sickdb[loginbase]
180                         sorted_nodes = d_fb_nodes.keys()
181                         sorted_nodes.sort()
182                         for nodename in sorted_nodes:
183                                 fb_record = self.sickdb[loginbase][nodename]
184                                 x = fb_record
185                                 if loginbase not in self.mergedb:
186                                         self.mergedb[loginbase] = {}
187
188                                 # We must compare findbad state with act_all state
189                                 if nodename not in self.act_all:
190                                         # 1) ok, b/c it's a new problem. set ticket_id to null
191                                         self.mergedb[loginbase][nodename] = {} 
192                                         self.mergedb[loginbase][nodename].update(x)
193                                         self.mergedb[loginbase][nodename]['ticket_id'] = ""
194                                         self.mergedb[loginbase][nodename]['prev_category'] = None
195                                 else: 
196                                         y = self.act_all[nodename][0]
197
198                                         # skip if end-stage
199                                         if 'stage' in y and "monitor-end-record" in y['stage']:
200                                                 continue
201
202                                         # for legacy actions
203                                         if 'bucket' in y and y['bucket'][0] == 'dbg':
204                                                 # Only bootcd debugs made it to the act_all db.
205                                                 y['prev_category'] = "OLDBOOTCD"
206                                         elif 'bucket' in y and y['bucket'][0] == 'down':
207                                                 y['prev_category'] = "ERROR"
208                                         elif 'bucket' not in y:
209                                                 # for all other actions, just carry over the
210                                                 # previous category
211                                                 y['prev_category'] = y['category']
212                                         else:
213                                                 print "UNKNOWN state for record: %s" % y
214                                                 sys.exit(1)
215                                         # determine through translation, if the buckets match
216                                         #if 'category' in y and x['category'] == y['category']:
217                                         #       b_match = True
218                                         #elif x['category'] == "OLDBOOTCD" and y['bucket'][0] == 'dbg':
219                                         #       b_match = True
220                                         #elif x['category'] == "ERROR" and y['bucket'][0] == 'down':
221                                         #       b_match = True
222                                         #else:
223                                         #       b_match = False
224
225                                         #if b_match: 
226                                         #       # 2b) ok, b/c they agree that there's still a problem..
227                                         #       # 2b) Comon & Monitor still agree; RT ticket?
228                                         #       y['prev_category'] = y['category']
229                                         #else:
230                                         #       # 2a) mismatch, need a policy for how to resolve
231                                         #       #     resolution will be handled in __diagnoseNode()
232                                         #       #         for now just record the two categories.
233                                         #       #if x['category'] == "PROD" and x['state'] == "BOOT" and \
234                                         #       # ( y['bucket'][0] == 'down' or  y['bucket'][0] == 'dbg'):
235                                         #       print "FINDBAD and MONITOR have a mismatch: %s vs %s" % \
236                                         #                               (x['category'], y['bucket'])
237
238
239                                         self.mergedb[loginbase][nodename] = {}
240                                         self.mergedb[loginbase][nodename].update(y)
241                                         self.mergedb[loginbase][nodename]['comonstats'] = x['comonstats']
242                                         self.mergedb[loginbase][nodename]['category']   = x['category']
243                                         self.mergedb[loginbase][nodename]['state'] = x['state']
244                                         self.mergedb[loginbase][nodename]['kernel']=x['kernel']
245                                         self.mergedb[loginbase][nodename]['bootcd']=x['bootcd']
246                                         # delete the entry from cache_all to keep it out of case 3)
247                                         del self.cache_all[nodename]
248
249                 # 3) nodes that remin in cache_all were not identified by findbad.
250                 #        Do we keep them or not?
251                 #   NOTE: i think that since the categories are performed before this
252                 #               step now, and by a monitor-controlled agent.
253
254                 # TODO: This does not work correctly.  Do we need this? 
255                 #for hn in self.cache_all.keys():
256                 #       y = self.act_all[hn][0]
257                 #       if 'monitor' in y['bucket']:
258                 #               loginbase = self.plcdb_hn2lb[hn] 
259                 #               if loginbase not in self.sickdb:
260                 #                       self.sickdb[loginbase] = {}
261                 #               self.sickdb[loginbase][hn] = y
262                 #       else:
263                 #               del self.cache_all[hn]
264
265                 print "len of cache_all: %d" % len(self.cache_all.keys())
266                 return
267
268         def sendToRT(self):
269                 sorted_sites = self.mergedb.keys()
270                 sorted_sites.sort()
271                 # look at all problems reported by merge
272                 for loginbase in sorted_sites:
273                         d_merge_nodes = self.mergedb[loginbase]
274                         for nodename in d_merge_nodes.keys():
275                                 record = self.mergedb[loginbase][nodename]
276                                 self.toRT.put(record)
277
278                 # send signal to stop reading
279                 self.toRT.put(None)
280                 return
281
282 class Diagnose(Thread):
        def __init__(self, fromRT):
                """
                fromRT -- object with a get() method that yields node records and,
                          finally, None (see accumSickSites()).
                """
                self.fromRT = fromRT
                # the hostname to loginbase mapping
                self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")

                # diagnose_in[loginbase][nodename]  = node record read from fromRT
                self.diagnose_in = {}
                # diagnose_out[loginbase][nodename] = record produced by diagnoseAll()
                self.diagnose_out = {}
                Thread.__init__(self)
290
291         def print_stats(self, key, stats):
292                 print "%20s : %d" % (key, stats[key])
293
294         def run(self):
295                 self.accumSickSites()
296
297                 print "Accumulated %d sick sites" % len(self.diagnose_in.keys())
298                 logger.debug("Accumulated %d sick sites" % len(self.diagnose_in.keys()))
299
300                 try:
301                         stats = self.diagnoseAll()
302                 except Exception, err:
303                         print "----------------"
304                         import traceback
305                         print traceback.print_exc()
306                         print err
307                         #if config.policysavedb:
308                         sys.exit(1)
309
310                 self.print_stats("sites", stats)
311                 self.print_stats("sites_diagnosed", stats)
312                 self.print_stats("nodes_diagnosed", stats)
313
314                 if config.policysavedb:
315                         print "Saving Databases... diagnose_out"
316                         soltesz.dbDump("diagnose_out", self.diagnose_out)
317
318         def accumSickSites(self):
319                 """
320                 Take all nodes, from l_diagnose, look them up in the diagnose_out database, 
321                 and insert them into diagnose_in[] as:
322
323                         diagnose_in[loginbase] = [diag_node1, diag_node2, ...]
324                 """
325                 while 1:
326                         node_record = self.fromRT.get(block = True)
327                         if node_record == None:
328                                 break;
329
330                         nodename = node_record['nodename']
331                         loginbase = self.plcdb_hn2lb[nodename]
332
333                         if loginbase not in self.diagnose_in:
334                                 self.diagnose_in[loginbase] = {}
335
336                         self.diagnose_in[loginbase][nodename] = node_record
337
338                 return
339
340         def diagnoseAll(self):
341                 i_sites = 0
342                 i_sites_diagnosed = 0
343                 i_nodes_diagnosed = 0
344                 i_nodes_actedon = 0
345                 i_sites_emailed = 0
346                 l_allsites = []
347
348                 sorted_sites = self.diagnose_in.keys()
349                 sorted_sites.sort()
350                 l_diagnosed_all = []
351                 for loginbase in sorted_sites:
352                         l_allsites += [loginbase]
353
354                         d_diag_nodes = self.diagnose_in[loginbase]
355                         l_diag_records = self.__diagnoseSite(loginbase, d_diag_nodes)
356                         l_diagnosed_all += l_diag_records
357                         
358                         if len(l_diag_records) > 0:
359                                 i_nodes_diagnosed += len(l_diag_records)
360                                 i_sites_diagnosed += 1
361                         i_sites += 1
362
363                 self.diagnose_out= {}
364                 for diag_record in l_diagnosed_all:
365                         nodename = diag_record['nodename']
366                         loginbase = self.plcdb_hn2lb[nodename]
367
368                         if loginbase not in self.diagnose_out:
369                                 self.diagnose_out[loginbase] = {}
370
371                         self.diagnose_out[loginbase][nodename] = diag_record
372
373                 return {'sites': i_sites, 
374                                 'sites_diagnosed': i_sites_diagnosed, 
375                                 'nodes_diagnosed': i_nodes_diagnosed, 
376                                 'allsites':l_allsites}
377
378                 pass
379                 
380         def __getDaysDown(self, diag_record, nodename):
381                 daysdown = -1
382                 if diag_record['comonstats']['sshstatus'] != "null":
383                         daysdown = int(diag_record['comonstats']['sshstatus']) // (60*60*24)
384                 elif diag_record['comonstats']['lastcotop'] != "null":
385                         daysdown = int(diag_record['comonstats']['lastcotop']) // (60*60*24)
386                 else:
387                         daysdown = -1
388                 return daysdown
389
390         def __getStrDaysDown(self, diag_record, nodename):
391                 daysdown = self.__getDaysDown(diag_record, nodename)
392                 if daysdown > 0:
393                         return "(%d days down)"%daysdown
394                 else:
395                         return "Unknown number of days"
396
397         def __getCDVersion(self, diag_record, nodename):
398                 cdversion = ""
399                 #print "Getting kernel for: %s" % diag_record['nodename']
400                 cdversion = diag_record['kernel']
401                 return cdversion
402
403         def __diagnoseSite(self, loginbase, d_diag_nodes):
404                 """
405                 rec_sitelist is a diagnose_in entry: 
406                 """
407                 diag_list = []
408                 sorted_nodes = d_diag_nodes.keys()
409                 sorted_nodes.sort()
410                 for nodename in sorted_nodes:
411                         node_record = d_diag_nodes[nodename]
412                         diag_record = self.__diagnoseNode(loginbase, node_record)
413
414                         if diag_record != None:
415                                 diag_list += [ diag_record ]
416
417                 return diag_list
418
419         def diagRecordByCategory(self, node_record):
420                 nodename = node_record['nodename']
421                 category = node_record['category']
422                 state    = node_record['state']
423                 loginbase = self.plcdb_hn2lb[nodename]
424                 diag_record = None
425
426                 if  "ERROR" in category:        # i.e. "DOWN"
427                         diag_record = {}
428                         diag_record.update(node_record)
429                         daysdown = self.__getDaysDown(diag_record, nodename) 
430                         if daysdown >= 0 and daysdown < 7:
431                                 format = "DIAG: %20s : %-40s Down only %s days  NOTHING DONE"
432                                 print format % (loginbase, nodename, daysdown)
433                                 return None
434
435                         s_daysdown = self.__getStrDaysDown(diag_record, nodename)
436                         diag_record['message'] = emailTxt.mailtxt.newdown
437                         diag_record['args'] = {'nodename': nodename}
438                         diag_record['info'] = (nodename, s_daysdown, "")
439                         diag_record['log'] = "DOWN: %20s : %-40s == %20s %s" % \
440                                         (loginbase, nodename, diag_record['info'], diag_record['ticket_id']),
441
442                 elif "OLDBOOTCD" in category:
443                         # V2 boot cds as determined by findbad
444                         s_daysdown = self.__getStrDaysDown(node_record, nodename)
445                         s_cdversion = self.__getCDVersion(node_record, nodename)
446                         diag_record = {}
447                         diag_record.update(node_record)
448                         #if "2.4" in diag_record['kernel'] or "v2" in diag_record['bootcd']:
449                         diag_record['message'] = emailTxt.mailtxt.newbootcd
450                         diag_record['args'] = {'nodename': nodename}
451                         diag_record['info'] = (nodename, s_daysdown, s_cdversion)
452                         diag_record['log'] = "BTCD: %20s : %-40s == %20s %20s %s" % \
453                                                                         (loginbase, nodename, diag_record['kernel'], 
454                                                                          diag_record['bootcd'], diag_record['ticket_id']),
455
456                 elif "PROD" in category:
457                         if "DEBUG" in state:
458                                 # Not sure what to do with these yet.  Probably need to
459                                 # reboot, and email.
460                                 print "DEBG: %20s : %-40s  NOTHING DONE" % (loginbase, nodename)
461                                 return None
462                         elif "BOOT" in state:
463                                 # no action needed.
464                                 # TODO: remove penalties, if any are applied.
465                                 if 'improvement' in node_record['stage']:
466                                         # then we need to pass this on to 'action'
467                                         diag_record = {}
468                                         diag_record.update(node_record)
469                                         diag_record['message'] = emailTxt.mailtxt.newthankyou
470                                         diag_record['args'] = {'nodename': nodename}
471                                         diag_record['info'] = (nodename, node_record['prev_category'], 
472                                                                                                          node_record['category'])
473                                         diag_record['log'] = "IMPR: %20s : %-40s == %20s %20s %s" % \
474                                                                         (loginbase, nodename, diag_record['stage'], 
475                                                                          state, category),
476                                         return diag_record
477                                 else:
478                                         return None
479                         else:
480                                 # unknown
481                                 pass
482                 elif "ALPHA"    in category:
483                         pass
484                 elif "clock_drift" in category:
485                         pass
486                 elif "dns"    in category:
487                         pass
488                 elif "filerw"    in category:
489                         pass
490                 else:
491                         print "Unknown category!!!! %s" % category
492                         sys.exit(1)
493
494                 return diag_record
495
496         def __diagnoseNode(self, loginbase, node_record):
497                 # TODO: change the format of the hostname in this 
498                 #               record to something more natural.
499                 nodename          = node_record['nodename']
500                 category          = node_record['category']
501                 prev_category = node_record['prev_category']
502                 state             = node_record['state']
503
504                 val = cmpCategoryVal(category, prev_category)
505                 if val == -1:
506                         # current category is worse than previous, carry on
507                         pass
508                 elif val == 1:
509                         # current category is better than previous
510                         # TODO: too generous for now, but will be handled correctly
511                         node_record['stage'] = 'improvement'
512                 else:
513                         #values are equal, carry on.
514                         pass
515                         
516                 #### COMPARE category and prev_category
517                 # if not_equal
518                 #       then assign a stage based on relative priorities
519                 # else equal
520                 #       then check category for stats.
521                 diag_record = self.diagRecordByCategory(node_record)
522                 if diag_record == None:
523                         return None
524
525                 #### found_RT_ticket
526                 # TODO: need to record time found, and maybe add a stage for acting on it...
527                 if 'found_rt_ticket' in diag_record and \
528                         diag_record['found_rt_ticket'] is not None:
529                         if diag_record['stage'] is not 'improvement':
530                                 diag_record['stage'] = 'ticket_waitforever'
531                                 
532                 current_time = time.time()
533                 delta = current_time - diag_record['time']
534
535                 message = diag_record['message']
536                 act_record = {}
537                 act_record.update(diag_record)
538
539                 #### DIAGNOSE STAGES 
540                 #print "%s has stage %s" % (nodename, diag_record['stage'])
541                 if   'findbad' in diag_record['stage']:
542                         # The node is bad, and there's no previous record of it.
543                         act_record['email'] = TECH              # addative emails
544                         act_record['action'] = 'noop'
545                         act_record['message'] = message[0]
546                         act_record['stage'] = 'stage_actinoneweek'
547
548                 elif 'improvement' in diag_record['stage']:
549                         # - backoff previous squeeze actions (slice suspend, nocreate)
550                         # TODO: add a backoff_squeeze section... Needs to runthrough
551                         act_record['action'] = 'close_rt'
552                         act_record['message'] = message[0]
553                         act_record['stage'] = 'monitor-end-record'
554
555                 elif 'actinoneweek' in diag_record['stage']:
556                         act_record['email'] = TECH | PI         # addative emails
557                         if delta >= 7 * SPERDAY: 
558                                 act_record['stage'] = 'stage_actintwoweeks'
559                                 act_record['message'] = message[1]
560                                 act_record['action'] = 'nocreate' 
561                         elif delta >= 3* SPERDAY and not 'second-mail-at-oneweek' in act_record:
562                                 act_record['message'] = message[1]
563                                 act_record['action'] = 'sendmailagain-waitforoneweekaction' 
564                                 act_record['second-mail-at-oneweek'] = True
565                         else:
566                                 act_record['message'] = None
567                                 act_record['action'] = 'waitforoneweekaction' 
568
569                 elif 'actintwoweeks' in diag_record['stage']:
570                         act_record['email'] = TECH | PI | USER          # addative emails
571                         if delta >= 14 * SPERDAY:
572                                 act_record['stage'] = 'stage_waitforever'
573                                 act_record['message'] = message[2]
574                                 act_record['action'] = 'suspendslices'
575                                 act_record['time'] = current_time               # reset clock for waitforever
576                         elif delta >= 10* SPERDAY and not 'second-mail-at-twoweeks' in act_record:
577                                 act_record['message'] = message[2]
578                                 act_record['action'] = 'sendmailagain-waitfortwoweeksaction' 
579                                 act_record['second-mail-at-twoweeks'] = True
580                         else:
581                                 act_record['message'] = None
582                                 act_record['action'] = 'waitfortwoweeksaction'
583
584                 elif 'ticket_waitforever' in diag_record['stage']:
585                         act_record['email'] = TECH
586                         if 'first-found' not in act_record:
587                                 act_record['first-found'] = True
588                                 act_record['action'] = 'ticket_waitforever'
589                                 act_record['message'] = None
590                                 act_record['time'] = current_time
591                         else:
592                                 if delta >= 7*SPERDAY:
593                                         act_record['action'] = 'email-againticket_waitforever'
594                                         act_record['message'] = message[0]
595                                         act_record['time'] = current_time               # reset clock
596                                 else:
597                                         act_record['action'] = 'ticket_waitforever'
598                                         act_record['message'] = None
599
600                 elif 'waitforever' in diag_record['stage']:
601                         # more than 3 days since last action
602                         # TODO: send only on weekdays.
603                         # NOTE: expects that 'time' has been reset before entering waitforever stage
604                         if delta >= 3*SPERDAY:
605                                 act_record['action'] = 'email-againwaitforever'
606                                 act_record['message'] = message[0]
607                                 act_record['time'] = current_time               # reset clock
608                         else:
609                                 act_record['action'] = 'waitforever'
610                                 act_record['message'] = None
611
612                 else:
613                         # There is no action to be taken, possibly b/c the stage has
614                         # already been performed, but diagnose picked it up again.
615                         # two cases, 
616                         #       1. stage is unknown, or 
617                         #       2. delta is not big enough to bump it to the next stage.
618                         # TODO: figure out which. for now assume 2.
619                         print "UNKNOWN!!? %s" % nodename
620                         act_record['action'] = 'unknown'
621                         act_record['message'] = message[0]
622                         print "Exiting..."
623                         sys.exit(1)
624
625                 print "%s" % act_record['log'],
626                 print "%15s" % act_record['action']
627                 return act_record
628
629
class SiteAction:
        """
        Base class for an action that is run against a dict of arguments.

        Subclasses override _run(); callers invoke run(), which first verifies
        that every name in self.parameter_names is present in the args dict.
        """
        def __init__(self, parameter_names=None):
                """
                parameter_names -- names that must be present in the args passed
                        to run().  Defaults to ['hostname', 'ticket_id'].
                """
                # Build the default per instance; a list literal in the signature
                # would be one object shared (and mutable) across all instances.
                if parameter_names is None:
                        parameter_names = ['hostname', 'ticket_id']
                self.parameter_names = parameter_names
        def checkParam(self, args):
                """ Raise Exception naming the first required parameter missing from args. """
                for param in self.parameter_names:
                        if param not in args:
                                raise Exception("Parameter %s not provided in args"%param)
        def run(self, args):
                """ Validate args, then return whatever _run(args) returns. """
                self.checkParam(args)
                return self._run(args)
        def _run(self, args):
                """ Hook for subclasses; the base implementation does nothing and returns None. """
                pass
642
class SuspendAction(SiteAction):
        """ Action that calls plc.suspendSlices() with args['hostname']. """
        def _run(self, args):
                # Hand back whatever plc reports so run() can return it.
                hostname = args['hostname']
                return plc.suspendSlices(hostname)
646
class RemoveSliceCreation(SiteAction):
        """ Action that calls plc.removeSliceCreation() with args['hostname']. """
        def _run(self, args):
                # Hand back whatever plc reports so run() can return it.
                hostname = args['hostname']
                return plc.removeSliceCreation(hostname)
650
class BackoffActions(SiteAction):
        """
        Action that re-enables slices and then slice creation (in that order)
        for args['hostname'].  Always returns True.
        """
        def _run(self, args):
                hostname = args['hostname']
                for enable in (plc.enableSlices, plc.enableSliceCreation):
                        enable(hostname)
                return True
656
657 # TODO: create class for each action below, 
658 #               allow for lists of actions to be performed...
659
def close_rt_backoff(args):
        """
        Close the RT ticket args['ticket_id'], then re-enable slices and slice
        creation for args['hostname'].  Returns None.
        """
        closing_note = "Ticket CLOSED automatically by SiteAssist."
        mailer.closeTicketViaRT(args['ticket_id'], closing_note)

        hostname = args['hostname']
        plc.enableSlices(hostname)
        plc.enableSliceCreation(hostname)
665
class Action(Thread):
        """
        Thread that acts on the diagnosed nodes named in l_action.

        run() groups the requested nodes by site (accumSites), then, site by
        site, builds one action record per node, optionally emails the site
        contacts and optionally runs the matching handler from self.actions
        (analyseSites / __actOnSite).  Action records are kept in
        self.act_all[nodename] as a list, newest first.
        """
        def __init__(self, l_action):
                """
                l_action -- iterable of node hostnames to act on; None is treated
                        as "no nodes" by accumSites().
                """
                self.l_action = l_action

                # the hostname to loginbase mapping
                self.plcdb_hn2lb = soltesz.dbLoad("plcdb_hn2lb")

                # Actions to take.
                self.diagnose_db = soltesz.if_cached_else(1, "diagnose_out", lambda : {})
                # Actions taken.
                self.act_all   = soltesz.if_cached_else(1, "act_all", lambda : {})

                # A dict of action names to handler functions.  The handlers are
                # lambdas, and pickle doesn't like lambdas.
                self.actions = {
                        'suspendslices' : lambda args: plc.suspendSlices(args['hostname']),
                        'nocreate'      : lambda args: plc.removeSliceCreation(args['hostname']),
                        'close_rt'      : lambda args: close_rt_backoff(args),
                        'rins'          : lambda args: plc.nodeBootState(args['hostname'], "rins"),
                }
                # The remaining actions perform no operation; their handler just
                # hands the args back.
                passthrough = lambda args: args
                for key in ('noop',
                                        'ticket_waitforever',
                                        'waitforever',
                                        'unknown',
                                        'waitforoneweekaction',
                                        'waitfortwoweeksaction',
                                        'sendmailagain-waitforoneweekaction',
                                        'sendmailagain-waitfortwoweeksaction',
                                        'email-againwaitforever',
                                        'email-againticket_waitforever'):
                        self.actions[key] = passthrough

                self.sickdb = {}
                Thread.__init__(self)

        def run(self):
                """
                Thread entry point: accumulate the sick sites, act on each of them,
                print the summary counters and, if config.policysavedb is set, save
                act_all.  If analyseSites() raises, the traceback is printed,
                act_all is saved (same config check) and sys.exit(1) is called.
                """
                self.accumSites()
                n_sick = len(self.sickdb)
                print("Accumulated %d sick sites" % n_sick)
                logger.debug("Accumulated %d sick sites" % n_sick)

                try:
                        stats = self.analyseSites()
                except Exception:
                        # sys.exc_info() instead of "except Exception, err" so the
                        # syntax is accepted by every Python version.
                        err = sys.exc_info()[1]
                        print("----------------")
                        import traceback
                        # print_exc() writes the traceback itself and returns None;
                        # printing its result only added a stray "None" line.
                        traceback.print_exc()
                        print(err)
                        if config.policysavedb:
                                print("Saving Databases... act_all")
                                soltesz.dbDump("act_all", self.act_all)
                        sys.exit(1)

                for key in ("sites", "sites_diagnosed", "nodes_diagnosed",
                                        "sites_emailed", "nodes_actedon"):
                        self.print_stats(key, stats)
                print(",".join(stats['allsites']))

                if config.policysavedb:
                        print("Saving Databases... act_all")
                        #soltesz.dbDump("policy.eventlog", self.eventlog)
                        # TODO: remove 'diagnose_out',
                        #       or at least the entries that were acted on.
                        soltesz.dbDump("act_all", self.act_all)

        def accumSites(self):
                """
                Take all nodes, from l_action, look them up in the diagnose_db database,
                and insert them into sickdb[] as:

                        sickdb[loginbase][nodename] = diag_record

                This way only the given l_action nodes will be acted on regardless
                of how many from diagnose_db are available.

                An l_action of None is treated as an empty list.  A nodename that
                is missing from plcdb_hn2lb raises KeyError.
                """
                if self.l_action is None:
                        return

                for nodename in self.l_action:
                        loginbase = self.plcdb_hn2lb[nodename]

                        if loginbase in self.diagnose_db and \
                                nodename in self.diagnose_db[loginbase]:

                                diag_record = self.diagnose_db[loginbase][nodename]
                                self.sickdb.setdefault(loginbase, {})[nodename] = diag_record
                return

        def __emailSite(self, loginbase, roles, message, args):
                """
                loginbase is the unique site abbreviation, prepended to slice names.
                roles contains TECH, PI, USER roles, and derive email aliases.
                message is [<subj>, <body>]; both are %-formatted with args.
                args is updated in place with 'loginbase'.

                Returns the ticket id: args['ticket_id'] (or 0 if absent) when
                mailing the admin directly, otherwise whatever mailer.emailViaRT()
                returns.  Returns 0 if formatting or sending raised.
                """
                ticket_id = 0
                args.update({'loginbase':loginbase})

                # These config combinations redirect the mail to the admin only.
                if not config.mail and not config.debug and config.bcc:
                        roles = ADMIN
                if config.mail and config.debug:
                        roles = ADMIN

                # build targets
                contacts = []
                if ADMIN & roles:
                        contacts += [config.email]
                if TECH & roles:
                        contacts += [TECHEMAIL % loginbase]
                if PI & roles:
                        contacts += [PIEMAIL % loginbase]
                if USER & roles:
                        slices = plc.slices(loginbase)
                        if len(slices) >= 1:
                                contacts += [SLICEMAIL % slicename for slicename in slices]
                                print("SLIC: %20s : %d slices" % (loginbase, len(slices)))
                        else:
                                print("SLIC: %20s : 0 slices" % loginbase)

                try:
                        subject = message[0] % args
                        body = message[1] % args
                        if ADMIN & roles:
                                # send only to admin
                                if 'ticket_id' in args:
                                        subj = "Re: [PL #%s] %s" % (args['ticket_id'], subject)
                                else:
                                        subj = "Re: [PL noticket] %s" % subject
                                mailer.email(subj, body, contacts)
                                # A missing ticket_id used to raise KeyError here, after
                                # the mail had already been sent; keep the 0 default.
                                ticket_id = args.get('ticket_id', ticket_id)
                        else:
                                # NOTE(review): a missing 'ticket_id' raises KeyError here,
                                # which is swallowed below, so no mail goes out for a site
                                # without a ticket -- confirm what emailViaRT expects
                                # when there is no existing ticket.
                                ticket_id = mailer.emailViaRT(subject, body, contacts, args['ticket_id'])
                except Exception:
                        # Best effort: report the failure and fall through with
                        # whatever ticket_id is set so far.
                        print("exception on message:")
                        import traceback
                        traceback.print_exc()
                        print(message)

                return ticket_id


        def _format_diaginfo(self, diag_node):
                """
                Return the one-line summary of diag_node used in the email host
                list.  Reads diag_node['info'][0..2] and diag_node['stage'].
                """
                info = diag_node['info']
                if diag_node['stage'] != 'monitor-end-record':
                        return "    %s %s - %s\n" % (info[0], info[2], info[1]) #(node,ver,daysdn)
                return "    %s went from '%s' to '%s'\n" % (info[0], info[1], info[2])

        def __actOnSite(self, loginbase, site_record):
                """
                Act on every diagnosed node of one site.

                site_record maps nodename -> diag_record.  One action record is
                built per node; if the first record carries a message the site is
                emailed once, and if config.squeeze is set the first record's
                action handler is run.  Waits for <enter> on stdin before
                returning (i_nodes_actedon, i_nodes_emailed).
                """
                i_nodes_actedon = 0
                i_nodes_emailed = 0
                b_squeeze = config.squeeze

                act_recordlist = []
                for nodename in site_record.keys():
                        act_recordlist += [self.__actOnNode(site_record[nodename])]

                count_up = self.currentUpAtSite(loginbase)
                print("SITE: %20s : %d nodes up" % (loginbase, count_up))
                # There may be a second penalty when count_up >= MINUP, regardless
                # of which stage it's in.
                # TODO: check how long this has occurred.

                email_args = {}
                email_args['hostname_list'] = "".join([rec['msg_format'] for rec in act_recordlist])
                for rec in act_recordlist:
                        # 'hostname' and 'ticket_id' end up holding the values of the
                        # last record that provides them.
                        email_args['hostname'] = rec['nodename']
                        if 'ticket_id' in rec:
                                email_args['ticket_id'] = rec['ticket_id']

                # Send email, perform node action
                # TODO: only send one email per site for a given problem...
                if len(act_recordlist) > 0:
                        first_record = act_recordlist[0]

                        # send message before squeezing
                        if first_record['message'] is not None:
                                ticket_id = self.__emailSite(loginbase, first_record['email'],
                                                                         first_record['message'], email_args)

                                # Add ticket_id to ALL node records.  Each rec is the same
                                # dict object that __actOnNode stored at
                                # act_all[nodename][0] (when it stored one), so updating it
                                # here updates act_all too, and cannot fail for records
                                # that were deliberately not stored.
                                for rec in act_recordlist:
                                        rec['ticket_id'] = "%s" % ticket_id
                                        if config.mail: i_nodes_emailed += 1

                        # TODO: perform the most severe action?
                        if b_squeeze:
                                # Always the first record's action; the loop above no
                                # longer rebinds the variable this is read from.
                                act_key = first_record['action']
                                self.actions[act_key](email_args)
                                i_nodes_actedon += 1

                if config.policysavedb:
                        print("Saving Databases... act_all, diagnose_out")
                        soltesz.dbDump("act_all", self.act_all)
                        # remove site record from diagnose_out, it's in act_all as done.
                        del self.diagnose_db[loginbase]
                        soltesz.dbDump("diagnose_out", self.diagnose_db)

                print("Hit enter to continue...")
                sys.stdout.flush()
                line = sys.stdin.readline()

                return (i_nodes_actedon, i_nodes_emailed)

        def __actOnNode(self, diag_record):
                """
                Build the action record for one node: a copy of diag_record plus
                'nodename' and 'msg_format'.  The record is inserted at the front
                of self.act_all[nodename] unless its stage is
                'monitor-end-record'.  Returns the record.
                """
                nodename = diag_record['nodename']

                act_record = {}
                act_record.update(diag_record)
                act_record['nodename'] = nodename
                act_record['msg_format'] = self._format_diaginfo(diag_record)

                print("%s %15s" % (act_record['log'], act_record['action']))

                # Compare by value.  The previous identity test ("is not") against
                # a string literal was true for practically every stage string, so
                # the else branch never ran.
                # NOTE(review): end records were therefore being stored until now;
                # confirm nothing depends on finding them in act_all.
                if act_record['stage'] != 'monitor-end-record':
                        self.act_all.setdefault(nodename, []).insert(0, act_record)
                else:
                        print("Not recording %s in act_all" % nodename)

                return act_record

        def analyseSites(self):
                """
                Run __actOnSite() on every site in sickdb, in sorted loginbase
                order, and return the accumulated counters as a dict with keys
                'sites', 'sites_diagnosed', 'nodes_diagnosed', 'sites_emailed',
                'nodes_actedon' and 'allsites' (list of loginbases visited).
                """
                i_sites = 0
                i_sites_diagnosed = 0
                i_nodes_diagnosed = 0
                i_nodes_actedon = 0
                i_sites_emailed = 0
                l_allsites = []

                for loginbase in sorted(self.sickdb):
                        site_record = self.sickdb[loginbase]

                        i_nodes_diagnosed += len(site_record)
                        i_sites_diagnosed += 1

                        (na,ne) = self.__actOnSite(loginbase, site_record)

                        i_sites += 1
                        i_nodes_actedon += na
                        # ne is the second value returned by __actOnSite.
                        i_sites_emailed += ne

                        l_allsites += [loginbase]

                return {'sites': i_sites,
                                'sites_diagnosed': i_sites_diagnosed,
                                'nodes_diagnosed': i_nodes_diagnosed,
                                'sites_emailed': i_sites_emailed,
                                'nodes_actedon': i_nodes_actedon,
                                'allsites':l_allsites}

        def print_stats(self, key, stats):
                """ Print the integer counter stats[key], labelled with key. """
                print("%20s : %d" % (key, stats[key]))



        #"""
        #Prints, logs, and emails status of up nodes, down nodes, and buckets.
        #"""
        #def status(self):
        #       sub = "Monitor Summary"
        #       msg = "\nThe following nodes were acted upon:  \n\n"
        #       for (node, (type, date)) in self.emailed.items():
        #               # Print only things acted on today.
        #               if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
        #                       msg +="%s\t(%s)\t%s\n" %(node, type, time.ctime(date))
        #       msg +="\n\nThe following sites have been 'squeezed':\n\n"
        #       for (loginbase, (date, type)) in self.squeezed.items():
        #               # Print only things acted on today.
        #               if (time.gmtime(time.time())[2] == time.gmtime(date)[2]):
        #                       msg +="%s\t(%s)\t%s\n" %(loginbase, type, time.ctime(date))
        #       mailer.email(sub, msg, [SUMTO])
        #       logger.info(msg)
        #       return

        #"""
        #Store/Load state of emails.  When, where, what.
        #"""
        #def emailedStore(self, action):
        #       try:
        #               if action == "LOAD":
        #                       f = open(DAT, "r+")
        #                       logger.info("POLICY:  Found and reading " + DAT)
        #                       self.emailed.update(pickle.load(f))
        #               if action == "WRITE":
        #                       f = open(DAT, "w")
        #                       #logger.debug("Writing " + DAT)
        #                       pickle.dump(self.emailed, f)
        #               f.close()
        #       except Exception, err:
        #               logger.info("POLICY:  Problem with DAT, %s" %err)

        def currentUpAtSite(self, loginbase):
                """
                Returns the number of up nodes at the site: every node returned by
                plc.getSiteNodes() that is either *NOT* in act_all, or whose most
                recent act_all record has the stage 'steady-state'.

                Returns 0 (rather than None) when the site has no nodes, so callers
                can always treat the result as an integer.
                """
                allsitenodes = plc.getSiteNodes(loginbase)
                if len(allsitenodes) == 0:
                        logger.info("Site has no nodes or not in DB")
                        print("Site has no nodes or not in DB")
                        return 0

                up = 0
                for nodename in allsitenodes:
                        if nodename not in self.act_all:
                                up += 1
                        elif self.act_all[nodename][0]['stage'] == "steady-state":
                                up += 1

                return up
1016
1017 #class Policy(Thread):
1018
def main():
        """ Tell the user that this file is meant to be imported, not executed. """
        notice = "policy.py is a module, not a script for running directly."
        print(notice)
1021
if __name__ == '__main__':
        # Script entry: run main(); on Ctrl-C report it, log it, and leave
        # immediately via os._exit(0).
        import os
        import plc
        try:
                main()
        except KeyboardInterrupt:
                print("Killed.  Exitting.")
                logger.info('Monitor Killed')
                os._exit(0)