Major bugfix to toposort.
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 #from openstack.manager import OpenStackManager
12 from openstack.driver import OpenStackDriver
13 from util.logger import Logger, logging, logger
14 #from timeout import timeout
15 from planetstack.config import Config
16 from observer.steps import *
17
# When True, this flag is copied onto every sync step instance before it runs.
debug_mode = False

# Observer-wide logger writing to observer.log at INFO level.  Note that this
# rebinds the `logger` name imported from util.logger.
logger = Logger(level=logging.INFO, logfile='observer.log')
21
class StepNotReady(Exception):
    """Signal that a sync step must be skipped on the current observer pass.

    Raised by the observer's scheduling and dependency checks.
    """
24
def toposort(g, steps=None):
    """Return the nodes of dependency graph *g* in execution order.

    *g* maps a node to a list of nodes it depends on: an entry
    ``k: [v, ...]`` means every ``v`` must come before ``k``.  The result
    lists each node of *steps* exactly once, placed after every node it
    (transitively) depends on.  Dependency chains that pass through nodes
    not in *steps* are still followed, but such nodes are left out of the
    result.  Steps that are unrelated to each other keep their relative
    order from *steps*.

    If *steps* is None or empty, every node mentioned in *g* (as a key or
    inside a dependency list) is ordered, in no particular tie-break order.

    Cycles do not raise or loop forever: the edge closing a cycle is
    ignored, so the members of a cycle come out in some valid-looking order.
    """
    if not steps:
        nodes = set(g.keys())
        for deps in g.values():
            nodes.update(deps)
        steps = list(nodes)

    wanted = set(steps)
    visited = set()
    order = []

    for root in steps:
        if root in visited:
            continue
        # Iterative depth-first search emitting nodes in post-order, so a
        # node is appended only after everything it depends on.  Each stack
        # frame keeps its own iterator so we resume where we left off.
        visited.add(root)
        stack = [(root, iter(g.get(root, ())))]
        while stack:
            node, deps = stack[-1]
            for dep in deps:
                if dep not in visited:
                    visited.add(dep)
                    stack.append((dep, iter(g.get(dep, ()))))
                    break
            else:
                # Every dependency of `node` has been emitted (or is on the
                # stack already, i.e. part of a cycle).
                stack.pop()
                if node in wanted:
                    order.append(node)
    return order
70
class PlanetStackObserver:
    """Run the observer's sync steps in dependency order.

    On construction the classes in ``sync_steps`` are ordered using the
    dependency graphs named in the config.  ``run()`` then loops forever,
    waking on ``wake_up()`` or after a 30 second timeout, and runs every
    step that is due and whose dependencies did not fail on this pass.
    """
    sync_steps = [SyncNetworks, SyncNetworkSlivers, SyncSites,
                  SyncSitePrivileges, SyncSlices, SyncSliceMemberships,
                  SyncSlivers, SyncSliverIps, SyncExternalRoutes, SyncUsers,
                  GarbageCollector]

    # File in which per-step last-run timestamps are persisted.
    RUN_TIMES_PATH = '/tmp/observer_run_times'

    def __init__(self):
        # Maps step class name -> step class; filled by load_sync_steps().
        self.step_lookup = {}
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or *timeout* seconds elapse."""
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake a thread blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r' % self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_steps(self):
        """Load the dependency graphs and compute self.ordered_steps.

        The model dependency graph (JSON object mapping a model name to the
        model names it depends on) is required; a failure to load it
        propagates.  The backend dependency graph is optional.  Both are
        translated into a graph between sync step names, which is then
        topologically sorted.  Finally the persisted run times are loaded.
        """
        dep_path = Config().observer_backend_dependency_graph
        # This contains dependencies between records, not sync steps
        with open(dep_path) as f:
            self.model_dependency_graph = json.load(f)

        try:
            backend_path = Config().observer_pl_dependency_graph
            # This contains dependencies between backend records
            with open(backend_path) as f:
                self.backend_dependency_graph = json.load(f)
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # model name -> names of the steps that provide that model
        provides_dict = defaultdict(list)
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict[m.__name__].append(s.__name__)

        # step name -> names of the steps it must run after
        step_graph = defaultdict(list)
        for model, deps in self.model_dependency_graph.items():
            # .get() so unknown models are skipped without creating entries
            for source in provides_dict.get(model, []):
                for dep in deps:
                    step_graph[source].extend(provides_dict.get(dep, []))

        if self.backend_dependency_graph:
            # backend record -> name of the step that serves it
            backend_dict = {}
            for s in self.sync_steps:
                # NOTE(review): steps without a `serves` attribute are treated
                # as serving nothing.
                for m in getattr(s, 'serves', []):
                    backend_dict[m] = s.__name__

            for record, deps in self.backend_dependency_graph.items():
                source = backend_dict.get(record)
                if source is None:
                    continue  # no step serves this record: no dependencies
                for m in deps:
                    dest = backend_dict.get(m)
                    if dest is not None:
                        step_graph[source].append(dest)

        self.ordered_steps = toposort(dict(step_graph),
                                      [s.__name__ for s in self.sync_steps])
        print("Order of steps= %s" % (self.ordered_steps,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log when *step* ran longer than its optional ``deadline`` (seconds)."""
        deadline = getattr(step, 'deadline', None)
        if deadline is not None and duration > deadline:
            name = getattr(step, 'name', step.__name__)
            logger.info('Sync step %s missed deadline, took %.2f seconds' % (name, duration))

    def update_run_time(self, step):
        """Record now as the last run time of *step*."""
        self.last_run_times[step.__name__] = time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless step.requested_interval has elapsed.

        A step with no recorded run time is treated as never run.  A step
        without ``requested_interval`` is never considered due.
        """
        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set' % step.__name__)
            raise StepNotReady
        if time_since_last_run < interval:
            raise StepNotReady

    def load_run_times(self):
        """Load persisted run times, defaulting every ordered step to 0."""
        self.last_run_times = dict((name, 0) for name in self.ordered_steps)
        try:
            with open(self.RUN_TIMES_PATH) as f:
                saved = json.load(f)
        except (IOError, OSError, ValueError):
            # No saved state yet, or it is unreadable: all steps never ran.
            return
        if isinstance(saved, dict):
            self.last_run_times.update(saved)

    def save_run_times(self):
        """Persist self.last_run_times as JSON to RUN_TIMES_PATH."""
        with open(self.RUN_TIMES_PATH, 'w') as f:
            f.write(json.dumps(self.last_run_times))

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if a failed step provides a model *step* needs.

        Sets ``step.dependencies`` to the model names that the models in
        ``step.provides`` depend on according to self.model_dependency_graph.
        *failed_steps* holds step instances that failed on this pass.
        """
        dependencies = []
        for model in step.provides:
            dependencies.extend(self.model_dependency_graph.get(model.__name__, []))
        step.dependencies = dependencies

        for failed_step in failed_steps:
            for model in getattr(failed_step, 'provides', []):
                if model.__name__ in dependencies:
                    raise StepNotReady

    def _make_step(self, step_name):
        """Instantiate the sync step class named *step_name* for this pass."""
        step = self.step_lookup[step_name]
        sync_step = step(driver=self.driver)
        sync_step.__name__ = step.__name__
        sync_step.dependencies = []
        sync_step.debug_mode = debug_mode
        return sync_step

    def _run_step(self, step_name, failed_steps, failed_step_objects):
        """Check, run and account for one sync step; never raises for step errors.

        Appends the step to *failed_steps* if a dependency failed or the step
        itself raised; extends *failed_step_objects* with whatever the step
        reports as failed objects.
        """
        sync_step = self._make_step(step_name)

        try:
            # dont run Slices if Sites failed
            self.check_class_dependency(sync_step, failed_steps)
        except StepNotReady:
            logger.info('Step not ready: %s' % sync_step.__name__)
            # Propagate the failure to steps that depend on this one.
            failed_steps.append(sync_step)
            return
        except Exception:
            logger.log_exc('Dependency check failed for %s' % sync_step.__name__)
            failed_steps.append(sync_step)
            return

        try:
            # dont run sync_network_routes if time since last run < 1 hour
            self.check_schedule(sync_step)
        except StepNotReady:
            # Merely not due yet: this is not a failure, dependents may run.
            logger.info('Step not ready: %s' % sync_step.__name__)
            return
        except Exception:
            logger.log_exc('Schedule check failed for %s' % sync_step.__name__)
            failed_steps.append(sync_step)
            return

        start_time = time.time()
        try:
            # ********* This is the actual sync step
            failed_objects = sync_step(failed=failed_step_objects)
        except Exception:
            logger.log_exc('Sync step %s failed' % sync_step.__name__)
            failed_steps.append(sync_step)
            return

        self.check_duration(sync_step, time.time() - start_time)
        if failed_objects:
            failed_step_objects.extend(failed_objects)
        self.update_run_time(sync_step)

    def run(self):
        """Main loop: wait for an event (or 30s), then run the steps in order.

        Returns immediately if the driver is disabled or has no OpenStack.
        Errors in one pass are logged and the loop continues.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return
        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=30)
                logger.info('Observer woke up')

                # Steps that failed (or were blocked by a failed dependency)
                failed_steps = []

                # Set of individual objects within steps that failed
                failed_step_objects = []

                for step_name in self.ordered_steps:
                    self._run_step(step_name, failed_steps, failed_step_objects)
                self.save_run_times()
            except Exception:
                # Exception (not bare except) so Ctrl-C / SystemExit still stop us.
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()
272                 logger.log_exc("Exception in observer run loop")
273                 traceback.print_exc()