3 # Copyright (c) 2004 The Trustees of Princeton University (Trustees).
5 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
6 # Stephen Soltesz <soltesz@cs.princeton.edu>
11 from threading import *
17 # Global config options
18 import parser as parsermodule
# Build this script's command-line parser from the project's parser module
# and register the options it accepts.
# NOTE(review): the local module is named `parser`, which collides with the
# Python 2 stdlib `parser` module; this relies on the local parser.py being
# found first on sys.path -- confirm.
20 parser = parsermodule.getParser()
22 parser.set_defaults(nodelist=None,
# Script-specific options. --cachert/--cachenodes have no explicit dest;
# presumably (optparse-style) they are stored as cachert / cachenodes.
# NOTE(review): the visible main body loads its blacklists from database
# caches rather than config.blacklist / config.ticketlist -- confirm these
# options are still consumed somewhere.
28 parser.add_option("", "--nodelist", dest="nodelist",
29 help="Read nodes to act on from specified file")
30 parser.add_option("", "--cachert", action="store_true",
31 help="Cache the RT database query")
32 parser.add_option("", "--cachenodes", action="store_true",
33 help="Cache node lookup from PLC")
34 parser.add_option("", "--ticketlist", dest="ticketlist",
35 help="Whitelist all RT tickets in this file")
36 parser.add_option("", "--blacklist", dest="blacklist",
37 help="Blacklist all nodes in this file")
# Parse at import time into the module-level `config` object that the main
# body reads later (e.g. config.nodelist); presumably this consumes the
# process's command-line arguments -- confirm in parsermodule.parse_args.
39 config = parsermodule.parse_args(parser)
42 #from util.process import *
46 # Correlates input with policy to form actions
54 # Time to refresh DB and remove unused entries
56 # Time between policy enforce/update
57 #POLSLEEP=43200 #12hrs
60 # Global registry of all running threads, keyed by thread name. Any
61 # thread added to it will be monitored.
63 # Seconds between checking threads
# Module-wide "monitor" logger: records of every level from DEBUG upward are
# accepted, and a handler appends them to the LOG file with a timestamp and
# level prefix.
logger = logging.getLogger("monitor")
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh = logging.FileHandler(LOG, mode='a')
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
# NOTE(review): fh is not attached to logger in these lines; confirm that
# logger.addHandler(fh) follows.
77 Launches a thread and adds it to the global runningthreads registry.
78 Assigns the thread its name, then starts it.
80 def startThread(fnct, name):
81 runningthreads[name] = fnct
82 runningthreads[name].setName(name)
84 logger.info("Starting thread " + name)
85 runningthreads[name].start()
86 except Exception, err:
87 logger.error("Thread: " + name + " " + error)
91 Watches threads and catches exceptions. Each launched thread is
92 watched and state is logged.
# Monitoring thread (a threading.Thread subclass) that inspects the global
# runningthreads registry via checkThreads() and logs threads that have died.
94 class ThreadWatcher(Thread):
# NOTE(review): the next line is a fragment of a method whose header is not
# in this excerpt; it sleeps WATCHSLEEP seconds between thread checks.
101 time.sleep(WATCHSLEEP)
def checkThreads(self):
    """Drop dead threads from the global runningthreads registry.

    Every registered thread that is no longer alive is logged as an error
    and removed from the registry.

    Returns the number of threads still registered afterwards.
    """
    # Walk a snapshot of the names, because entries are deleted mid-loop.
    for name in list(runningthreads.keys()):
        if runningthreads[name].isAlive():
            continue
        logger.error("***********Thread died: %s**********" % name)
        del runningthreads[name]
    return len(runningthreads)
# Initialise the threading.Thread base class (presumably inside
# ThreadWatcher.__init__, whose def line is not in this excerpt).
116 Thread.__init__(self)
# NOTE(review): only the signature of dict_from_nodelist is visible in this
# excerpt; its body and return value cannot be documented from here.
121 def dict_from_nodelist(nl):
129 Start threads, do some housekeeping, then daemonize.
# Body of the main routine (its def line is not part of this excerpt).
# Visible flow: fetch nodes from PLC, intersect them with the user-supplied
# node list, drop blacklisted hosts, start a policy.Action thread over the
# remaining hostnames, then poll tw.checkThreads() every WATCHSLEEP seconds.
133 global status, logger
136 logger.info('Action Started')
137 print 'Action Started'
139 ######### GET NODES ########################################
140 logger.info('Get Nodes from PLC')
141 print "getnode from plc"
# NOTE(review): the cache flag is hard-coded to True although a --cachenodes
# option is defined; confirm this is intentional. if_cached_else presumably
# returns a cached value or else calls the lambda -- verify in database.
142 l_plcnodes = database.if_cached_else(True,
144 lambda : plc.getNodes({'peer_id':None}))
# Hostnames of every node record returned by PLC.
146 s_plcnodenames = Set([x['hostname'] for x in l_plcnodes])
148 # List of nodes from a user-provided file.
# NOTE(review): `file` shadows the Python 2 builtin of the same name, and
# nodelist defaults to None in the parser defaults; confirm getListFromFile
# copes with None or that this path only runs when --nodelist is given.
150 file = config.nodelist
151 nodelist = config.getListFromFile(file)
152 #for node in nodelist:
155 s_usernodes = Set(nodelist)
156 # SAFE nodes are in PLC and the list
157 s_safe_usernodes = s_plcnodenames & s_usernodes
158 # UNSAFE nodes are in list but not in PLC. i.e. ignore them.
159 s_unsafe_usernodes = s_usernodes - s_plcnodenames
160 if len(s_unsafe_usernodes) > 0 :
161 for node in s_unsafe_usernodes:
162 print "WARNING: User provided: %s but not found in PLC" % node
# Keep the full PLC node records whose hostname is in the safe set.
164 l_nodes = filter(lambda x: x['hostname'] in s_safe_usernodes,l_plcnodes)
168 print "len of l_nodes: %d" % len(l_nodes)
169 # Minus blacklisted ones..
# NOTE(review): l_ticket_blacklist is not used on any line visible here;
# confirm whether it is still needed.
170 l_ticket_blacklist = database.if_cached_else(1,"l_ticket_blacklist",lambda : [])
172 l_blacklist = database.if_cached_else(1, "l_blacklist", lambda : [])
# Drop node records whose hostname appears in l_blacklist (presumably a
# list of hostnames -- confirm).
173 l_nodes = filter(lambda x : not x['hostname'] in l_blacklist, l_nodes)
175 ####### Get RT tickets #########################################
176 #logger.info('Get Tickets from RT')
177 #t = commands.MyTimer()
178 #ad_dbTickets = database.if_cached_else(config.cachert, "ad_dbTickets", rt.rt_tickets)
179 #print "Getting tickets from RT took: %f sec" % t.diff() ; del t
181 logger.info('Start Action thread')
# Hand the surviving hostnames to a policy.Action thread and register it
# under the name "action" via startThread().
183 action = policy.Action( [node['hostname'] for node in l_nodes] )
184 startThread(action,"action")
# NOTE(review): the lines that create `tw` and open the polling loop are not
# in this excerpt. checkThreads() returns how many threads remain registered;
# what happens when it reaches 0 is on a line not shown.
189 if tw.checkThreads() == 0:
191 time.sleep(WATCHSLEEP)
193 logger.info('Action Exitting')
# Script entry point. The lines that invoke the main routine (inside a try
# block) are not part of this excerpt.
196 if __name__ == '__main__':
# A keyboard interrupt (Ctrl-C) is reported on stdout and in the log.
199 except KeyboardInterrupt:
200 print "Killed. Exitting."
201 logger.info('Action Killed')