Steps not in the dependency graph should also be executed
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 #from openstack.manager import OpenStackManager
12 from openstack.driver import OpenStackDriver
13 from util.logger import Logger, logging, logger
14 #from timeout import timeout
15 from planetstack.config import Config
16 from observer.steps import *
17
# Module-wide flag; PlanetStackObserver.run() copies it onto every sync step
# instance it creates (as the instance attribute `debug_mode`).
debug_mode = False

# NOTE: this rebinds the `logger` name already imported from util.logger at the
# top of the file, so every logger call in this module goes to the Logger
# created here (log file 'observer.log', INFO level).
logger = Logger(logfile='observer.log', level=logging.INFO)
21
class StepNotReady(Exception):
    """Raised by the observer's pre-run checks when a step must not run yet.

    In this module it is raised by check_schedule() and
    check_class_dependency() and handled inside the observer run loop.
    """
24
def toposort(g, steps):
    """Return the graph nodes in topological order, followed by leftover steps.

    Args:
        g: dict mapping a node to the list of nodes that must appear after it
            in the result (a key is emitted before the entries of its list,
            which is the edge direction this function has always used).
        steps: iterable of step names that must all be present in the result,
            whether or not they occur in `g`.

    Returns:
        A list without duplicates. First come the nodes of `g` that can be
        ordered, each one placed after every node that lists it. Then come
        the entries of `steps` that are not in the list yet (steps absent
        from the graph, and steps that sit on or behind a dependency cycle),
        in the order in which `steps` yields them.
    """
    # Number of not-yet-emitted predecessors per node. Duplicate edges are
    # counted once per occurrence here and released once per occurrence
    # below, so they stay balanced.
    indegree = {}
    for node, successors in g.items():
        indegree.setdefault(node, 0)
        for succ in successors:
            indegree[succ] = indegree.get(succ, 0) + 1

    # Kahn's algorithm: a node becomes ready only once all of its
    # predecessors have been emitted. `ready` doubles as the work queue; it
    # is walked with a cursor so no extra import is needed.
    ready = [node for node in indegree if indegree[node] == 0]
    order = []
    cursor = 0
    while cursor < len(ready):
        node = ready[cursor]
        cursor += 1
        order.append(node)
        for succ in g.get(node, ()):
            indegree[succ] -= 1
            if indegree[succ] == 0:
                ready.append(succ)

    # Steps that are not in the dependency graph should also be executed.
    # Append them in the caller's order so the result is deterministic.
    emitted = set(order)
    for step in steps:
        if step not in emitted:
            order.append(step)
            emitted.add(step)
    return order
61
class PlanetStackObserver:
    """Run the registered sync steps over and over, in dependency order.

    The constructor loads the dependency graphs, derives the step order and
    loads the persisted last-run times. run() then loops forever: it waits
    for an event (or a 30 second timeout), runs every step that is due and
    not blocked by a failed dependency, and saves the run times.
    """

    sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,GarbageCollector]

    # File in which the last run time of every step is persisted.
    run_times_path = '/tmp/observer_run_times'

    def __init__(self):
        # Maps a step class name to the step class; filled by load_sync_steps().
        self.step_lookup = {}
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() notifies the condition or `timeout` seconds pass."""
        # `with` releases the condition even if wait() raises.
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Notify one thread that is blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_steps(self):
        """Load the dependency graphs and compute `self.ordered_steps`.

        Sets `model_dependency_graph`, `backend_dependency_graph`,
        `step_lookup` and `ordered_steps`, then calls load_run_times().

        Raises:
            Whatever reading or parsing the model dependency graph raises;
            that graph is mandatory. A missing or unreadable backend graph
            is tolerated.
        """
        # This contains dependencies between records, not sync steps.
        # NOTE(review): the model graph is read from the option named
        # observer_backend_dependency_graph while the backend graph below uses
        # observer_pl_dependency_graph -- confirm the option names are intended.
        dep_path = Config().observer_backend_dependency_graph
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.loads(dep_file.read())

        try:
            backend_path = Config().observer_pl_dependency_graph
            # This contains dependencies between backend records
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.loads(backend_file.read())
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # Model class name -> names of the steps that provide that model.
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # Translate model-level edges into step-level edges. Models that no
        # step provides simply contribute no edges.
        step_graph = {}
        for model_name, model_deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model_name, ()):
                for dep_name in model_deps:
                    for dest in provides_dict.get(dep_name, ()):
                        self._add_step_edge(step_graph, source, dest)

        if self.backend_dependency_graph:
            # Backend record -> name of the step that serves it. Steps that
            # do not declare `serves` contribute no backend edges.
            backend_dict = {}
            for s in self.sync_steps:
                for m in getattr(s, 'serves', ()):
                    backend_dict[m] = s.__name__

            for backend_name, backend_deps in self.backend_dependency_graph.items():
                source = backend_dict.get(backend_name)
                if source is None:
                    # no step serves this record, so there is nothing to order
                    continue
                for dep_name in backend_deps:
                    dest = backend_dict.get(dep_name)
                    if dest is not None:
                        self._add_step_edge(step_graph, source, dest)

        step_names = [s.__name__ for s in self.sync_steps]
        self.ordered_steps = toposort(step_graph, step_names)
        logger.info('Order of steps=%s'%(self.ordered_steps,))
        self.load_run_times()

    def _add_step_edge(self, step_graph, source, dest):
        """Record the edge source -> dest once, ignoring self references."""
        if source == dest:
            # A step that provides both ends of a model dependency must not
            # be made to depend on itself.
            return
        edges = step_graph.setdefault(source, [])
        if dest not in edges:
            edges.append(dest)

    def check_duration(self, step, duration):
        """Log a message when `duration` exceeds the step's `deadline`, if it has one."""
        deadline = getattr(step, 'deadline', None)
        if deadline is None:
            # S doesn't have a deadline
            return
        if duration > deadline:
            # Fall back to __name__ (set on the instance by the run loop) so a
            # step without a `name` attribute still gets its message logged.
            step_name = getattr(step, 'name', None) or getattr(step, '__name__', repr(step))
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(step_name,duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of `step`."""
        self.last_run_times[step.__name__]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless `step` is due.

        A step is due when at least `step.requested_interval` seconds have
        passed since its recorded last run. A step without
        `requested_interval` is never due. A step with no recorded run is
        treated as last run at time 0.
        """
        try:
            requested_interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%step.__name__)
            raise StepNotReady

        # .get(): the persisted file may predate this step.
        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
        if time_since_last_run < requested_interval:
            raise StepNotReady

    def load_run_times(self):
        """Load `self.last_run_times` from disk, defaulting missing steps to 0."""
        try:
            with open(self.run_times_path) as run_times_file:
                run_times = json.loads(run_times_file.read())
        except (IOError, ValueError):
            # No file yet, or its content is not valid JSON: start from scratch.
            run_times = {}

        if not isinstance(run_times, dict):
            run_times = {}
        # Steps added since the file was written start out as "never run".
        for step_name in self.ordered_steps:
            run_times.setdefault(step_name, 0)
        self.last_run_times = run_times

    def save_run_times(self):
        """Write `self.last_run_times` to disk as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.run_times_path, 'w') as run_times_file:
            run_times_file.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if `step` depends on a model of a failed step.

        Args:
            step: step instance; the names of the models that its `provides`
                entries depend on (per the model dependency graph) are stored
                on it as `step.dependencies`.
            failed_steps: entries are either step objects (matched through
                the names of the models they provide) or plain model names.
        """
        dependencies = []
        for model in step.provides:
            for dep_name in self.model_dependency_graph.get(model.__name__, []):
                if dep_name not in dependencies:
                    dependencies.append(dep_name)
        step.dependencies = dependencies

        for failed_step in failed_steps:
            provided = getattr(failed_step, 'provides', None)
            if provided is None:
                # Plain model name.
                failed_models = [failed_step]
            else:
                failed_models = [m.__name__ for m in provided]
            for model_name in failed_models:
                if model_name in dependencies:
                    raise StepNotReady

    def run(self):
        """Run the observer loop forever.

        Returns immediately when the driver is disabled or has no OpenStack.
        Errors inside one pass are logged and the loop carries on.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return
        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=30)
                logger.info('Observer woke up')

                self._run_pending_steps()
                self.save_run_times()
            except Exception:
                # `Exception` rather than a bare except, so KeyboardInterrupt
                # and SystemExit can still stop the process.
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()

    def _run_pending_steps(self):
        """Run one pass over `self.ordered_steps`."""
        # Whole steps that failed, or that were blocked by a failed step
        failed_steps = []

        # Individual objects within steps that failed
        failed_step_objects = []

        for S in self.ordered_steps:
            step = self.step_lookup[S]
            start_time = time.time()

            sync_step = step(driver=self.driver)
            sync_step.__name__ = step.__name__
            sync_step.debug_mode = debug_mode

            # dont run Slices if Sites failed
            try:
                self.check_class_dependency(sync_step, failed_steps)
            except StepNotReady:
                logger.info('Step not ready: %s'%sync_step.__name__)
                # Blocked steps count as failed so that their own dependents
                # are held back as well.
                failed_steps.append(sync_step)
                continue
            except Exception:
                logger.log_exc('Dependency check failed for step %s'%sync_step.__name__)
                failed_steps.append(sync_step)
                continue

            # dont run sync_network_routes if time since last run < 1 hour
            try:
                self.check_schedule(sync_step)
            except StepNotReady:
                # Not being due is not a failure, so dependents are not blocked.
                logger.info('Step not ready: %s'%sync_step.__name__)
                continue
            except Exception:
                logger.log_exc('Schedule check failed for step %s'%sync_step.__name__)
                failed_steps.append(sync_step)
                continue

            try:
                # ********* This is the actual sync step
                failed_objects = sync_step(failed=failed_step_objects)
            except Exception:
                # One broken step must not abort the rest of the pass.
                logger.log_exc('Sync step %s failed'%sync_step.__name__)
                failed_steps.append(sync_step)
                continue

            # Measured after the step has run, so the deadline check sees the
            # time the step actually took.
            duration = time.time() - start_time
            self.check_duration(sync_step, duration)
            if failed_objects:
                failed_step_objects.extend(failed_objects)
            self.update_run_time(sync_step)