e1dda0e04cc0fe0e1faa744d74f62554c6c699ea
[plstackapi.git] / planetstack / observer / event_loop.py
1 import os
2 import imp
3 import inspect
4 import time
5 import traceback
6 import commands
7 import threading
8 import json
9
10 from datetime import datetime
11 from collections import defaultdict
12 from core.models import *
13 from django.db.models import F, Q
14 #from openstack.manager import OpenStackManager
15 from openstack.driver import OpenStackDriver
16 from util.logger import Logger, logging, logger
17 #from timeout import timeout
18 from planetstack.config import Config
19 from observer.steps import *
20 from syncstep import SyncStep
21
# Module-wide debug switch; PlanetStackObserver.run copies it onto every
# sync step instance as `debug_mode` before the step is checked and run.
debug_mode = False

# NOTE(review): this rebinds the `logger` name imported from util.logger
# above, replacing it with a new Logger created at INFO level.
logger = Logger(level=logging.INFO)
25
class StepNotReady(Exception):
    """Signal that a sync step must be skipped in the current pass.

    Raised by the observer's pre-run checks (schedule and dependency
    checks) and caught by the run loop, which then skips the step.
    """
28
def toposort(g, steps=None):
    """Return `steps` ordered so that dependencies come before dependents.

    Args:
        g: dependency graph as a dict mapping a node to an iterable of the
            nodes that must be ordered before it. Nodes that only appear
            as values (no outgoing edges) need not be keys.
        steps: the nodes to emit. Nodes of `g` that are not listed here are
            still traversed (so ordering constraints that pass through them
            are honoured) but are left out of the result. When empty or
            None, every node mentioned in `g` is emitted.

    Returns:
        A list containing each entry of `steps` exactly once. For an
        acyclic graph every node appears after all the nodes reachable
        from it through `g`. Edges that would close a cycle are ignored,
        so cyclic input still yields a complete (best-effort) ordering.
        Nodes with no ordering constraint between them keep the relative
        order they have in `steps`.
    """
    if not steps:
        nodes = set(g)
        for deps in g.values():
            nodes.update(deps)
        steps = list(nodes)

    wanted = set(steps)
    order = []
    seen = set()

    # Iterative post-order depth-first search: a node is emitted only after
    # everything it depends on has been emitted. Iterative rather than
    # recursive so deep dependency chains cannot hit the recursion limit.
    for root in steps:
        if root in seen:
            continue
        seen.add(root)
        stack = [(root, iter(g.get(root, ())))]
        while stack:
            node, pending = stack[-1]
            for dep in pending:
                if dep not in seen:
                    seen.add(dep)
                    stack.append((dep, iter(g.get(dep, ()))))
                    break
            else:
                # All dependencies of `node` are done (or were skipped as
                # already-seen, which is what breaks cycles).
                stack.pop()
                if node in wanted:
                    order.append(node)

    return order
74
class PlanetStackObserver:
    """Discover sync steps, order them by dependency and run them in a loop.

    On construction the observer imports every step module from the step
    directory, builds a step-level dependency graph from the model (and
    optional backend) dependency graphs named in the configuration, and
    computes `ordered_steps`. `run()` then repeatedly waits for an event
    (or a timeout) and executes each step that is ready.
    """

    # NOTE: class-level list, shared by every instance;
    # load_sync_step_modules appends the discovered step classes in place.
    sync_steps = []

    # JSON file in which the last run time of each step is persisted.
    run_times_file = '/tmp/observer_run_times'

    def __init__(self):
        # Step class name -> step class, filled in by load_sync_steps.
        self.step_lookup = {}
        self.load_sync_step_modules()
        self.load_sync_steps()
        # Condition that wake_up() notifies to cut the wait in run() short.
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or `timeout` seconds elapse."""
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake the run loop if it is currently waiting for an event."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_step_modules(self, step_dir=None):
        """Import every step module in `step_dir` and register its steps.

        Args:
            step_dir: directory to scan. Defaults to the `step_dir` config
                setting, or "/opt/planetstack/observer/steps" when the
                configuration does not define one.

        Every class found in those modules that derives from SyncStep and
        has a `provides` attribute is appended to `sync_steps` (once).
        """
        if step_dir is None:
            step_dir = getattr(Config(), "step_dir",
                               "/opt/planetstack/observer/steps")

        # Sorted so that the registration order does not depend on the
        # arbitrary order in which the OS lists the directory.
        for fn in sorted(os.listdir(step_dir)):
            pathname = os.path.join(step_dir, fn)
            is_step_module = (os.path.isfile(pathname)
                              and fn.endswith(".py")
                              and fn != "__init__.py")
            if not is_step_module:
                continue

            module = imp.load_source(fn[:-3], pathname)
            for classname in dir(module):
                c = getattr(module, classname, None)

                # Keep only descendants of SyncStep that have a `provides`
                # field; abstract base classes are excluded because they
                # do not define one.
                if (inspect.isclass(c) and issubclass(c, SyncStep)
                        and hasattr(c, "provides")
                        and c not in self.sync_steps):
                    self.sync_steps.append(c)
        logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))

    def load_sync_steps(self):
        """Load the dependency graphs and compute `ordered_steps`.

        The model dependency graph is mandatory: any error while reading
        or parsing it propagates to the caller. The backend dependency
        graph is optional and silently replaced by an empty graph when it
        cannot be loaded.

        Sets `model_dependency_graph`, `backend_dependency_graph`,
        `step_lookup`, `ordered_steps` and (via load_run_times)
        `last_run_times`.
        """
        dep_path = Config().observer_dependency_graph
        logger.info('Loading model dependency graph from %s' % dep_path)
        # This contains dependencies between records, not sync steps
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.load(dep_file)

        try:
            backend_path = Config().observer_pl_dependency_graph
            logger.info('Loading backend dependency graph from %s' % backend_path)
            # This contains dependencies between backend records
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.load(backend_file)
        except Exception:
            logger.info('Backend dependency graph not loaded')
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # Model name -> names of the steps that provide that model.
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # Step name -> names of the steps it depends on.
        step_graph = {}

        def add_edge(source, dest):
            dests = step_graph.setdefault(source, [])
            if dest not in dests:
                dests.append(dest)

        # Translate "model depends on model" into "step depends on step".
        # Models that no step provides simply contribute no edges.
        for model, model_deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model, []):
                for dep in model_deps:
                    for dest in provides_dict.get(dep, []):
                        add_edge(source, dest)

        if self.backend_dependency_graph:
            # Backend record name -> name of the step that serves it.
            backend_dict = {}
            for s in self.sync_steps:
                # Steps are not required to declare `serves`.
                for m in getattr(s, 'serves', ()):
                    backend_dict[m] = s.__name__

            for record, record_deps in self.backend_dependency_graph.items():
                source = backend_dict.get(record)
                if source is None:
                    continue
                for dep in record_deps:
                    dest = backend_dict.get(dep)
                    if dest is not None:
                        add_edge(source, dest)

        step_names = [s.__name__ for s in self.sync_steps]
        self.ordered_steps = toposort(step_graph, step_names)
        print("Order of steps= %s" % (self.ordered_steps,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log a message when `step` ran longer than its `deadline`.

        Steps without a `deadline` attribute (or with deadline None) are
        not checked.
        """
        deadline = getattr(step, 'deadline', None)
        if deadline is None:
            return
        if duration > deadline:
            # Step instances created by run() carry __name__; fall back to
            # it so a missing `name` attribute cannot hide the message.
            step_name = (getattr(step, 'name', None)
                         or getattr(step, '__name__', repr(step)))
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(step_name,duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of `step`."""
        self.last_run_times[step.__name__]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless `step` is due to run.

        A step is due when at least `step.requested_interval` seconds have
        passed since its recorded last run. Steps that do not define
        `requested_interval` are never considered due.
        """
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%step.__name__)
            raise StepNotReady

        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
        if time_since_last_run < interval:
            raise StepNotReady

    def load_run_times(self):
        """Load `last_run_times` from `run_times_file`.

        When the file is missing, unreadable or does not contain a JSON
        object, every step in `ordered_steps` starts with a last run time
        of 0.
        """
        try:
            with open(self.run_times_file) as run_times:
                loaded = json.load(run_times)
            if not isinstance(loaded, dict):
                raise ValueError('run times file does not hold a JSON object')
            self.last_run_times = loaded
        except (IOError, OSError, ValueError):
            self.last_run_times = dict.fromkeys(self.ordered_steps, 0)

    def save_run_times(self):
        """Write `last_run_times` to `run_times_file` as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.run_times_file, 'w') as out:
            out.write(run_times)

    def _provided_model_names(self, failed_step):
        """Return the model names provided by a failed step.

        `failed_step` may be a step class name, a step class or a step
        instance; anything without a `provides` attribute yields [].
        """
        try:
            resolved = self.step_lookup.get(failed_step, failed_step)
        except TypeError:
            # Unhashable entry: cannot be a lookup key, use it as given.
            resolved = failed_step
        return [m.__name__ for m in getattr(resolved, 'provides', ())]

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if `step` depends on a model of a failed step.

        Args:
            step: the step about to run. Its `dependencies` attribute is
                set to the model names that the models it provides depend
                on, according to `model_dependency_graph`.
            failed_steps: steps that failed earlier in this pass, given as
                step names, step classes or step instances. Plain model
                names are accepted as well.

        Raises:
            StepNotReady: when one of the failed steps provides a model
                that `step` depends on.
        """
        dependencies = []
        for obj in step.provides:
            dependencies.extend(self.model_dependency_graph.get(obj.__name__, []))
        step.dependencies = dependencies

        for failed_step in failed_steps:
            if failed_step in dependencies:
                raise StepNotReady
            for model_name in self._provided_model_names(failed_step):
                if model_name in dependencies:
                    raise StepNotReady

    def _run_step(self, step_name, failed_steps, failed_step_objects):
        """Instantiate one step, check that it is ready and execute it.

        `failed_steps` (names of steps that failed or were blocked) and
        `failed_step_objects` (individual objects that failed) are shared
        across the steps of one pass and updated in place.
        """
        step = self.step_lookup[step_name]

        sync_step = step(driver=self.driver)
        sync_step.__name__ = step.__name__
        sync_step.dependencies = []
        sync_step.debug_mode = debug_mode

        # Dependency check first: don't run Slices if Sites failed. A
        # blocked step counts as failed so that the block propagates to
        # the steps depending on it.
        try:
            self.check_class_dependency(sync_step, failed_steps)
        except StepNotReady:
            logger.info('Step not ready: %s'%sync_step.__name__)
            failed_steps.append(step_name)
            return
        except Exception:
            logger.log_exc('Dependency check failed for step %s' % sync_step.__name__)
            failed_steps.append(step_name)
            return

        # Schedule check: e.g. don't run a step with a one hour interval
        # if it ran less than an hour ago. A step that is merely not due
        # has not failed, so it must not block its dependents.
        try:
            self.check_schedule(sync_step)
        except StepNotReady:
            logger.info('Step not ready: %s'%sync_step.__name__)
            return

        logger.info('Executing step %s' % sync_step.__name__)
        start_time = time.time()
        try:
            # ********* This is the actual sync step
            failed_objects = sync_step(failed=list(failed_step_objects))

            # Measured after the call so the deadline check sees the real
            # execution time.
            self.check_duration(sync_step, time.time() - start_time)
            if failed_objects:
                failed_step_objects.update(failed_objects)
            self.update_run_time(sync_step)
        except Exception:
            logger.log_exc('Sync step %s failed' % sync_step.__name__)
            failed_steps.append(step_name)

    def run(self):
        """Run the observer loop; returns at once if the driver is unusable.

        Each pass waits up to 30 seconds for an event, runs every step of
        `ordered_steps` that is ready, then saves the run times. Errors
        are logged and the loop continues; KeyboardInterrupt and
        SystemExit are not intercepted so the process can be stopped.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return
        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=30)
                logger.info('Observer woke up')

                # Names of whole steps that failed or were blocked
                failed_steps = []

                # Set of individual objects within steps that failed
                failed_step_objects = set()

                for S in self.ordered_steps:
                    self._run_step(S, failed_steps, failed_step_objects)
                self.save_run_times()
            except Exception:
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()