fix bugs
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 #from openstack.manager import OpenStackManager
12 from openstack.driver import OpenStackDriver
13 from util.logger import Logger, logging, logger
14 #from timeout import timeout
15 from planetstack.config import Config
16 from observer.steps import *
17
# Copied onto every sync step instance as its ``debug_mode`` attribute
# before the step is run (see PlanetStackObserver.run).
debug_mode = False

# Module-wide logger writing to observer.log at INFO level.  This rebinds
# the ``logger`` name that was imported from util.logger above.
logger = Logger(logfile='observer.log', level=logging.INFO)
21
class StepNotReady(Exception):
    """Signals that a sync step must be skipped during the current pass.

    Raised by the observer's readiness checks (schedule and class
    dependency) and handled inside the run loop.
    """
24
def toposort(g, steps):
    """Return the nodes of graph ``g`` in topological order.

    ``g`` maps each node to the list of nodes it points to.  Every node is
    emitted before the nodes it points to (nodes without incoming edges
    come first).  Nodes that only ever appear as edge targets are included
    as well.

    Nodes that take part in a cycle cannot be ordered; they are appended at
    the end, in first-seen order, instead of being silently dropped.

    ``steps`` is accepted for caller compatibility and is not consulted.

    Implementation: Kahn's algorithm.  The previous depth-first walk could
    emit a node before all of its predecessors (e.g. ``a->[b, c], b->[c]``
    produced ``a, c, b``).
    """
    # Count incoming edges per node; ``nodes`` remembers first-seen order so
    # the result is deterministic for a given iteration order of ``g``.
    nodes = []
    indegree = {}
    for node, targets in g.items():
        if node not in indegree:
            indegree[node] = 0
            nodes.append(node)
        for target in targets:
            if target not in indegree:
                indegree[target] = 0
                nodes.append(target)
            indegree[target] += 1

    ready = [node for node in nodes if indegree[node] == 0]
    order = []
    emitted = set()

    while ready:
        node = ready.pop()
        order.append(node)
        emitted.add(node)
        for target in g.get(node, ()):
            # Duplicate edges were counted once each above, so they are
            # released once each here.
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    # Whatever is left is part of (or downstream of) a cycle.
    order.extend(node for node in nodes if node not in emitted)
    return order
59
class PlanetStackObserver:
    """Runs the configured sync steps in dependency order, in a loop.

    On construction the observer loads the dependency graphs named in the
    configuration, derives a step-level dependency graph from them, and
    orders the steps with ``toposort``.  ``run()`` then waits for an event
    (or a timeout) and executes every step that is ready.
    """

    # Step classes to run; the names come from ``observer.steps``.
    sync_steps = [
        SyncNetworks,
        SyncNetworkSlivers,
        SyncSites,
        SyncSitePrivileges,
        SyncSlices,
        SyncSliceMemberships,
        SyncSlivers,
        SyncSliverIps,
        SyncExternalRoutes,
        SyncUsers,
        GarbageCollector,
    ]

    # File in which the time of each step's last run is persisted as JSON.
    run_times_path = '/tmp/observer_run_times'

    def __init__(self):
        # Maps step class name -> step class; filled by load_sync_steps().
        self.step_lookup = {}
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or ``timeout`` seconds elapse."""
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake a thread blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_steps(self):
        """Load the dependency graphs and compute ``self.ordered_steps``.

        Sets ``model_dependency_graph``, ``backend_dependency_graph``,
        ``step_lookup`` and ``ordered_steps``, then loads the persisted run
        times.  Errors reading the model dependency graph propagate to the
        caller; the backend graph is optional.
        """
        # NOTE(review): the model graph is read from the option named
        # ``observer_backend_dependency_graph`` while the backend graph is
        # read from ``observer_pl_dependency_graph`` -- confirm that these
        # option names are the intended ones.
        dep_path = Config().observer_backend_dependency_graph
        # This contains dependencies between records, not sync steps
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.load(dep_file)

        try:
            backend_path = Config().observer_pl_dependency_graph
            # This contains dependencies between backend records
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.load(backend_file)
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # Model name -> names of the steps that provide that model.
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # Every step starts out as a node so that steps without any
        # dependency edge are still part of the computed order.
        step_graph = dict((s.__name__, []) for s in self.sync_steps)

        def add_edge(source, dest):
            # Self-edges and duplicates carry no ordering information.
            edges = step_graph.setdefault(source, [])
            if dest != source and dest not in edges:
                edges.append(dest)

        # Translate model -> model dependencies into step -> step edges.
        for model, deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model, []):
                for dep in deps:
                    for dest in provides_dict.get(dep, []):
                        add_edge(source, dest)

        if self.backend_dependency_graph:
            # Backend record -> name of the step that serves it.
            backend_dict = {}
            for s in self.sync_steps:
                for m in s.serves:
                    backend_dict[m] = s.__name__

            for record, deps in self.backend_dependency_graph.items():
                source = backend_dict.get(record)
                if source is None:
                    # no step serves this record
                    continue
                for dep in deps:
                    dest = backend_dict.get(dep)
                    if dest is not None:
                        add_edge(source, dest)

        self.ordered_steps = toposort(step_graph, self.sync_steps)
        print("Order of steps= %s" % (self.ordered_steps,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log a message if ``step`` ran longer than its ``deadline``.

        Steps without a ``deadline`` attribute are not checked.
        """
        try:
            deadline = step.deadline
        except AttributeError:
            # step doesn't have a deadline
            return

        if duration > deadline:
            # Fall back to __name__ so that a missing ``name`` attribute
            # cannot suppress the message.
            step_name = getattr(step, 'name', None) or getattr(step, '__name__', repr(step))
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(step_name,duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of ``step``."""
        self.last_run_times[step.__name__]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless ``step`` is due to run.

        A step is due when at least ``step.requested_interval`` seconds have
        passed since its last recorded run.  A step with no recorded run is
        treated as never having run.  Steps without ``requested_interval``
        are reported and never considered due.
        """
        last_run = self.last_run_times.get(step.__name__, 0)
        time_since_last_run = time.time() - last_run
        try:
            requested_interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%step.__name__)
            raise StepNotReady

        if time_since_last_run < requested_interval:
            raise StepNotReady

    def load_run_times(self):
        """Load the persisted last-run times into ``self.last_run_times``.

        A missing, unreadable or malformed file yields an empty record.
        Every step in ``self.ordered_steps`` is guaranteed an entry
        (defaulting to 0, i.e. "never run").
        """
        try:
            with open(self.run_times_path) as run_times_file:
                loaded = json.load(run_times_file)
        except (IOError, OSError, ValueError):
            loaded = None

        self.last_run_times = loaded if isinstance(loaded, dict) else {}
        for name in self.ordered_steps:
            self.last_run_times.setdefault(name, 0)

    def save_run_times(self):
        """Persist ``self.last_run_times`` as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.run_times_path, 'w') as run_times_file:
            run_times_file.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if ``step`` depends on a failed step.

        ``step.dependencies`` is set to the names of the models that the
        models provided by ``step`` depend on, according to the model
        dependency graph.  ``failed_steps`` may contain step names or step
        objects carrying ``__name__``; a failed step blocks ``step`` when
        one of the models it provides is among those dependencies.
        """
        dependencies = []
        for model in step.provides:
            for dep in self.model_dependency_graph.get(model.__name__, []):
                if dep not in dependencies:
                    dependencies.append(dep)
        step.dependencies = dependencies

        for failed_step in failed_steps:
            # Plain strings have no __name__ and are used as-is.
            failed_name = getattr(failed_step, '__name__', failed_step)
            if failed_name in dependencies:
                raise StepNotReady

            failed_class = self.step_lookup.get(failed_name)
            if failed_class is None:
                continue
            for model in failed_class.provides:
                if model.__name__ in dependencies:
                    raise StepNotReady

    def _is_ready(self, sync_step, failed_steps):
        """Return True if ``sync_step`` should run in the current pass.

        A step blocked by a failed dependency (or whose readiness check
        itself breaks) is added to ``failed_steps`` so that the failure
        propagates to its own dependents.  A step that is merely not yet
        scheduled is skipped without being marked as failed.
        """
        name = sync_step.__name__

        try:
            # dont run Slices if Sites failed
            self.check_class_dependency(sync_step, failed_steps)
        except StepNotReady:
            logger.info('Step not ready: %s'%name)
            failed_steps.append(name)
            return False
        except Exception:
            logger.log_exc('Dependency check failed for step %s'%name)
            failed_steps.append(name)
            return False

        try:
            # dont run sync_network_routes if time since last run < 1 hour
            self.check_schedule(sync_step)
        except StepNotReady:
            logger.info('Step not ready: %s'%name)
            return False
        except Exception:
            logger.log_exc('Schedule check failed for step %s'%name)
            failed_steps.append(name)
            return False

        return True

    def run(self):
        """Main loop: wait for an event, then run every step that is ready.

        Returns immediately when the driver is disabled or has no
        OpenStack.  Otherwise loops forever; an error in one pass is logged
        and the loop continues with the next pass.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return

        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=300)
                logger.info('Observer woke up')

                # Names of whole steps that failed in this pass
                failed_steps = []

                # Individual objects within steps that failed
                failed_step_objects = []

                for S in self.ordered_steps:
                    step = self.step_lookup[S]

                    sync_step = step(driver=self.driver)
                    sync_step.__name__ = step.__name__
                    sync_step.debug_mode = debug_mode

                    if not self._is_ready(sync_step, failed_steps):
                        continue

                    start_time = time.time()
                    try:
                        # ********* This is the actual sync step
                        failed_objects = sync_step(failed=failed_step_objects)

                        # Measured after the step so the deadline check
                        # sees the real running time.
                        duration = time.time() - start_time
                        self.check_duration(sync_step, duration)
                        if failed_objects:
                            failed_step_objects.extend(failed_objects)
                        self.update_run_time(sync_step)
                    except Exception:
                        logger.log_exc('Sync step %s failed'%S)
                        failed_steps.append(S)
                self.save_run_times()
            except Exception:
                # Exception (not a bare except) so that KeyboardInterrupt
                # and SystemExit can still stop the observer.
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()