2 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
4 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
12 from threading import *
17 from util.process import *
23 # Correlates input with policy to form actions
# Contact address and PLCAPI XML-RPC endpoint URL.
# NOTE(review): FROM looks like a sender address for notification email;
# where it is used is not visible in this excerpt.
36 FROM="support@planet-lab.org"
38 XMLRPC_SERVER = 'https://www.planet-lab.org/PLCAPI/'
39 # Time between CoMon refreshes
41 # Time to refresh the DB and remove unused entries
43 # Time between policy enforcement/update runs
44 #POLSLEEP=43200 #12hrs
47 # Global dict of all running threads, keyed by thread name. Any thread
48 # added to it is monitored by ThreadWatcher.
50 # Seconds between thread-liveness checks
# Module-wide "monitor" logger at DEBUG level, appending to the file named by
# LOG (LOG is defined outside the visible lines).
54 logger = logging.getLogger("monitor")
55 logger.setLevel(logging.DEBUG)
56 fh = logging.FileHandler(LOG, mode = 'a')
57 fh.setLevel(logging.DEBUG)
58 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
59 fh.setFormatter(formatter)
# NOTE(review): no logger.addHandler(fh) is visible here; confirm it follows,
# otherwise records never reach the log file.
64 Usage: %s [OPTIONS]...
67 -d, --debug Enable debugging (default: %s)
68 --status Print memory usage statistics and exit
69 -h, --help This message
70 """.lstrip() % (sys.argv[0], debug)
# The usage text's %s placeholders are filled with the program name
# (sys.argv[0]) and the current value of the global `debug` flag.
def startThread(fnct, name):
    """Name and start a thread, then register it for monitoring.

    Launches ``fnct`` and adds it to the module-level ``runningthreads``
    dict under ``name``, so that ThreadWatcher can check whether it is
    still alive.  Exceptions raised while starting are logged rather than
    propagated.

    :param fnct: thread-like object exposing ``setName()`` and ``start()``.
    :param name: thread name; also the key used in ``runningthreads``.
    """
    fnct.setName(name)
    try:
        logger.info("Starting thread " + name)
        fnct.start()
    except Exception as err:
        # Previously this referenced the undefined name ``error`` (NameError)
        # and concatenated an exception object to a str (TypeError); let the
        # logger format both values instead.
        logger.error("Thread: %s %s", name, err)
    else:
        # Register only after a successful start: ThreadWatcher may already
        # be running, and it deletes any registered thread that is not alive
        # yet. Registering first could drop the entry before start() runs.
        runningthreads[name] = fnct
88 Watches threads and catches exceptions. Each launched thread is
89 watched and state is logged.
91 class ThreadWatcher(Thread):
# Pause WATCHSLEEP seconds between passes. The surrounding run() loop is
# outside the visible lines; presumably it calls checkThreads() each pass.
98 time.sleep(WATCHSLEEP)
# Log and unregister every thread in runningthreads that is no longer alive.
100 def checkThreads(self):
101 # Iterate through threads; drop any that are no longer alive.
# NOTE(review): deleting entries inside this loop is safe only because
# Python 2's dict.keys() returns a list copy. On Python 3, iterate over
# list(runningthreads) and use is_alive() (isAlive() was removed in 3.9).
102 for thread in runningthreads.keys():
103 # If thread found dead, log it and remove it from runningthreads.
104 if not runningthreads[thread].isAlive():
105 logger.error("Thread Died: %s" %(thread))
106 del runningthreads[thread]
# Initialize the Thread base class (the enclosing __init__ header is not visible).
111 Thread.__init__(self)
118 Start threads, do some housekeeping, then daemonize.
122 global debug, status, logger
# Parse options, then start the watcher, RT, ticket-cleanup, CoMon and
# policy threads. Several lines of this function are outside the excerpt.
125 longopts = ["debug", "status", "help"]
# NOTE(review): the short-option spec also accepts -v, -f <arg>, -s <arg>
# and -p, which the visible usage text does not document. Confirm they are
# handled (the remaining option branches are not visible).
126 (opts, argv) = getopt.getopt(sys.argv[1:], "dvf:s:ph", longopts)
127 except getopt.GetoptError, err:
128 print "Error: " + err.msg
132 for (opt, optval) in opts:
133 if opt == "-d" or opt == "--debug":
135 elif opt == "--status":
136 #print summary(names)
144 # writepid("monitor")
146 # Init stuff. Watch Threads to see if they die. Perhaps send email?
147 logger.info('Monitor Started')
148 startThread(ThreadWatcher(), "Watcher")
152 bucket = Queue.Queue()
153 # Comon DB of all nodes
155 # Nodes that are down. Use this to maintain DB; cleanup.
156 alldown = Queue.Queue()
161 # Event based. Add to queue(bucket) and hosts are queried.
162 rt1 = rt.RT(tickets, bucket)
163 rt2 = rt.RT(tickets, bucket)
164 rt3 = rt.RT(tickets, bucket)
165 rt4 = rt.RT(tickets, bucket)
166 rt5 = rt.RT(tickets, bucket)
167 # Kind of a hack. Cleans the DB for stale entries and updates db.
168 clean = Thread(target=rt5.cleanTickets)
169 # Poll Comon. Refreshes Comon data every COSLEEP seconds
170 cm1 = comon.Comon(cdb, bucket)
173 startThread(rt1,"rt1")
174 startThread(rt2,"rt2")
175 startThread(rt3,"rt3")
176 startThread(rt4,"rt4")
177 startThread(rt5,"rt5")
178 startThread(clean,"cleanrt5")
# NOTE(review): the name "rt5" was already used for rt5 above. startThread
# stores threads in runningthreads by name, so this call overwrites rt5's
# entry (rt5 is no longer watched). It also names the CoMon thread "rt5".
# A distinct name such as "comon" looks intended; confirm before changing.
179 startThread(cm1,"rt5")
182 # Actually digest the info and do something with it.
183 pol = policy.Policy(cm1, tickets)
# Loops while the bucket is non-empty; the loop body is not visible here.
# `not bucket.empty()` would be the idiomatic spelling.
185 while bucket.empty() == False:
188 startThread(pol, "policy")
191 #print runningthreads["RT"].ssh
193 logger.info('Monitor Exitted')
195 # removepid("monitor")
198 if __name__ == '__main__':
# Presumably runs main() inside a try (those lines are outside the excerpt).
# Ctrl-C is reported on stdout and logged instead of surfacing a traceback.
201 except KeyboardInterrupt:
202 print "Killed. Exitting."
203 logger.info('Monitor Killed')