7 from datetime import datetime
8 from collections import defaultdict
9 from core.models import *
10 from django.db.models import F, Q
11 #from openstack.manager import OpenStackManager
12 from openstack.driver import OpenStackDriver
13 from util.logger import Logger, logging, logger
14 #from timeout import timeout
15 from planetstack.config import Config
16 from observer.steps import *
20 logger = Logger(logfile='observer.log', level=logging.INFO)
22 class StepNotReady(Exception):
25 def toposort(g, steps):
37 if not reverse.has_key(k):
41 for k,v in reverse.iteritems():
60 class PlanetStackObserver:
61 sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps]
64 # The Condition object that gets signalled by Feefie events
65 self.load_sync_steps()
66 self.event_cond = threading.Condition()
67 self.driver = OpenStackDriver()
69 def wait_for_event(self, timeout):
70 self.event_cond.acquire()
71 self.event_cond.wait(timeout)
72 self.event_cond.release()
75 logger.info('Wake up routine called. Event cond %r'%self.event_cond)
76 self.event_cond.acquire()
77 self.event_cond.notify()
78 self.event_cond.release()
80 def load_sync_steps(self):
81 dep_path = Config().observer_backend_dependency_graph
83 # This contains dependencies between records, not sync steps
84 self.model_dependency_graph = json.loads(open(dep_path).read())
88 backend_path = Config().observer_backend_dependency_graph
90 # This contains dependencies between backend records
91 self.backend_dependency_graph = json.loads(open(backend_path).read())
93 # We can work without a backend graph
94 self.backend_dependency_graph = {}
97 for s in self.sync_steps:
100 provides_dict[m.__name__].append(s.__name__)
102 provides_dict[m.__name__]=[s.__name__]
106 for k,v in self.model_dependency_graph.iteritems():
108 for source in provides_dict[k]:
111 for dest in provides_dict[m]:
114 step_graph[source].append(dest)
116 step_graph[source]=[dest]
122 # no dependencies, pass
126 if (self.backend_dependency_graph):
128 for s in self.sync_steps:
130 backend_dict[m]=s.__name__
132 for k,v in backend_dependency_graph.iteritems():
134 source = backend_dict[k]
137 dest = backend_dict[m]
141 step_graph[source]=dest
145 # no dependencies, pass
147 dependency_graph = step_graph
149 self.ordered_steps = toposort(dependency_graph, self.sync_steps)
150 print "Order of steps=",self.ordered_steps
151 self.load_run_times()
154 def check_duration(self):
156 if (duration > S.deadline):
157 logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration))
158 except AttributeError:
159 # S doesn't have a deadline
162 def update_run_time(self, step):
163 self.last_run_times[step.name]=time.time()
165 def check_schedule(self, step):
166 time_since_last_run = time.time() - self.last_run_times[step.name]
168 if (time_since_last_run < step.requested_interval):
170 except AttributeError:
171 logger.info('Step %s does not have requested_interval set'%step.name)
174 def load_run_times(self):
176 jrun_times = open('/tmp/observer_run_times').read()
177 self.last_run_times = json.loads(jrun_times)
179 self.last_run_times={}
180 for e in self.ordered_steps:
181 self.last_run_times[e.name]=0
185 def save_run_times(self):
186 run_times = json.dumps(self.last_run_times)
187 open('/tmp/observer_run_times','w').write(run_times)
189 def check_class_dependency(self, step, failed_steps):
190 for failed_step in failed_steps:
191 if (failed_step in self.dependency_graph[step.name]):
195 if not self.driver.enabled or not self.driver.has_openstack:
200 logger.info('Waiting for event')
201 tBeforeWait = time.time()
202 self.wait_for_event(timeout=300)
203 logger.info('Observer woke up')
205 # Set of whole steps that failed
208 # Set of individual objects within steps that failed
209 failed_step_objects = []
211 for S in self.ordered_steps:
212 start_time=time.time()
214 sync_step = S(driver=self.driver)
215 sync_step.dependencies = self.dependencies[sync_step.name]
216 sync_step.debug_mode = debug_mode
220 # Various checks that decide whether
221 # this step runs or not
222 self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
223 self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
226 logging.info('Step not ready: %s'%sync_step.name)
227 failed_steps.add(sync_step)
229 failed_steps.add(sync_step)
233 duration=time.time() - start_time
235 # ********* This is the actual sync step
236 failed_objects = sync_step(failed=failed_step_objects)
239 check_deadline(sync_step, duration)
240 failed_step_objects.extend(failed_objects)
241 self.update_run_time(sync_step)
244 self.save_run_times()
246 logger.log_exc("Exception in observer run loop")
247 traceback.print_exc()