X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=planetstack%2Fec2_observer%2Fevent_loop.py;h=1319fecf8befdbe78a24d65d2b008353a85511ac;hb=e42cedcb2caf20b1118db570a9650348958d7590;hp=f6b92ea1ba95561ca14f293022f4158fb9716ba8;hpb=6ff37c462f6b230988933c31e9c2f433dc4e5900;p=plstackapi.git diff --git a/planetstack/ec2_observer/event_loop.py b/planetstack/ec2_observer/event_loop.py index f6b92ea..1319fec 100644 --- a/planetstack/ec2_observer/event_loop.py +++ b/planetstack/ec2_observer/event_loop.py @@ -6,6 +6,7 @@ import traceback import commands import threading import json +import pdb from datetime import datetime from collections import defaultdict @@ -33,14 +34,25 @@ class NoOpDriver: self.enabled = True self.dependency_graph = None +STEP_STATUS_WORKING=1 +STEP_STATUS_OK=2 +STEP_STATUS_KO=3 + +def invert_graph(g): + ig = {} + for k,v in g.items(): + for v0 in v: + try: + ig[v0].append(k) + except: + ig=[k] + return ig + class PlanetStackObserver: - #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector] + #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivilege,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector] sync_steps = [] - STEP_STATUS_WORKING=1 - STEP_STATUS_OK=2 - STEP_STATUS_KO=3 - + def __init__(self): # The Condition object that gets signalled by Feefie events self.step_lookup = {} @@ -54,16 +66,16 @@ class PlanetStackObserver: else: self.driver = NoOpDriver() - def wait_for_event(self, timeout, cond=self.event_cond): - cond.acquire() - cond.wait(timeout) - cond.release() + def wait_for_event(self, timeout): + self.event_cond.acquire() + self.event_cond.wait(timeout) + self.event_cond.release() - def wake_up(self, cond=self.event_cond): + def wake_up(self): logger.info('Wake up routine called. Event cond %r'%self.event_cond) - cond.acquire() - cond.notify() - cond.release() + self.event_cond.acquire() + self.event_cond.notify() + self.event_cond.release() def load_sync_step_modules(self, step_dir=None): if step_dir is None: @@ -160,6 +172,7 @@ class PlanetStackObserver: # no dependencies, pass self.dependency_graph = step_graph + self.deletion_dependency_graph = invert_graph(step_graph) self.ordered_steps = toposort(self.dependency_graph, map(lambda s:s.__name__,self.sync_steps)) print "Order of steps=",self.ordered_steps @@ -209,7 +222,6 @@ class PlanetStackObserver: self.last_deletion_run_times[e]=0 - def save_run_times(self): run_times = json.dumps(self.last_run_times) open('/tmp/observer_run_times','w').write(run_times) @@ -229,20 +241,31 @@ class PlanetStackObserver: step = self.step_lookup[S] start_time=time.time() + dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph + # Wait for step dependencies to be met - deps = self.dependency_graph[S] - for d in deps: - cond = self.step_conditions[d] - acquire(cond) - if (self.step_status[S] is STEP_STATUS_WORKING): - cond.wait() - cond.release() - - if (self.step_status[S] is not STEP_STATUS_OK): + try: + deps = self.dependency_graph[S] + has_deps = True + except KeyError: + has_deps = False + + if (has_deps): + for d in deps: + cond = self.step_conditions[d] + cond.acquire() + if (self.step_status[d] is STEP_STATUS_WORKING): + cond.wait() + cond.release() + go = self.step_status[d] == STEP_STATUS_OK + else: + go = True + + if (not go): self.failed_steps.append(sync_step) my_status = STEP_STATUS_KO else: - sync_step = step(driver=self.driver,error_map=error_mapper) + sync_step = step(driver=self.driver,error_map=self.error_mapper) sync_step.__name__ = step.__name__ sync_step.dependencies = [] try: @@ -330,11 +353,12 @@ class PlanetStackObserver: for v in self.dependency_graph.values(): if (v): providers.update(v) + self.step_conditions = {} self.step_status = {} for p in list(providers): self.step_conditions[p] = threading.Condition() - self.step_status[p] = STEP_STATUS_IDLE + self.step_status[p] = STEP_STATUS_WORKING logger.info('Waiting for event') @@ -346,12 +370,13 @@ class PlanetStackObserver: for deletion in [False,True]: threads = [] logger.info('Deletion=%r...'%deletion) - schedule = self.sync_schedule if not deletion else self.delete_schedule + schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps) - thread = threading.Thread(target=self.sync, args=(schedule.start_conditions, schedule.ordered_steps,deletion, schedule.signal_sem)) + for S in schedule: + thread = threading.Thread(target=self.sync, args=(S, deletion)) - logger.info('Deletion=%r...'%deletion) - threads.append(thread) + logger.info('Deletion=%r...'%deletion) + threads.append(thread) # Start threads for t in threads: