27475ca57f316802bded5a7408a1561c8a8902e0
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 #from openstack.manager import OpenStackManager
12 from openstack.driver import OpenStackDriver
13 from util.logger import Logger, logging, logger
14 #from timeout import timeout
15 from planetstack.config import Config
16 from observer.steps import *
17
# Flag copied onto every sync step instance by PlanetStackObserver before the
# step is executed.
debug_mode = False

# Module-level logger used throughout this file. Note that this rebinds the
# `logger` name that was imported from util.logger above.
logger = Logger(level=logging.INFO)
21
class StepNotReady(Exception):
    """Signals that a sync step must not be executed during the current pass.

    Raised by the observer's pre-run checks (schedule and dependency checks)
    and caught by the run loop, which then skips the step.
    """
24
def toposort(g, steps=None):
    """Return the nodes of a dependency graph ordered dependencies-first.

    Args:
        g: dict mapping a node to an iterable of the nodes it depends on.
            Nodes that only appear as dependencies need not be keys.
        steps: optional list of the nodes that should appear in the result.
            Nodes reachable in ``g`` but absent from ``steps`` are traversed
            (so transitive ordering is honoured) but left out of the result.
            Nodes in ``steps`` that are unknown to ``g`` are kept, in the
            position their order in ``steps`` gives them. When empty or None,
            every node mentioned in ``g`` is used.

    Returns:
        A list containing each requested node exactly once, such that every
        node comes after all of the requested nodes it depends on (directly
        or transitively).  Cycles are tolerated: the edge that would close a
        cycle is ignored rather than raising.
    """
    if not steps:
        # Collect every node of the graph, in first-seen order so that the
        # result is deterministic for a given dict ordering.
        steps = []
        known = set()
        for node, deps in g.items():
            for name in [node] + list(deps):
                if name not in known:
                    known.add(name)
                    steps.append(name)

    wanted = set(steps)
    visited = set()
    order = []

    # Iterative depth-first search emitting nodes in post-order: a node is
    # appended only once all of its dependencies have been emitted.
    for root in steps:
        if root in visited:
            continue
        visited.add(root)
        stack = [(root, iter(g.get(root, ())))]
        while stack:
            node, pending = stack[-1]
            for dep in pending:
                if dep not in visited:
                    visited.add(dep)
                    stack.append((dep, iter(g.get(dep, ()))))
                    break
            else:
                # All dependencies of `node` are done (or are cycle
                # back-edges, which are skipped).
                stack.pop()
                if node in wanted:
                    order.append(node)

    return order
70
class PlanetStackObserver:
    """Event loop that periodically runs the registered sync steps.

    On construction the steps listed in ``sync_steps`` are ordered so that a
    step runs after the steps providing the models it depends on.  ``run()``
    then loops forever: it waits for an event (or a timeout), executes every
    step that is ready, and persists the time each step last ran.

    Attributes set on the instance:
        step_lookup: dict mapping a step class name to the step class.
        model_dependency_graph: dict loaded from the JSON file named by
            ``Config().observer_dependency_graph`` (model name -> list of
            model names it depends on).
        backend_dependency_graph: dict loaded from the JSON file named by
            ``Config().observer_pl_dependency_graph``; ``{}`` if unavailable.
        ordered_steps: list of step class names in execution order.
        last_run_times: dict mapping a step class name to the timestamp
            (``time.time()``) of its last successful run.
    """
    sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]

    # File in which the last run time of every step is persisted.
    RUN_TIMES_PATH = '/tmp/observer_run_times'

    def __init__(self):
        self.step_lookup = {}
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or `timeout` seconds have elapsed."""
        # `with` guarantees the condition's lock is released even if wait()
        # is interrupted by an exception.
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake a thread blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def _read_json(self, path):
        """Parse and return the JSON document stored in the file at `path`."""
        with open(path) as f:
            return json.load(f)

    def load_sync_steps(self):
        """Load the dependency graphs and compute the step execution order.

        Populates step_lookup, model_dependency_graph,
        backend_dependency_graph and ordered_steps, then loads the persisted
        run times.

        Raises:
            Whatever reading/parsing the model dependency graph raises; that
            graph is mandatory.  The backend graph is optional.
        """
        dep_path = Config().observer_dependency_graph
        # This contains dependencies between records, not sync steps
        self.model_dependency_graph = self._read_json(dep_path)

        try:
            backend_path = Config().observer_pl_dependency_graph
            # This contains dependencies between backend records
            self.backend_dependency_graph = self._read_json(backend_path)
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # model name -> names of the steps that provide that model
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # Translate model-level dependencies into step-level dependencies:
        # step name -> names of the steps it depends on.
        step_graph = {}
        for model, deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model, []):
                for dep in deps:
                    for dest in provides_dict.get(dep, []):
                        step_graph.setdefault(source, []).append(dest)

        if self.backend_dependency_graph:
            # backend record -> name of the step that serves it
            backend_dict = {}
            for s in self.sync_steps:
                for m in s.serves:
                    backend_dict[m] = s.__name__

            for record, deps in self.backend_dependency_graph.items():
                source = backend_dict.get(record)
                if source is None:
                    # no step serves this record, nothing to order
                    continue
                for dep in deps:
                    dest = backend_dict.get(dep)
                    if dest is not None:
                        step_graph.setdefault(source, []).append(dest)

        step_names = [s.__name__ for s in self.sync_steps]
        self.ordered_steps = toposort(step_graph, step_names)
        print("Order of steps= %s" % (self.ordered_steps,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log a message if `step` took longer than its `deadline` attribute.

        Steps without a deadline are silently accepted.
        """
        deadline = getattr(step, 'deadline', None)
        if deadline is None:
            # S doesn't have a deadline
            return
        if duration > deadline:
            # Prefer an explicit `name`, fall back to the class name so a
            # missing attribute no longer suppresses the warning.
            name = getattr(step, 'name', getattr(step, '__name__', repr(step)))
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(name,duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of `step`."""
        self.last_run_times[step.__name__]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless `step` is due to run.

        A step is due when at least `step.requested_interval` seconds have
        passed since its last recorded run.  A step without a
        requested_interval attribute is never due.
        """
        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%step.__name__)
            raise StepNotReady
        if time_since_last_run < interval:
            raise StepNotReady

    def load_run_times(self):
        """Load last_run_times from disk, or reset every step to 0."""
        try:
            loaded = self._read_json(self.RUN_TIMES_PATH)
            if not isinstance(loaded, dict):
                raise ValueError('run times file does not contain an object')
            self.last_run_times = loaded
        except (IOError, OSError, ValueError):
            # Missing, unreadable or corrupt file: start from scratch.
            self.last_run_times = {}
            for e in self.ordered_steps:
                self.last_run_times[e]=0

    def save_run_times(self):
        """Persist last_run_times to disk as JSON."""
        # Serialize first so that a serialization error cannot leave a
        # truncated file behind.
        run_times = json.dumps(self.last_run_times)
        with open(self.RUN_TIMES_PATH, 'w') as f:
            f.write(run_times)

    def _model_dependencies(self, step):
        """Return the names of the models that `step`'s models depend on."""
        deps = []
        for obj in step.provides:
            deps.extend(self.model_dependency_graph.get(obj.__name__, []))
        return deps

    def _provided_model_names(self, step):
        """Return the names of the models provided by `step`.

        `step` may be a step class, a step instance, or a step class name.
        """
        provides = getattr(step, 'provides', None)
        if provides is None:
            # Not a step object: treat it as a step name.
            provides = getattr(self.step_lookup.get(step), 'provides', [])
        return [m.__name__ for m in provides]

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if `step` depends on a step that failed.

        Sets `step.dependencies` to the names of the models the step's own
        models depend on, then checks them against the models provided by
        each entry of `failed_steps` (step objects or step names).
        """
        step.dependencies = self._model_dependencies(step)
        for failed_step in failed_steps:
            # Direct match kept for callers passing model names.
            if failed_step in step.dependencies:
                raise StepNotReady
            for model_name in self._provided_model_names(failed_step):
                if model_name in step.dependencies:
                    raise StepNotReady

    def _run_step(self, step_name, failed_steps, failed_step_objects):
        """Instantiate and, if it is ready, execute the step `step_name`.

        `failed_steps` and `failed_step_objects` are updated in place with
        the step (on failure) and the objects it reported as failed.
        """
        step = self.step_lookup[step_name]
        start_time=time.time()

        sync_step = step(driver=self.driver)
        sync_step.__name__ = step.__name__
        sync_step.dependencies = self._model_dependencies(sync_step)
        sync_step.debug_mode = debug_mode

        # Various checks that decide whether this step runs or not
        try:
            # dont run Slices if Sites failed
            self.check_class_dependency(sync_step, failed_steps)
        except StepNotReady:
            logger.info('Step not ready: %s'%sync_step.__name__)
            # Blocked by a failure: propagate to this step's dependents too.
            failed_steps.append(sync_step)
            return
        except Exception:
            logger.log_exc("Exception while checking dependencies of step %s"%sync_step.__name__)
            failed_steps.append(sync_step)
            return

        try:
            # dont run sync_network_routes if time since last run < 1 hour
            self.check_schedule(sync_step)
        except StepNotReady:
            # Merely not due yet.  This is not a failure, so steps depending
            # on this one are not blocked by it.
            logger.info('Step not ready: %s'%sync_step.__name__)
            return
        except Exception:
            logger.log_exc("Exception while checking schedule of step %s"%sync_step.__name__)
            failed_steps.append(sync_step)
            return

        try:
            # ********* This is the actual sync step
            failed_objects = sync_step(failed=list(failed_step_objects))

            # Measured after the step has run so the deadline check sees
            # the real elapsed time.
            duration=time.time() - start_time
            self.check_duration(sync_step, duration)
            if failed_objects:
                failed_step_objects.update(failed_objects)
            self.update_run_time(sync_step)
        except Exception:
            logger.log_exc("Exception in sync step %s"%sync_step.__name__)
            failed_steps.append(sync_step)

    def run(self):
        """Run the observer loop forever.

        Returns immediately if the driver is disabled or has no OpenStack.
        Exceptions raised during a pass are logged and the loop continues.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return
        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=30)
                logger.info('Observer woke up')

                # Set of whole steps that failed
                failed_steps = []

                # Set of individual objects within steps that failed
                failed_step_objects = set()

                for S in self.ordered_steps:
                    self._run_step(S, failed_steps, failed_step_objects)
                self.save_run_times()
            except Exception:
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()