f4f7f02b1db757f57e4cb80bfcaedd6aab302cf7
[plstackapi.git] / planetstack / observer / event_loop.py
1 import time
2 import traceback
3 import commands
4 import threading
5 import json
6
7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 #from openstack.manager import OpenStackManager
12 from openstack.driver import OpenStackDriver
13 from util.logger import Logger, logging, logger
14 #from timeout import timeout
15 from planetstack.config import Config
16 from observer.steps import *
17
# Module-wide flag copied onto every sync step instance (sync_step.debug_mode)
# before the observer runs it.
debug_mode = False

# Rebinds the `logger` imported from util.logger so this module logs to
# observer.log at INFO level.
logger = Logger(level=logging.INFO, logfile='observer.log')
21
class StepNotReady(Exception):
    """Signal from a scheduling check that a sync step must not run on this pass."""
24
def toposort(g, steps=None):
    """Return the nodes of dependency graph *g* in dependency-first order.

    Args:
        g: mapping of node -> iterable of nodes that node depends on. Every
            dependency of a node is placed before the node itself.
        steps: optional iterable of additional nodes to include even if they
            have no edges in *g*. Unconstrained nodes keep the order in which
            they first appear (``steps`` first, then the keys/values of *g*).

    Returns:
        A list containing every node exactly once. Self-dependencies and
        duplicate edges are ignored. Nodes caught in a dependency cycle cannot
        be ordered; they are appended at the end instead of raising, so a bad
        graph degrades the schedule rather than stopping the caller.
    """
    # Collect every node once, in a stable order of first appearance.
    nodes = []
    seen = set()

    def _add(node):
        if node not in seen:
            seen.add(node)
            nodes.append(node)

    for node in (steps or []):
        _add(node)
    for node, deps in g.items():
        _add(node)
        for dep in deps:
            _add(dep)

    # pending[n]: number of distinct dependencies of n not yet emitted.
    # dependents[d]: nodes that are waiting for d to be emitted.
    pending = {}
    dependents = {}
    for node in nodes:
        deps = []
        for dep in g.get(node, ()):
            if dep != node and dep not in deps:
                deps.append(dep)
        pending[node] = len(deps)
        for dep in deps:
            dependents.setdefault(dep, []).append(node)

    # Kahn's algorithm: emit nodes whose dependencies are all satisfied; the
    # growing `order` list doubles as the work queue.
    order = [node for node in nodes if pending[node] == 0]
    index = 0
    while index < len(order):
        for waiting in dependents.get(order[index], ()):
            pending[waiting] -= 1
            if pending[waiting] == 0:
                order.append(waiting)
        index += 1

    # Anything left over is part of (or depends on) a cycle.
    if len(order) < len(nodes):
        placed = set(order)
        order.extend(node for node in nodes if node not in placed)
    return order
59
class PlanetStackObserver:
    """Event loop that runs the observer's sync steps in dependency order.

    The step order is computed once at construction from the JSON dependency
    graphs named in the config. ``run()`` then repeatedly waits for an event
    (or a timeout), runs every step that is due and whose dependencies did not
    fail on the current pass, and persists per-step run times.
    """
    sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,GarbageCollector]

    # File used to persist the last run time of each step between passes.
    RUN_TIMES_PATH = '/tmp/observer_run_times'
    # Maximum number of seconds run() sleeps between passes without an event.
    EVENT_TIMEOUT = 300

    def __init__(self):
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events (see wake_up).
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() notifies the condition or *timeout* seconds pass.

        NOTE: a notify() issued while no thread is waiting is not remembered
        by threading.Condition, so such a wake-up is only seen at the timeout.
        """
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Notify the event condition so a waiting run() loop starts a pass."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_steps(self):
        """Load the dependency graphs and compute ``self.ordered_steps``.

        Sets ``model_dependency_graph``, ``backend_dependency_graph``,
        ``step_lookup`` (class name -> step class), ``dependency_graph``
        (step name -> names of steps it depends on) and ``ordered_steps``
        (step class names, dependencies first), then loads run times.

        Raises:
            IOError/ValueError if the model dependency graph cannot be read
            or parsed; that graph is required.
        """
        # This contains dependencies between records, not sync steps.
        dep_path = Config().observer_backend_dependency_graph
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.load(dep_file)

        try:
            # This contains dependencies between backend records.
            backend_path = Config().observer_pl_dependency_graph
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.load(backend_file)
        except Exception:
            # We can work without a backend graph.
            self.backend_dependency_graph = {}

        self.step_lookup = dict((s.__name__, s) for s in self.sync_steps)
        self.dependency_graph = self._build_step_graph()

        step_names = [s.__name__ for s in self.sync_steps]
        ordered = []
        # Keep toposort's order, then schedule every configured step exactly
        # once, including steps that toposort did not return.
        for name in list(toposort(self.dependency_graph, step_names)) + step_names:
            if name in self.step_lookup and name not in ordered:
                ordered.append(name)
        self.ordered_steps = ordered
        logger.info('Order of steps=%s' % self.ordered_steps)
        self.load_run_times()

    def _build_step_graph(self):
        """Translate model/backend dependencies into step name -> [step names it depends on]."""
        # Model name -> names of the steps that provide (sync) that model.
        provides_dict = {}
        for s in self.sync_steps:
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        step_graph = {}

        def add_edge(source, dest):
            # A step never depends on itself; skip duplicate edges too.
            if source == dest:
                return
            deps = step_graph.setdefault(source, [])
            if dest not in deps:
                deps.append(dest)

        # If model k depends on model m, every step providing k depends on
        # every step providing m. Models no step provides contribute nothing.
        for model_name, dep_models in self.model_dependency_graph.items():
            for source in provides_dict.get(model_name, []):
                for dep_model in dep_models:
                    for dest in provides_dict.get(dep_model, []):
                        add_edge(source, dest)

        if self.backend_dependency_graph:
            # Backend record name -> name of the step that serves it.
            backend_dict = {}
            for s in self.sync_steps:
                for m in getattr(s, 'serves', []):
                    backend_dict[m] = s.__name__

            for backend_name, dep_backends in self.backend_dependency_graph.items():
                source = backend_dict.get(backend_name)
                if source is None:
                    continue  # no step serves this record: no dependencies
                for dep_backend in dep_backends:
                    dest = backend_dict.get(dep_backend)
                    if dest is not None:
                        add_edge(source, dest)

        return step_graph

    def check_duration(self, step=None, duration=None):
        """Log when *step* took longer than its ``deadline`` attribute (seconds).

        Steps without a ``deadline`` are ignored; calling without arguments
        is a no-op.
        """
        if step is None or duration is None:
            return
        try:
            if (duration > step.deadline):
                logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration))
        except AttributeError:
            # step doesn't have a deadline
            pass

    def update_run_time(self, step):
        """Record now as the last run time of *step*."""
        self.last_run_times[step.name]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless *step*'s requested_interval has elapsed.

        A step with no recorded run time counts as never run. A step that
        does not define ``requested_interval`` is never scheduled.
        """
        time_since_last_run = time.time() - self.last_run_times.get(step.name, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%step.name)
            raise StepNotReady
        if (time_since_last_run < interval):
            raise StepNotReady

    def load_run_times(self):
        """Load persisted run times, or start every ordered step at 0."""
        try:
            with open(self.RUN_TIMES_PATH) as run_file:
                run_times = json.load(run_file)
            if not isinstance(run_times, dict):
                raise ValueError('run times file does not contain an object')
        except (IOError, OSError, ValueError):
            run_times = dict((name, 0) for name in self.ordered_steps)
        self.last_run_times = run_times

    def save_run_times(self):
        """Persist ``self.last_run_times`` as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.RUN_TIMES_PATH, 'w') as run_file:
            run_file.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if any step that *step* depends on is in *failed_steps*.

        *failed_steps* holds step class names, the same keys used by
        ``self.dependency_graph``.
        """
        depends_on = self.dependency_graph.get(step.__class__.__name__, [])
        for failed_step in failed_steps:
            if (failed_step in depends_on):
                raise StepNotReady

    def _run_step(self, step_name, failed_steps, failed_step_objects):
        """Run the step registered as *step_name* once, recording failures in place.

        Args:
            step_name: class name of the step (a key of ``self.step_lookup``).
            failed_steps: set of step names that failed or were skipped on this
                pass; *step_name* is added if this step does not complete.
            failed_step_objects: objects that failed in earlier steps; passed
                to the step and extended with whatever the step returns.
        """
        try:
            sync_step = self.step_lookup[step_name](driver=self.driver)
            # NOTE(review): assumes SyncStep.dependencies expects the names of
            # the steps this one depends on -- confirm against observer.steps.
            sync_step.dependencies = self.dependency_graph.get(step_name, [])
            sync_step.debug_mode = debug_mode

            # Various checks that decide whether this step runs or not
            self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
            self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
        except StepNotReady:
            logger.info('Step not ready: %s' % step_name)
            failed_steps.add(step_name)
            return
        except Exception:
            logger.log_exc('Exception while preparing sync step %s' % step_name)
            failed_steps.add(step_name)
            return

        start_time = time.time()
        try:
            # ********* This is the actual sync step
            failed_objects = sync_step(failed=failed_step_objects)
        except Exception:
            logger.log_exc('Exception in sync step %s' % step_name)
            failed_steps.add(step_name)
            return

        self.check_duration(sync_step, time.time() - start_time)
        if failed_objects:
            failed_step_objects.extend(failed_objects)
        self.update_run_time(sync_step)

    def run(self):
        """Run sync passes forever; returns at once if OpenStack is unavailable."""
        if not self.driver.enabled or not self.driver.has_openstack:
            return

        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=self.EVENT_TIMEOUT)
                logger.info('Observer woke up')

                # Names of whole steps that failed (or were skipped) this pass;
                # steps depending on them are skipped as well.
                failed_steps = set()

                # Individual objects within steps that failed
                failed_step_objects = []

                for step_name in self.ordered_steps:
                    self._run_step(step_name, failed_steps, failed_step_objects)
                self.save_run_times()
            except Exception:
                # Keep the observer alive across unexpected errors, but let
                # KeyboardInterrupt/SystemExit stop it.
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()