Rewrite of policy engine.
[monitor.git] / policy.py
1 #
2 # Copyright (c) 2004  The Trustees of Princeton University (Trustees).
3 #
4 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
5 #
6 # $Id: policy.py,v 1.12 2007/04/06 17:38:14 faiyaza Exp $
7 #
8 # Policy Engine.
9
10 #from monitor import *
11 from threading import *
12 import time
13 import logging
14 import mailer
15 import emailTxt
16 import pickle
17 import Queue
18 import plc
19 import reboot
20 import config
21
# Pickle file used to persist the record of emails already sent.
DAT = "./monitor.dat"

# Shared logger for the monitor; handlers are attached by the entry point.
logger = logging.getLogger("monitor")

# Interval between policy enforcement passes.
# NOTE(review): presumably seconds (7200 s = 2 h) -- confirm against the caller.
POLSLEEP = 7200

# Recipient of the summary email.
SUMTO = "faiyaza@cs.princeton.edu"
# Address templates; the %s slot takes a site loginbase (tech/PI lists)
# or a slice name (slice list).
TECHEMAIL = "tech-%s@sites.planet-lab.org"
PIEMAIL = "pi-%s@sites.planet-lab.org"
SLICEMAIL = "%s@slices.planet-lab.org"
PLCEMAIL = "support@planet-lab.org"

# Thresholds.  Each one is a number of days multiplied by SPERDAY, so the
# stored value is in seconds.
SPERDAY = 86400
PITHRESH = 7 * SPERDAY
SLICETHRESH = 7 * SPERDAY
# Time to wait before attempting rins again
RINSTHRESH = 5 * SPERDAY

# Time down before calling the node dead.
DEADTHRESH = 30 * SPERDAY
# Minimum number of nodes up before squeezing
MINUP = 2
47
48 # IF:
49 #  no SSH, down.
50 #  bad disk, down
51 #  DNS, kinda down (sick)
52 #  clock, kinda down (sick)
53 #  Full disk, going to be down
54
55 # Actions:
56 #  Email
57 #  suspend slice creation
58 #  kill slices
59
60
class Policy(Thread):
        """
        Policy engine thread.

        Drains the queue of sick nodes that have no ticket, groups them
        by site in sickdb and (via actOnSick) escalates: email to the
        site's tech list, then the PI list, then removal of slice
        creation and finally suspension of slices.

        State kept on the instance:
          emailed     {hostname: (type of email, time of email)}
          squeezed    {loginbase: (time, "creation" | "freeze")}
          actionlogdb {hostname: [action, daysdown, time]}
          sitelogdb   {site: [action, daysdown, date]}  (not written here)
          sickdb      {loginbase: {hostname: [buckets]}}
        """

        def __init__(self, comonthread, sickNoTicket, emailed):
                """
                comonthread  -- object read for comonbkts, codata and one
                                attribute per bucket name (dbg, filerw,
                                clock_drift, ...).
                sickNoTicket -- queue of sick hostnames without tickets;
                                only empty() and get() are used.
                emailed      -- dict {hostname: (type of email, time)};
                                shared with the caller and updated in place.
                """
                Thread.__init__(self)
                self.cmn = comonthread
                # host -> (type of email, time of email)
                self.emailed = emailed
                # all sick nodes w/o tickets, fed by another thread
                self.sickNoTicket = sickNoTicket
                # Actions taken on nodes.
                # actionlogdb{node: [action, daysdown, date]}
                self.actionlogdb = {}
                # Actions taken on sites.
                # sitelogdb{site: [action, daysdown, date]}
                self.sitelogdb = {}
                # sick nodes with no tickets
                # sickdb{loginbase: {hostname: [buckets]}}
                self.sickdb = {}
                # Sites that had slice creation removed or slices frozen.
                # squeezed{loginbase: (date, action)}
                # Must exist before actOnSick()/status() touch it.
                self.squeezed = {}


        def accumSickSites(self):
                """
                Take all sick nodes, find their sites, and put in
                sickdb{loginbase: {hostname: [buckets]}}

                Nodes of the same site are merged into one entry instead
                of replacing each other.
                """
                while not self.sickNoTicket.empty():
                        node = self.sickNoTicket.get(block = True)
                        bkts = ["%s" % bkt
                                for bkt in self.cmn.comonbkts.keys()
                                if node in getattr(self.cmn, bkt)]
                        loginbase = plc.siteId(node)
                        self.sickdb.setdefault(loginbase, {})[node] = bkts


        def __daysDown(self, node):
                """
                Whole days derived from the node's 'sshstatus' value.
                NOTE(review): assumes sshstatus is a duration in seconds
                -- confirm against the comon data source.
                """
                return self.cmn.codata[node]['sshstatus'] // SPERDAY


        def __actOnDebug(self, node):
                """
                If in debug, set the node to rins and reboot it.
                """
                daysdown = self.__daysDown(node)
                logger.info("POLICY:  Node %s in dbg.  down for %s" %(node,daysdown))
                plc.nodeBootState(node, "rins")
                # NOTE(review): called unconditionally; whether a PCU is
                # required is decided (if at all) inside reboot.reboot.
                reboot.reboot(node)
                # Log it
                self.actionlogdb[node] = ['rins', daysdown, time.time()]


        def __actOnDown(self, node):
                """
                If down (not debug), do the same as actOnDebug for now
                """
                self.__actOnDebug(node)


        def __actOnFilerw(self, node):
                """
                Report to PLC when node needs disk checked.

                NOTE(review): this records the action in actionlogdb but
                not in emailed, so the caller's "not emailed yet" test
                stays true on later passes -- confirm that is intended.
                """
                target = [PLCEMAIL]
                logger.info("POLICY:  Emailing PLC for " + node)
                tmp = emailTxt.mailtxt.filerw
                sbj = tmp[0] % {'hostname': node}
                msg = tmp[1] % {'hostname': node}
                mailer.email(sbj, msg, target)
                self.actionlogdb[node] = ["filerw", None, time.time()]


        def __actOnDNS(self, node):
                """
                Placeholder: no action is implemented for DNS problems.
                """


        def __policy(self, node, loginbase, bkt):
                """
                Send the "down" message for node to the site's tech list.
                bkt is accepted but not used.
                """
                # The template needs the number of days down; compute it
                # the same way __actOnDebug does.
                daysdown = self.__daysDown(node)
                # ...and spam 'em
                target = [TECHEMAIL % loginbase]
                tmp = emailTxt.mailtxt.down
                sbj = tmp[0] % {'hostname': node}
                msg = tmp[1] % {'hostname': node, 'days': daysdown}
                mailer.email(sbj, msg, target)


        def actOnSick(self):
                """
                Acts on sick nodes: applies the per-node policy to every
                node accumulated in sickdb.
                """
                # Iterate over snapshots so the policy may touch the dicts.
                for loginbase, nodes in list(self.sickdb.items()):
                        for node in list(nodes.keys()):
                                self.__actOnNode(node, loginbase)


        def __actOnNode(self, node, loginbase):
                """
                Apply the policy to one node of site loginbase.
                """
                # Princeton Backdoor
                if loginbase == "princeton":
                        return

                # Without a loginbase we don't know where to send a message.
                if not loginbase:
                        logger.info("POLICY:  loginbase for %s not found" %node)
                        return

                # If first email, send to Tech
                target = [TECHEMAIL % loginbase]

                # If disk is foobarred, PLC should check it.
                if node in self.cmn.filerw and node not in self.emailed:
                        self.__actOnFilerw(node)
                        return

                # If in dbg, set to rins, then reboot.
                if node in self.cmn.dbg:
                        self.__actOnDebug(node)

                # Escalate only for nodes we already emailed about, and
                # never for disk or clock problems.
                if node in self.emailed and \
                   node not in self.cmn.filerw and \
                   node not in self.cmn.clock_drift:
                        if self.__escalate(node, loginbase, target):
                                return

                self.__emailBucket(node, target)


        def __escalate(self, node, loginbase, target):
                """
                Widen target according to how long ago the node was last
                acted on, squeezing the site when thresholds are passed.
                Returns True when nothing more must be sent for this node.

                NOTE(review): every email rewrites emailed[node] with the
                current time, so delta is the time since the most recent
                action, not since the first one -- confirm that the 7 and
                14 day thresholds are meant to work that way.
                """
                # If we emailed before, how long ago?
                delta = time.time() - self.emailed[node][1]
                if delta < SPERDAY:
                        logger.info("POLICY:  already acted on %s today." % node)
                        return True

                logger.info("POLICY:  acted %s on %s days ago" % (node,
                delta // SPERDAY))

                # If no luck with tech, email PI (once).
                target.append(PIEMAIL % loginbase)

                # Check the harshest threshold first; otherwise the 7 day
                # branch always wins and slices are never suspended.
                if delta >= 14 * SPERDAY:
                        # Email slices at site.
                        target.extend(self.__sliceAddrs(loginbase))
                        # If not enough up, freeze slices and email everyone.
                        if not self.enoughUp(loginbase):
                                self.__squeeze(node, loginbase, target, "freeze")
                                return True
                elif delta >= 7 * SPERDAY:
                        # remove slice creation if enough nodes arent up
                        if not self.enoughUp(loginbase):
                                target.extend(self.__sliceAddrs(loginbase))
                                self.__squeeze(node, loginbase, target, "creation")
                                return True
                return False


        def __sliceAddrs(self, loginbase):
                """
                Email addresses of all slices at the site.
                NOTE(review): plc.slices is given a list of loginbases,
                matching plc.getSiteNodes([loginbase]) -- confirm.
                """
                return [SLICEMAIL % slice for slice in plc.slices([loginbase])]


        def __squeeze(self, node, loginbase, target, action):
                """
                Carry out a site-level action ("creation" removes slice
                creation, "freeze" suspends slices), email target and
                record it in squeezed and emailed.
                """
                if action == "freeze":
                        logger.info("POLICY:  Suspending %s slices." % loginbase)
                        tmp = emailTxt.mailtxt.suspendSlices
                        plc.suspendSlices([node])
                else:
                        logger.info("POLICY:  Removing slice creation from %s" % loginbase)
                        tmp = emailTxt.mailtxt.removedSliceCreation
                        plc.removeSliceCreation(node)
                sbj = tmp[0]
                msg = tmp[1] % {'loginbase': loginbase}
                mailer.email(sbj, msg, target)
                self.squeezed[loginbase] = (time.time(), action)
                self.emailed[node] = (action, time.time())
                logger.info("POLICY: Emailing (%s) %s - %s"\
                        %(action, node, target))


        def __emailBucket(self, node, target):
                """
                Find the first bucket the node is in and send that
                bucket's predefined message to target.
                """
                for bkt in self.cmn.comonbkts.keys():
                        if node in getattr(self.cmn, bkt):
                                logger.info("POLICY: Emailing (%s) %s - %s"\
                                        %(bkt, node, target))
                                tmp = getattr(emailTxt.mailtxt, bkt)
                                sbj = tmp[0] % {'hostname': node}
                                msg = tmp[1] % {'hostname': node}
                                mailer.email(sbj, msg, target)
                                self.emailed[node] = (bkt , time.time())
                                return


        def status(self):
                """
                Logs and emails a summary of the nodes and sites acted
                upon today (UTC date).
                """
                sub = "Monitor Summary"
                # Compare year, month and day; the day of month alone would
                # also match the same day in any other month.
                today = time.gmtime(time.time())[:3]
                parts = ["\nThe following nodes were acted upon:  \n\n"]
                for (node, (kind, date)) in self.emailed.items():
                        # Print only things acted on today.
                        if time.gmtime(date)[:3] == today:
                                parts.append("%s\t(%s)\t%s\n" %(node, kind, time.ctime(date)))
                parts.append("\n\nThe following sites have been 'squeezed':\n\n")
                for (loginbase, (date, kind)) in self.squeezed.items():
                        # Print only things acted on today.
                        if time.gmtime(date)[:3] == today:
                                parts.append("%s\t(%s)\t%s\n" %(loginbase, kind, time.ctime(date)))
                msg = "".join(parts)
                mailer.email(sub, msg, [SUMTO])
                logger.info(msg)
                return


        def emailedStore(self, action):
                """
                Store/Load state of emails.  When, where, what.

                action -- "LOAD" merges the pickled dict from DAT into
                          emailed; "WRITE" pickles emailed to DAT.
                Problems are logged, never raised.
                """
                if action not in ("LOAD", "WRITE"):
                        logger.info("POLICY:  Problem with DAT, unknown action %s" % action)
                        return
                try:
                        if action == "LOAD":
                                # NOTE(review): pickle must only be used on
                                # a trusted, locally written DAT file.
                                f = open(DAT, "rb")
                                try:
                                        logger.info("POLICY:  Found and reading " + DAT)
                                        self.emailed.update(pickle.load(f))
                                finally:
                                        f.close()
                        else:
                                f = open(DAT, "wb")
                                try:
                                        pickle.dump(self.emailed, f)
                                finally:
                                        f.close()
                except Exception as err:
                        logger.info("POLICY:  Problem with DAT, %s" %err)


        def enoughUp(self, loginbase):
                """
                Returns True if at least MINUP nodes are up at a site,
                False otherwise (including when the site has no nodes).
                """
                allsitenodes = plc.getSiteNodes([loginbase])
                if not allsitenodes:
                        logger.info("Node not in db")
                        return False

                # Every node listed in any comon bucket counts as sick.
                sicknodes = set()
                for bucket in self.cmn.comonbkts.keys():
                        sicknodes.update(getattr(self.cmn, bucket))

                numnodes = len([node for node in allsitenodes
                                if node not in sicknodes])

                if numnodes < MINUP:
                        logger.info(\
"POLICY:  site with %s has nodes %s up." %(loginbase, numnodes))
                        return False
                return True


        def run(self):
                """
                Thread body: accumulate sick nodes per site and print the
                result.  Acting on them is currently switched off.
                """
                self.accumSickSites()
                #self.actOnSick()
                #self.emailedStore("WRITE")
                print(self.sickdb)
315         
316
317
def main():
        """
        Attach a DEBUG-level stream handler with a bare '%(message)s'
        format to the module logger, then terminate the process with
        exit status 0.
        """
        # Imported here because the module itself only imports os under
        # the __main__ guard; without this, calling main() from an
        # importing module fails with NameError.
        import os

        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

        #print NodesDebug()
        #tmp = Queue.Queue()
        #a = Policy(None, tmp) 
        #a.emailedStore("LOAD")
        #print a.emailed

        #print plc.slices([plc.siteId(["alice.cs.princeton.edu"])])
        # os._exit ends the process immediately, without normal
        # interpreter shutdown.
        os._exit(0)
if __name__ == '__main__':
        # Script entry point: run main() and exit quietly on Ctrl-C.
        import plc
        import os
        try:
                main()
        except KeyboardInterrupt:
                # Single parenthesised argument prints the same text under
                # both the print statement and the print function.
                print("Killed.  Exitting.")
                logger.info('Monitor Killed')
                os._exit(0)