hpc observer, wip
[plstackapi.git] / planetstack / observer / event_loop.py
1 import os
2 import imp
3 import inspect
4 import time
5 import traceback
6 import commands
7 import threading
8 import json
9
10 from datetime import datetime
11 from collections import defaultdict
12 from core.models import *
13 from django.db.models import F, Q
14 #from openstack.manager import OpenStackManager
15 from openstack.driver import OpenStackDriver
16 from util.logger import Logger, logging, logger
17 #from timeout import timeout
18 from planetstack.config import Config
19 from observer.steps import *
20 from syncstep import SyncStep
21
# Copied onto every sync step instance (as `debug_mode`) before the step runs.
debug_mode = False

# Module-level logger; this rebinds the `logger` name imported from util.logger.
logger = Logger(level=logging.INFO)
25
class StepNotReady(Exception):
    """Signals that a sync step must not be executed in the current pass.

    Raised by the observer's schedule and dependency checks and caught by
    its run loop.
    """
28
def toposort(g, steps=None):
    """Order the nodes of a dependency graph so dependencies come first.

    Args:
        g: dict mapping a node to the collection of nodes it depends on.
            Nodes that only appear as dependencies need not be keys.
        steps: optional iterable of node names to include in the result.
            When empty or None, every node mentioned in ``g`` is used.

    Returns:
        A list containing each entry of ``steps`` exactly once.  Every node
        appears after all of the nodes it (transitively) depends on.  Nodes
        reachable in ``g`` but not listed in ``steps`` are left out, and
        entries of ``steps`` that are not part of the graph are appended at
        the end in the order they were given.

    Dependency cycles do not raise: a back edge is simply ignored, so the
    members of a cycle are emitted in the order the traversal finishes them.
    """
    if not steps:
        nodes = set(g)
        for deps in g.values():
            nodes.update(deps)
        steps = nodes
    steps = list(steps)
    wanted = set(steps)

    order = []
    visited = set()

    # Iterative depth-first search emitting nodes in post-order: a node is
    # appended only once all of its dependencies have been appended, which
    # is what makes the result a valid topological order.
    for root in g:
        if root in visited:
            continue
        visited.add(root)
        stack = [(root, iter(g.get(root, ())))]
        while stack:
            node, pending = stack[-1]
            for dep in pending:
                if dep not in visited:
                    visited.add(dep)
                    stack.append((dep, iter(g.get(dep, ()))))
                    break
            else:
                # All dependencies of `node` are done (or are back edges).
                stack.pop()
                if node in wanted:
                    order.append(node)

    # Requested steps that never showed up in the graph have no ordering
    # constraints; keep them, de-duplicated, in the caller's order.
    placed = set(order)
    for name in steps:
        if name not in placed:
            placed.add(name)
            order.append(name)
    return order
74
class NoOpDriver:
    """Stand-in driver used when the observer is not configured for openstack.

    It carries no behaviour of its own; it only exposes the ``enabled`` flag
    that the observer checks before entering its loop.
    """

    def __init__(self):
        # Always report the driver as enabled.
        self.enabled = True
78
class PlanetStackObserver:
    """Event loop that runs the registered sync steps in dependency order.

    On construction the observer loads every sync step module, derives a
    step-level dependency graph from the configured model (and optional
    backend) dependency graphs, and computes the order in which steps run.
    ``run()`` then loops forever: it waits for an event or a timeout and
    executes every step that is due and whose dependencies did not fail.
    """
    #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
    # Registry of SyncStep subclasses found by load_sync_step_modules().
    # This is a class attribute, so the list is shared by every instance.
    sync_steps = []

    # File in which the last run time of each step is persisted.
    run_times_path = '/tmp/observer_run_times'

    def __init__(self):
        """Load the sync steps, compute their order and create the driver."""
        self.step_lookup = {}
        self.load_sync_step_modules()
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()

        self.driver_kind = getattr(Config(), "observer_driver", "openstack")
        if self.driver_kind == "openstack":
            self.driver = OpenStackDriver()
        else:
            self.driver = NoOpDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or ``timeout`` seconds elapse."""
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake the run loop if it is currently waiting for an event."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_step_modules(self, step_dir=None):
        """Import every ``*.py`` file in ``step_dir`` and register its steps.

        Args:
            step_dir: directory to scan.  Defaults to the configured
                ``observer_steps_dir`` or, when that is not set, to
                ``/opt/planetstack/observer/steps``.
        """
        if step_dir is None:
            step_dir = getattr(Config(), "observer_steps_dir",
                               "/opt/planetstack/observer/steps")

        for fn in os.listdir(step_dir):
            pathname = os.path.join(step_dir, fn)
            if not (os.path.isfile(pathname) and fn.endswith(".py") and fn != "__init__.py"):
                continue
            module = imp.load_source(fn[:-3], pathname)
            for classname in dir(module):
                c = getattr(module, classname, None)

                # make sure 'c' is a descendent of SyncStep and has a
                # provides field (this eliminates the abstract base classes
                # since they don't have a provides)
                if (inspect.isclass(c) and issubclass(c, SyncStep)
                        and hasattr(c, "provides") and c not in self.sync_steps):
                    self.sync_steps.append(c)
        logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))

    def load_sync_steps(self):
        """Build the step dependency graph and compute ``ordered_steps``.

        Reads the model dependency graph (required) and the backend
        dependency graph (optional) from the paths given in the
        configuration, translates them into dependencies between sync step
        names, orders the steps with ``toposort`` and finally loads the
        persisted run times.

        Raises:
            Any exception raised while reading or parsing the model
            dependency graph; the observer cannot work without it.
        """
        dep_path = Config().observer_dependency_graph
        logger.info('Loading model dependency graph from %s' % dep_path)
        # This contains dependencies between records, not sync steps
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.load(dep_file)

        try:
            backend_path = Config().observer_pl_dependency_graph
            logger.info('Loading backend dependency graph from %s' % backend_path)
            # This contains dependencies between backend records
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.load(backend_file)
        except Exception:
            logger.info('Backend dependency graph not loaded')
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # Model name -> names of the steps that provide that model.
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # Step name -> names of the steps it depends on.
        step_graph = {}
        for model, deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model, []):
                for dep in deps:
                    for dest in provides_dict.get(dep, []):
                        step_graph.setdefault(source, []).append(dest)

        if self.backend_dependency_graph:
            # Backend record -> name of the step that serves it.  Steps that
            # do not declare `serves` simply contribute nothing.
            backend_dict = {}
            for s in self.sync_steps:
                for m in getattr(s, 'serves', ()):
                    backend_dict[m] = s.__name__

            for record, deps in self.backend_dependency_graph.items():
                if record not in backend_dict:
                    continue
                source = backend_dict[record]
                for dep in deps:
                    if dep in backend_dict:
                        step_graph.setdefault(source, []).append(backend_dict[dep])

        self.ordered_steps = toposort(step_graph, [s.__name__ for s in self.sync_steps])
        print("Order of steps= %s" % (self.ordered_steps,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log a message when ``step`` took longer than its ``deadline``.

        Steps without a ``deadline`` attribute are ignored.
        """
        try:
            deadline = step.deadline
        except AttributeError:
            # The step doesn't have a deadline
            return
        if duration > deadline:
            # Prefer an explicit `name`, fall back to the step's __name__ so a
            # missing `name` can no longer suppress the message.
            step_name = getattr(step, 'name', None) or getattr(
                step, '__name__', step.__class__.__name__)
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(step_name,duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of ``step``."""
        self.last_run_times[step.__name__] = time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless ``step`` is due to run.

        A step is due when at least ``step.requested_interval`` seconds have
        passed since its recorded last run.  Steps without a
        ``requested_interval`` are never due.
        """
        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%step.__name__)
            raise StepNotReady
        if time_since_last_run < interval:
            raise StepNotReady

    def load_run_times(self):
        """Load persisted run times, or start from zero for every step."""
        try:
            with open(self.run_times_path) as run_times_file:
                run_times = json.load(run_times_file)
            if not isinstance(run_times, dict):
                raise ValueError('run times file does not contain an object')
            self.last_run_times = run_times
        except (IOError, OSError, ValueError):
            # Missing, unreadable or corrupt file: treat every step as never run.
            self.last_run_times = dict.fromkeys(self.ordered_steps, 0)

    def save_run_times(self):
        """Persist the last run time of every step as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.run_times_path, 'w') as run_times_file:
            run_times_file.write(run_times)

    def _failed_model_names(self, failed_step):
        """Return the names of the models affected by ``failed_step``.

        ``failed_step`` may be a step name, a step class or a step instance.
        A string that is not a known step name is treated as a model name.
        """
        if isinstance(failed_step, str):
            provider = self.step_lookup.get(failed_step)
            if provider is None:
                return [failed_step]
        else:
            provider = failed_step
        return [m.__name__ for m in getattr(provider, 'provides', ())]

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if ``step`` depends on a model whose step failed.

        Also (re)computes ``step.dependencies``: the names of all models that
        the models provided by ``step`` depend on.

        Args:
            step: the sync step about to run.
            failed_steps: steps (names, classes or instances) that failed
                earlier in the current pass.
        """
        step.dependencies = []
        for obj in step.provides:
            step.dependencies.extend(self.model_dependency_graph.get(obj.__name__, []))
        for failed_step in failed_steps:
            for model_name in self._failed_model_names(failed_step):
                if model_name in step.dependencies:
                    raise StepNotReady

    def _run_once(self):
        """Run one pass over ``ordered_steps`` and persist the run times."""
        # Names of whole steps that failed (or were blocked by a failure)
        failed_steps = []

        # Set of individual objects within steps that failed
        failed_step_objects = set()

        for step_name in self.ordered_steps:
            step = self.step_lookup[step_name]

            sync_step = step(driver=self.driver)
            sync_step.__name__ = step.__name__
            sync_step.dependencies = []
            sync_step.debug_mode = debug_mode

            # dont run Slices if Sites failed
            try:
                self.check_class_dependency(sync_step, failed_steps)
            except StepNotReady:
                logger.info('Step not ready: %s'%sync_step.__name__)
                failed_steps.append(step_name)
                continue
            except Exception:
                logger.log_exc("Exception while checking dependencies of step %s" % step_name)
                failed_steps.append(step_name)
                continue

            # dont run sync_network_routes if time since last run < 1 hour.
            # A step that is merely not due has not failed, so it must not
            # block the steps that depend on it.
            try:
                self.check_schedule(sync_step)
            except StepNotReady:
                logger.info('Step not ready: %s'%sync_step.__name__)
                continue
            except Exception:
                logger.log_exc("Exception while checking schedule of step %s" % step_name)
                failed_steps.append(step_name)
                continue

            try:
                logger.info('Executing step %s' % sync_step.__name__)

                # ********* This is the actual sync step
                start_time = time.time()
                failed_objects = sync_step(failed=list(failed_step_objects))
                duration = time.time() - start_time

                self.check_duration(sync_step, duration)
                if failed_objects:
                    failed_step_objects.update(failed_objects)
                self.update_run_time(sync_step)
            except Exception:
                logger.log_exc("Exception in sync step %s" % step_name)
                failed_steps.append(step_name)
        self.save_run_times()

    def run(self):
        """Run the observer loop; returns immediately if the driver is unusable.

        Each iteration waits up to 30 seconds for an event and then performs
        one pass over all steps.  Exceptions raised by a pass are logged and
        the loop continues.
        """
        if not self.driver.enabled:
            return
        if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
            return

        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=30)
                logger.info('Observer woke up')
                self._run_once()
            except Exception:
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()