Wide range of debugging changes
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 from openstack.manager import OpenStackManager
12 from util.logger import Logger, logging, logger
13 #from timeout import timeout
14 from planetstack.config import Config
15 from observer.steps import *
16
# Copied onto every sync step instance (as ``debug_mode``) before it runs.
debug_mode = False

# Observer-wide logger, configured with logfile 'observer.log' at level INFO.
# This rebinds the ``logger`` name imported from util.logger above.
logger = Logger(level=logging.INFO, logfile='observer.log')
20
class StepNotReady(Exception):
    """Signals that a sync step must not be run in the current pass."""
23
def toposort(g, steps):
    """Return the nodes of graph ``g`` in topological order.

    Args:
        g: dict mapping each node to an iterable of its successor nodes.
            Nodes must be hashable.  A successor does not need to have its
            own entry in ``g``.
        steps: accepted for interface compatibility; it is not used.

    Returns:
        A list in which every reachable node appears exactly once and, for
        an acyclic graph, before each of the nodes listed under it in ``g``.

    The walk starts from the sources of ``g`` (keys that are nobody's
    successor), so nodes that can only be reached through a cycle with no
    source leading into it are left out.  An edge that closes a cycle is
    ignored rather than reported.
    """
    # Anything listed as a successor has at least one predecessor.
    has_predecessor = set()
    for successors in g.values():
        has_predecessor.update(successors)

    sources = [node for node in g if node not in has_predecessor]

    visited = set()
    postorder = []

    # Iterative depth-first search.  A node is emitted only after all of its
    # successors have been emitted, so the reversed post-order is a valid
    # topological order.  Each stack entry keeps its own successor iterator
    # so the scan resumes where it stopped once a child is finished.
    for source in sources:
        visited.add(source)
        stack = [(source, iter(g.get(source, ())))]
        while stack:
            node, successors = stack[-1]
            for successor in successors:
                if successor not in visited:
                    visited.add(successor)
                    stack.append((successor, iter(g.get(successor, ()))))
                    break
            else:
                # Every successor of ``node`` has been handled.
                stack.pop()
                postorder.append(node)

    postorder.reverse()
    return postorder
58
class PlanetStackObserver:
    """Runs the configured sync steps in dependency order.

    On construction the step dependency graph is built from the model (and
    optional backend) dependency files named in the configuration.  ``run``
    then loops forever: it waits until ``wake_up`` is called or 300 seconds
    pass, and executes every step that is ready.

    Steps are identified throughout by their class name, which is also the
    key used in ``dependency_graph`` and ``last_run_times``.
    """

    sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps]

    def __init__(self):
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()
        # run() consults manager.enabled and manager.has_openstack.
        # NOTE(review): assumes OpenStackManager() needs no arguments -- confirm.
        self.manager = OpenStackManager()

    @staticmethod
    def _step_name(step):
        """Return the class name of a step given as a class or an instance."""
        name = getattr(step, '__name__', None)
        if name is None:
            name = step.__class__.__name__
        return name

    @staticmethod
    def _add_dependency(graph, source, dest):
        """Record in ``graph`` that step ``source`` depends on step ``dest``."""
        if source == dest:
            # A step that provides both ends of a dependency would otherwise
            # become its own prerequisite.
            return
        deps = graph.setdefault(source, [])
        if dest not in deps:
            deps.append(dest)

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or ``timeout`` seconds elapse."""
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake one thread that is blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_steps(self):
        """Build the step dependency graph and the step execution order.

        Sets ``model_dependency_graph``, ``backend_dependency_graph``,
        ``dependency_graph`` (step name -> names of the steps it depends
        on), ``step_lookup`` (step name -> step class) and ``ordered_steps``
        (step classes in execution order), then loads the last run times.

        Raises whatever reading or parsing the model dependency file raises;
        a missing or unreadable backend dependency file is tolerated.
        """
        dep_path = Config().observer_dependency_path
        # This contains dependencies between records, not sync steps
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.load(dep_file)

        backend_path = Config().observer_backend_dependency_path
        try:
            # This contains dependencies between backend records
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.load(backend_file)
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # Model name -> names of the steps that provide that model.
        provides_dict = {}
        for s in self.sync_steps:
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # Translate model-level dependencies into step-level dependencies.
        # Models that no step provides contribute no edges.
        step_graph = {}
        for model, model_deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model, ()):
                for dep_model in model_deps:
                    for dest in provides_dict.get(dep_model, ()):
                        self._add_dependency(step_graph, source, dest)

        if self.backend_dependency_graph:
            # Backend record -> name of the step that serves it.
            backend_dict = {}
            for s in self.sync_steps:
                for m in s.serves:
                    backend_dict[m] = s.__name__

            for backend, backend_deps in self.backend_dependency_graph.items():
                source = backend_dict.get(backend)
                if source is None:
                    # No step serves this backend record.
                    continue
                for dep in backend_deps:
                    dest = backend_dict.get(dep)
                    if dest is not None:
                        self._add_dependency(step_graph, source, dest)

        self.dependency_graph = step_graph
        self.step_lookup = dict((s.__name__, s) for s in self.sync_steps)

        ordered_names = toposort(self.dependency_graph, self.sync_steps)
        # toposort lists a step ahead of the steps it depends on, but
        # check_class_dependency needs prerequisites to have run first.
        ordered_names.reverse()
        # Steps without any dependency edge never enter the graph; they
        # still have to be scheduled.
        for s in self.sync_steps:
            if s.__name__ not in ordered_names:
                ordered_names.append(s.__name__)

        self.ordered_steps = [self.step_lookup[name] for name in ordered_names]
        print("Order of steps= %s" % (ordered_names,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log when ``step`` took longer than its ``deadline`` attribute.

        Steps without a ``deadline`` attribute are not checked.
        """
        deadline = getattr(step, 'deadline', None)
        if deadline is not None and duration > deadline:
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(self._step_name(step),duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of ``step``."""
        self.last_run_times[self._step_name(step)]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless ``step`` is due to run.

        A step is due when at least ``step.requested_interval`` seconds have
        passed since its last recorded run.  A step with no recorded run is
        treated as last run at time 0.  A step without a
        ``requested_interval`` attribute is never due.
        """
        name = self._step_name(step)
        time_since_last_run = time.time() - self.last_run_times.get(name, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%name)
            raise StepNotReady
        if time_since_last_run < interval:
            raise StepNotReady

    def load_run_times(self):
        """Load ``last_run_times`` from /tmp/observer_run_times.

        A missing or unparsable file yields an empty table.  Every step in
        ``ordered_steps`` that has no entry gets a last run time of 0.
        """
        run_times = None
        try:
            with open('/tmp/observer_run_times') as run_times_file:
                run_times = json.load(run_times_file)
        except (IOError, OSError, ValueError):
            pass
        if not isinstance(run_times, dict):
            run_times = {}

        for s in self.ordered_steps:
            run_times.setdefault(self._step_name(s), 0)
        self.last_run_times = run_times

    def save_run_times(self):
        """Write ``last_run_times`` to /tmp/observer_run_times as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open('/tmp/observer_run_times','w') as run_times_file:
            run_times_file.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if a step that ``step`` depends on has failed.

        ``failed_steps`` is an iterable of step classes or instances.
        """
        prerequisites = self.dependency_graph.get(self._step_name(step), ())
        for failed_step in failed_steps:
            if self._step_name(failed_step) in prerequisites:
                raise StepNotReady

    def run(self):
        """Run the observer loop; returns at once if OpenStack is unavailable.

        Each pass waits for an event (at most 300 seconds), then walks
        ``ordered_steps``.  A step is skipped when a step it depends on
        failed or was skipped in this pass, or when it is not yet due.
        Run times are saved at the end of every pass.
        """
        if not self.manager.enabled or not self.manager.has_openstack:
            return

        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=300)
                logger.info('Observer woke up')

                # Set of whole steps that failed
                failed_steps = []

                # Set of individual objects within steps that failed
                failed_step_objects = []

                for S in self.ordered_steps:
                    step_name = self._step_name(S)

                    sync_step = S()
                    sync_step.dependencies = list(self.dependency_graph.get(step_name, ()))
                    sync_step.debug_mode = debug_mode

                    try:
                        # Various checks that decide whether
                        # this step runs or not
                        self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
                        self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
                    except StepNotReady:
                        logger.info('Step not ready: %s'%step_name)
                        failed_steps.append(sync_step)
                        continue
                    except Exception:
                        logger.log_exc('Exception while checking step %s'%step_name)
                        failed_steps.append(sync_step)
                        continue

                    try:
                        start_time = time.time()

                        # ********* This is the actual sync step
                        failed_objects = sync_step(failed=failed_step_objects)

                        # Measured after the call so it covers the step itself.
                        duration = time.time() - start_time
                        self.check_duration(sync_step, duration)
                        if failed_objects:
                            failed_step_objects.extend(failed_objects)
                        self.update_run_time(sync_step)
                    except Exception:
                        logger.log_exc('Exception in sync step %s'%step_name)
                        failed_steps.append(sync_step)
                self.save_run_times()
            except Exception:
                # Keep the loop alive across failures; KeyboardInterrupt and
                # SystemExit are deliberately not caught here.
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()