start working on xoslib objects
[plstackapi.git] / planetstack / observer / event_loop.py
1 import os
2 import imp
3 import inspect
4 import time
5 import traceback
6 import commands
7 import threading
8 import json
9
10 from datetime import datetime
11 from collections import defaultdict
12 from core.models import *
13 from django.db.models import F, Q
14 #from openstack.manager import OpenStackManager
15 from openstack.driver import OpenStackDriver
16 from util.logger import Logger, logging, logger
17 #from timeout import timeout
18 from planetstack.config import Config
19 #from observer.steps import *
20 from syncstep import SyncStep
21 from toposort import toposort
22
# Copied onto every sync step instance as its `debug_mode` attribute by
# PlanetStackObserver.run_steps before the step is executed.
debug_mode = False

# Logger used by the rest of this module. NOTE(review): this rebinds the
# `logger` name imported from util.logger above, so the imported object is
# shadowed for every function in this file — confirm which one is intended.
logger = Logger(level=logging.INFO)
26
class StepNotReady(Exception):
    """Signal that a sync step must be skipped during the current pass."""
29
class NoOpDriver:
    """Placeholder driver that exposes no backend operations at all."""

    def __init__(self):
        # Report the driver as enabled so the observer run loop does not
        # return early.
        self.enabled = True
33
class PlanetStackObserver:
    """Load sync steps, order them by dependency and run them in a loop.

    Sync step classes are discovered in a steps directory, ordered with
    toposort() using the model (and optional backend) dependency graphs, and
    executed in that order on every pass of run_steps(). run() starts a new
    pass after each wait_for_event(), which returns when wake_up() is called
    while it is waiting or after 30 seconds.
    """

    # Default (empty) list of step classes. __init__ gives each instance its
    # own copy so that loading steps never mutates this class attribute.
    sync_steps = []

    # File where per-step last-run timestamps are persisted as JSON.
    run_times_path = '/tmp/observer_run_times'

    def __init__(self):
        # Step class name -> step class; filled in by load_sync_steps().
        self.step_lookup = {}
        self.sync_steps = list(self.sync_steps)
        self.load_sync_step_modules()
        self.load_sync_steps()

        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()

        self.driver_kind = getattr(Config(), "observer_driver", "openstack")
        if self.driver_kind == "openstack":
            self.driver = OpenStackDriver()
        else:
            self.driver = NoOpDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() notifies the condition or `timeout` seconds pass."""
        logger.info('Waiting for event')
        # `with` guarantees the lock is released even if wait() raises.
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Notify one thread currently blocked in wait_for_event(), if any."""
        logger.info('Wake up routine called. Event cond %r' % self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_step_modules(self, step_dir=None):
        """Import every step module in `step_dir` and collect its SyncStep classes.

        Args:
            step_dir: directory to scan. Defaults to Config().observer_steps_dir,
                or "/opt/planetstack/observer/steps" when that is not set.

        A class is collected when it is a SyncStep subclass with a
        ``provides`` attribute (this excludes the abstract base classes, which
        have no ``provides``).
        """
        if step_dir is None:
            step_dir = getattr(Config(), "observer_steps_dir",
                               "/opt/planetstack/observer/steps")

        # imp.load_source creates new class objects on every load, so an
        # identity check cannot detect duplicates; steps are keyed by class
        # name everywhere else (step_lookup, toposort), so dedupe by name.
        known_names = set(s.__name__ for s in self.sync_steps)

        # Sorted for a deterministic load (and therefore tie-break) order.
        for fn in sorted(os.listdir(step_dir)):
            pathname = os.path.join(step_dir, fn)
            if not (os.path.isfile(pathname) and fn.endswith(".py")
                    and fn != "__init__.py"):
                continue
            module = imp.load_source(fn[:-3], pathname)
            for classname in dir(module):
                c = getattr(module, classname, None)
                if (inspect.isclass(c) and issubclass(c, SyncStep)
                        and hasattr(c, "provides")
                        and c.__name__ not in known_names):
                    self.sync_steps.append(c)
                    known_names.add(c.__name__)
        logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))

    def load_sync_steps(self):
        """Load the dependency graphs and compute self.ordered_steps.

        The model dependency graph (Config().observer_dependency_graph) is
        required; any error reading it propagates. The backend dependency
        graph (Config().observer_pl_dependency_graph) is optional. Finishes
        by loading the persisted run times.
        """
        dep_path = Config().observer_dependency_graph
        logger.info('Loading model dependency graph from %s' % dep_path)
        # This contains dependencies between records, not sync steps
        with open(dep_path) as f:
            self.model_dependency_graph = json.load(f)

        try:
            backend_path = Config().observer_pl_dependency_graph
            logger.info('Loading backend dependency graph from %s' % backend_path)
            # This contains dependencies between backend records
            with open(backend_path) as f:
                self.backend_dependency_graph = json.load(f)
        except Exception:
            logger.info('Backend dependency graph not loaded')
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s

        dependency_graph = self._build_step_graph()
        self.ordered_steps = toposort(dependency_graph,
                                      [s.__name__ for s in self.sync_steps])
        logger.info('Order of steps=%s' % (self.ordered_steps,))
        self.load_run_times()

    def _build_step_graph(self):
        """Translate the model/backend graphs into {step name: [step names it depends on]}."""
        # Model name -> names of the steps that provide that model.
        provides_dict = {}
        for s in self.sync_steps:
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        step_graph = {}

        def add_edge(source, dest):
            deps = step_graph.setdefault(source, [])
            if dest not in deps:
                deps.append(dest)

        # Models with no providing step, or dependencies on models no step
        # provides, contribute no edges.
        for model_name, dep_models in self.model_dependency_graph.items():
            for source in provides_dict.get(model_name, []):
                for dep_model in dep_models:
                    for dest in provides_dict.get(dep_model, []):
                        add_edge(source, dest)

        if self.backend_dependency_graph:
            # Backend record name -> name of the step that serves it.
            backend_dict = {}
            for s in self.sync_steps:
                for m in getattr(s, "serves", []):
                    backend_dict[m] = s.__name__

            for backend_name, dep_backends in self.backend_dependency_graph.items():
                source = backend_dict.get(backend_name)
                if source is None:
                    continue
                for dep in dep_backends:
                    dest = backend_dict.get(dep)
                    if dest is not None:
                        add_edge(source, dest)

        return step_graph

    def check_duration(self, step, duration):
        """Log when `step` ran longer than its optional ``deadline`` (seconds)."""
        deadline = getattr(step, 'deadline', None)
        if deadline is not None and duration > deadline:
            name = getattr(step, 'name', None) or getattr(step, '__name__', repr(step))
            logger.info('Sync step %s missed deadline, took %.2f seconds' % (name, duration))

    def update_run_time(self, step):
        """Record now as the last run time of `step`."""
        self.last_run_times[step.__name__] = time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless step.requested_interval seconds have passed.

        A step without ``requested_interval`` is never considered ready.
        """
        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set' % step.__name__)
            raise StepNotReady
        if time_since_last_run < interval:
            raise StepNotReady

    def load_run_times(self):
        """Load last-run timestamps from run_times_path, or start every step at 0."""
        try:
            with open(self.run_times_path) as f:
                run_times = json.load(f)
            if not isinstance(run_times, dict):
                raise ValueError('run times file does not contain a JSON object')
            self.last_run_times = run_times
        except (IOError, OSError, ValueError):
            # Missing, unreadable or corrupt file: treat every step as never run.
            self.last_run_times = dict((name, 0) for name in self.ordered_steps)

    def save_run_times(self):
        """Persist the last-run timestamps to run_times_path as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.run_times_path, 'w') as f:
            f.write(run_times)

    def _step_dependencies(self, step):
        """Return the model names that the models provided by `step` depend on."""
        deps = []
        for model in getattr(step, 'provides', []):
            deps.extend(self.model_dependency_graph.get(model.__name__, []))
        return deps

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if a failed step provides a model `step` depends on.

        Also sets ``step.dependencies`` to the list of model names `step`
        depends on (SyncStep implementations read this attribute).

        Args:
            step: the step (class or instance) about to run.
            failed_steps: steps that failed earlier in this pass, given as
                step instances, step classes or step class names.
        """
        step.dependencies = self._step_dependencies(step)
        needed = set(step.dependencies)
        for failed_step in failed_steps:
            if isinstance(failed_step, str):
                failed_step = self.step_lookup.get(failed_step)
            provided = set(m.__name__ for m in getattr(failed_step, 'provides', []))
            if needed & provided:
                raise StepNotReady

    def _ready_to_run(self, sync_step, failed_steps):
        """Return True if `sync_step` passes the dependency and schedule checks.

        Steps blocked by a failed dependency, or whose checks raise
        unexpectedly, are appended to `failed_steps` so their own dependents
        are skipped too. Steps merely throttled by their schedule are not.
        """
        try:
            # dont run Slices if Sites failed
            self.check_class_dependency(sync_step, failed_steps)
        except StepNotReady:
            logger.info('Step not ready: %s' % sync_step.__name__)
            failed_steps.append(sync_step)
            return False
        except Exception:
            logger.log_exc('Exception when checking schedule: %s' % sync_step.__name__)
            failed_steps.append(sync_step)
            return False

        try:
            # dont run sync_network_routes if time since last run < 1 hour
            self.check_schedule(sync_step)
        except StepNotReady:
            logger.info('Step not ready: %s' % sync_step.__name__)
            return False
        except Exception:
            logger.log_exc('Exception when checking schedule: %s' % sync_step.__name__)
            failed_steps.append(sync_step)
            return False
        return True

    def run_steps(self):
        """Run one pass over all sync steps in dependency order, then save run times."""
        try:
            logger.info('Observer run steps')

            # Steps that failed (or were blocked by a failure) in this pass.
            failed_steps = []

            # Set of individual objects within steps that failed
            failed_step_objects = set()

            for S in self.ordered_steps:
                step = self.step_lookup[S]
                try:
                    sync_step = step(driver=self.driver)
                except Exception:
                    # A step that cannot be constructed counts as failed; the
                    # class carries `provides`, so dependents are skipped too.
                    logger.log_exc('Failure in step: %s' % S)
                    failed_steps.append(step)
                    continue
                sync_step.__name__ = step.__name__
                sync_step.debug_mode = debug_mode

                # Also sets sync_step.dependencies.
                if not self._ready_to_run(sync_step, failed_steps):
                    continue

                try:
                    logger.info('Executing step %s' % sync_step.__name__)

                    # ********* This is the actual sync step
                    start_time = time.time()
                    failed_objects = sync_step(failed=list(failed_step_objects))
                    duration = time.time() - start_time

                    self.check_duration(sync_step, duration)
                    if failed_objects:
                        failed_step_objects.update(failed_objects)
                    self.update_run_time(sync_step)
                except Exception:
                    logger.log_exc('Failure in step: %s' % sync_step.__name__)
                    failed_steps.append(sync_step)
            self.save_run_times()
        except Exception:
            logger.log_exc("Exception in observer run loop")
            traceback.print_exc()

    def run(self):
        """Run sync passes forever.

        Returns immediately if the driver is disabled or, for the openstack
        driver, if OpenStack is not available. Errors inside one iteration
        are logged and the loop continues. KeyboardInterrupt/SystemExit are
        no longer swallowed, so the observer can be stopped.
        """
        try:
            logger.info('Observer start run loop')
            if not self.driver.enabled:
                return
            if (self.driver_kind == "openstack") and (not self.driver.has_openstack):
                return

            while True:
                try:
                    self.wait_for_event(timeout=30)
                except Exception:
                    logger.log_exc("Exception in observer wait for event")
                    traceback.print_exc()

                try:
                    self.run_steps()
                except Exception:
                    logger.log_exc("Exception in observer run steps")
                    traceback.print_exc()
        except Exception:
            logger.log_exc("Exception in observer run loop")
            traceback.print_exc()