3 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
13 from threading import *
18 from util.process import *
24 # Correlates input with policy to form actions
40 FROM="support@planet-lab.org"
43 XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/'
45 # Time between comon refresh
47 # Time to refresh DB and remove unused entries
49 # Time between policy enforce/update
50 #POLSLEEP=43200 #12hrs
53 # Global table of all running threads, keyed by thread name. Any thread
54 # added to it will be monitored.
56 # Seconds between checking threads
# Logging setup: a file handler that appends to LOG with a DEBUG threshold
# and a "time level message" record layout, plus the "monitor" logger
# itself, also set to DEBUG.
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh = logging.FileHandler(LOG, mode='a')
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)

logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)
70 Usage: %s [OPTIONS]...
73 -d, --debug Enable debugging (default: %s)
74 --status Print memory usage statistics and exit
75 -h, --help This message
76 """.lstrip() % (sys.argv[0], debug)
80 Launches threads and adds them to the runningthreads global list.
81 Assigns name for thread, starts.
def startThread(fnct, name):
    """
    Register a thread in the runningthreads table under `name` and start it.

    fnct -- the thread object to launch (despite the parameter name it is
            used as an object providing setName() and start()).
    name -- key used in runningthreads; also assigned as the thread's name.

    An exception raised while starting is logged and swallowed; the entry
    is left in runningthreads.
    """
    runningthreads[name] = fnct
    fnct.setName(name)
    try:
        logger.info("Starting thread " + name)
        fnct.start()
    except Exception:
        # The previous handler bound the exception as `err` but logged
        # `error`, and concatenated a str with the exception object, so the
        # handler itself raised instead of logging. sys.exc_info() is used
        # so the clause parses on old and new interpreters alike.
        import sys
        err = sys.exc_info()[1]
        logger.error("Thread: " + name + " " + str(err))
94 Watches threads and catches exceptions. Each launched thread is
95 watched and state is logged.
97 class ThreadWatcher(Thread):
104 time.sleep(WATCHSLEEP)
def checkThreads(self):
    """
    Walk the runningthreads table and drop every thread that is no longer
    alive, logging an error for each one removed.
    """
    # keys() yields a list under Python 2, so deleting entries while
    # looping over it is safe.
    for name in runningthreads.keys():
        if runningthreads[name].isAlive():
            continue
        # Dead thread: report it and stop tracking it.
        logger.error("Thread Died: %s" % name)
        del runningthreads[name]
117 Thread.__init__(self)
124 Start threads, do some housekeeping, then daemonize.
128 global debug, status, logger
131 longopts = ["debug", "status", "help"]
132 (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:ph", longopts)
133 except getopt.GetoptError, err:
134 print "Error: " + err.msg
138 for (opt, optval) in opts:
139 if opt == "-d" or opt == "--debug":
141 elif opt == "--status":
142 #print summary(names)
150 # writepid("monitor")
152 # Init stuff. Watch Threads to see if they die. Perhaps send email?
153 logger.info('Monitor Started')
154 startThread(ThreadWatcher(), "Watcher")
157 # Nodes to check. Queue of all sick nodes.
158 toCheck = Queue.Queue()
159 # Nodes that are sick w/o tickets
160 sickNoTicket = Queue.Queue()
161 # Comon DB of all nodes
163 # Nodes that are down. Use this to maintain DB; cleanup.
164 #alldown = Queue.Queue()
167 # Nodes we've emailed.
168 # host - > (type of email, time)
173 # Event based. Add to queue(toCheck) and hosts are queried.
174 rt1 = rt.RT(tickets, toCheck, sickNoTicket)
175 rt2 = rt.RT(tickets, toCheck, sickNoTicket)
176 rt3 = rt.RT(tickets, toCheck, sickNoTicket)
177 rt4 = rt.RT(tickets, toCheck, sickNoTicket)
178 rt5 = rt.RT(tickets, toCheck, sickNoTicket)
179 # Kind of a hack. Cleans the DB for stale entries and updates db.
180 clean = Thread(target=rt5.cleanTickets)
181 # Poll Comon. Refreshes Comon data every COSLEEP seconds
182 cm1 = comon.Comon(cdb, toCheck)
184 # Actually digest the info and do something with it.
185 pol = policy.Policy(cm1, sickNoTicket, emailed)
187 # Load emailed sites from last run.
188 pol.emailedStore("LOAD")
191 startThread(rt1,"rt1")
192 startThread(rt2,"rt2")
193 startThread(rt3,"rt3")
194 startThread(rt4,"rt4")
195 startThread(rt5,"rt5")
196 startThread(clean,"cleanrt5")
199 startThread(cm1,"comon")
201 # Wait for threads to init. Probably should join, but work on that later.
203 # Start Sending Emails
204 startThread(pol, "policy")
207 while (sickNoTicket.empty() == False) or (toCheck.empty() == False):
211 pol.emailedStore("WRITE")
212 logger.info('Monitor Exitted')
214 # removepid("monitor")
217 if __name__ == '__main__':
220 except KeyboardInterrupt:
221 print "Killed. Exitting."
222 logger.info('Monitor Killed')