f67e2ebfb19a1b4962b00287fab46bc1b4eb68b2
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 #from openstack.manager import OpenStackManager
12 from openstack.driver import OpenStackDriver
13 from util.logger import Logger, logging, logger
14 #from timeout import timeout
15 from planetstack.config import Config
16 from observer.steps import *
17
# Module-wide debug switch; the observer loop copies this value onto every
# sync step instance (as ``debug_mode``) before the step is checked and run.
debug_mode = False

# Observer-specific logger writing to 'observer.log' at INFO level.
# NOTE: this rebinds the name ``logger`` that was imported from util.logger
# above, so the rest of this module uses this instance instead.
logger = Logger(logfile='observer.log', level=logging.INFO)
21
class StepNotReady(Exception):
    """Signal that a sync step must not run in the current pass.

    Raised by the observer's pre-run checks (schedule and dependency
    checks) and caught by the run loop, which then skips the step.
    """
24
def toposort(g, steps=None):
    """Order ``steps`` so that each step comes after the steps it depends on.

    Parameters:
        g: dict mapping a node to an iterable of the nodes it depends on
           (``g[a] = [b]`` means ``b`` must be placed before ``a``).
        steps: the nodes to order.  When empty or None, every node that
           appears in ``g`` (as a key or as a dependency) is ordered.

    Returns:
        A list containing each element of ``steps`` exactly once, with
        dependencies first.  Nodes of ``g`` that are not in ``steps`` are
        traversed (so indirect dependencies are honoured) but not returned.
        Steps that have no relation to each other keep the relative order
        in which they were given.

    Dependency cycles do not raise: the edge that would close a cycle is
    ignored, so the members of a cycle are still returned, in traversal
    order.
    """
    if not steps:
        steps = []
        for node, deps in g.items():
            steps.append(node)
            steps.extend(deps)

    wanted = set(steps)
    visited = set()
    order = []

    # Iterative depth-first search; a node is emitted only after all of its
    # dependencies have been emitted (post-order), which is what makes the
    # result a valid topological order.
    for root in steps:
        if root in visited:
            continue
        visited.add(root)
        stack = [(root, iter(g.get(root, ())))]
        while stack:
            node, pending = stack[-1]
            for dep in pending:
                if dep not in visited:
                    visited.add(dep)
                    stack.append((dep, iter(g.get(dep, ()))))
                    break
            else:
                # Every dependency of ``node`` has been handled.
                stack.pop()
                if node in wanted:
                    order.append(node)

    return order
70
class PlanetStackObserver:
    """Runs the registered sync steps, in dependency order, in a loop.

    On construction the observer loads the model dependency graph (and the
    optional backend dependency graph) named in the configuration, derives a
    step-level dependency graph from them and orders ``sync_steps`` with
    ``toposort``.  ``run()`` then waits for ``wake_up()`` (or a timeout) and
    executes one pass over the ordered steps.
    """

    sync_steps = [
        SyncNetworks,
        SyncNetworkSlivers,
        SyncSites,
        SyncSitePrivileges,
        SyncSlices,
        SyncSliceMemberships,
        SyncSlivers,
        SyncSliverIps,
        SyncExternalRoutes,
        SyncUsers,
        GarbageCollector,
    ]

    # File in which the time of each step's last run is kept between passes.
    run_times_path = '/tmp/observer_run_times'

    def __init__(self):
        # Maps a step class name to the step class; filled by load_sync_steps.
        self.step_lookup = {}
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        # (through wake_up) to cut the wait in run() short.
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or ``timeout`` seconds elapse."""
        # ``with`` guarantees the lock is released even if wait() is
        # interrupted by an exception.
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake the thread blocked in wait_for_event(), if any."""
        logger.info('Wake up routine called. Event cond %r' % self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    @staticmethod
    def _add_dependency(graph, source, dest):
        # Record that step ``source`` depends on step ``dest``, skipping
        # self-dependencies and duplicate edges.
        edges = graph.setdefault(source, [])
        if dest != source and dest not in edges:
            edges.append(dest)

    def _model_dependencies(self, step):
        # Names of all models that the models provided by ``step`` depend
        # on, according to the model dependency graph.  Models missing from
        # the graph simply contribute nothing.
        dependencies = []
        for model in step.provides:
            dependencies.extend(self.model_dependency_graph.get(model.__name__, []))
        return dependencies

    def load_sync_steps(self):
        """Load the dependency graphs and compute ``self.ordered_steps``.

        Sets ``model_dependency_graph``, ``backend_dependency_graph``,
        ``step_lookup`` and ``ordered_steps``, then loads the saved run
        times.

        Raises:
            Any error raised while opening or parsing the model dependency
            graph file; that graph is mandatory.  Failure to load the
            backend graph is tolerated and leaves it empty.
        """
        dep_path = Config().observer_backend_dependency_graph
        # This contains dependencies between records, not sync steps
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.load(dep_file)

        try:
            backend_path = Config().observer_pl_dependency_graph
            # This contains dependencies between backend records
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.load(backend_file)
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # model name -> names of the steps that provide that model
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # step name -> names of the steps it depends on
        step_graph = {}
        for model, model_deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model, ()):
                for dep_model in model_deps:
                    for dest in provides_dict.get(dep_model, ()):
                        self._add_dependency(step_graph, source, dest)

        if self.backend_dependency_graph:
            # backend record -> name of the step that serves it
            backend_dict = {}
            for s in self.sync_steps:
                # NOTE(review): steps without a ``serves`` attribute are
                # treated as serving nothing -- confirm every step that
                # should take part in the backend graph declares it.
                for m in getattr(s, 'serves', ()):
                    backend_dict[m] = s.__name__

            for record, record_deps in self.backend_dependency_graph.items():
                source = backend_dict.get(record)
                if source is None:
                    # no step serves this record, nothing to order
                    continue
                for dep_record in record_deps:
                    dest = backend_dict.get(dep_record)
                    if dest is not None:
                        self._add_dependency(step_graph, source, dest)

        step_names = [s.__name__ for s in self.sync_steps]
        self.ordered_steps = toposort(step_graph, step_names)
        print("Order of steps= %s" % (self.ordered_steps,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log when ``step`` took longer than its ``deadline`` attribute.

        Steps without a ``deadline`` attribute are not checked.
        """
        try:
            deadline = step.deadline
        except AttributeError:
            # step doesn't have a deadline
            return
        if duration > deadline:
            # Fall back to __name__ so that a step without a ``name``
            # attribute is still reported.
            step_name = getattr(step, 'name', None) or getattr(step, '__name__', repr(step))
            logger.info('Sync step %s missed deadline, took %.2f seconds' % (step_name, duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of ``step``."""
        self.last_run_times[step.__name__] = time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless ``step`` is due to run.

        A step is due when at least ``step.requested_interval`` seconds have
        passed since its recorded last run.  Steps that do not define
        ``requested_interval`` are never due.
        """
        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
        try:
            requested_interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set' % step.__name__)
            raise StepNotReady
        if time_since_last_run < requested_interval:
            raise StepNotReady

    def load_run_times(self):
        """Load the saved last-run times, or start every step at time 0."""
        try:
            with open(self.run_times_path) as run_times_file:
                run_times = json.load(run_times_file)
            if not isinstance(run_times, dict):
                raise ValueError('run times file does not contain an object')
            self.last_run_times = run_times
        except (IOError, OSError, ValueError):
            # Missing, unreadable or corrupt file: treat every step as
            # never having run.
            self.last_run_times = dict.fromkeys(self.ordered_steps, 0)

    def save_run_times(self):
        """Write the last-run times to ``run_times_path`` as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.run_times_path, 'w') as run_times_file:
            run_times_file.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if ``step`` depends on something that failed.

        Sets ``step.dependencies`` to the names of the models that the
        models provided by ``step`` depend on, then compares them with
        ``failed_steps``.  An entry of ``failed_steps`` blocks ``step`` when

        * it is itself listed in ``step.dependencies`` (a model name), or
        * it is a step (or the name of a registered step) that provides a
          model listed in ``step.dependencies``.
        """
        step.dependencies = self._model_dependencies(step)
        for failed_step in failed_steps:
            if failed_step in step.dependencies:
                raise StepNotReady
            provider = failed_step
            if not hasattr(provider, 'provides'):
                # Not a step object; it may be the name of a registered step.
                provider = self.step_lookup.get(failed_step)
            for model in getattr(provider, 'provides', ()):
                if model.__name__ in step.dependencies:
                    raise StepNotReady

    def _create_step(self, step_name):
        # Instantiate the named step class and attach the attributes the
        # checks and the step itself rely on.
        step = self.step_lookup[step_name]
        sync_step = step(driver=self.driver)
        sync_step.__name__ = step.__name__
        sync_step.dependencies = self._model_dependencies(sync_step)
        sync_step.debug_mode = debug_mode
        return sync_step

    def _run_sync_pass(self):
        # Run every step once, in dependency order.

        # Steps that failed, or were skipped because a dependency failed
        failed_steps = []

        # Individual objects within steps that failed
        failed_step_objects = []

        for S in self.ordered_steps:
            sync_step = self._create_step(S)

            try:
                # dont run Slices if Sites failed
                self.check_class_dependency(sync_step, failed_steps)
            except StepNotReady:
                logger.info('Step not ready: %s' % sync_step.__name__)
                # Blocked steps count as failed so that their own
                # dependents are blocked as well.
                failed_steps.append(sync_step)
                continue
            except Exception:
                logger.log_exc('Dependency check failed for step %s' % sync_step.__name__)
                failed_steps.append(sync_step)
                continue

            try:
                # dont run sync_network_routes if time since last run < 1 hour
                self.check_schedule(sync_step)
            except StepNotReady:
                # Not due yet.  This is not a failure, so steps depending
                # on this one are still allowed to run.
                logger.info('Step not ready: %s' % sync_step.__name__)
                continue
            except Exception:
                logger.log_exc('Schedule check failed for step %s' % sync_step.__name__)
                failed_steps.append(sync_step)
                continue

            try:
                start_time = time.time()

                # ********* This is the actual sync step
                failed_objects = sync_step(failed=failed_step_objects)

                # Measured after the step has run, so it reflects the time
                # the step actually took.
                duration = time.time() - start_time
                self.check_duration(sync_step, duration)
                if failed_objects:
                    failed_step_objects.extend(failed_objects)
                self.update_run_time(sync_step)
            except Exception:
                # Record the failure and carry on with the remaining steps;
                # dependents are held back by check_class_dependency.
                logger.log_exc('Sync step %s failed' % sync_step.__name__)
                failed_steps.append(sync_step)

    def run(self):
        """Run sync passes forever, one per wake-up or 30 second timeout.

        Returns immediately when the driver is disabled or has no
        OpenStack.  Errors inside a pass are logged and the loop continues;
        KeyboardInterrupt and SystemExit are allowed to end the loop.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return
        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=30)
                logger.info('Observer woke up')

                self._run_sync_pass()
                self.save_run_times()
            except Exception:
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()