From: Tony Mack Date: Tue, 24 Sep 2013 14:12:33 +0000 (-0400) Subject: tabs, bugfixes X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=ce79de09a6a65b2be1de89c0ad9e74cf544f2db2;p=plstackapi.git tabs, bugfixes --- diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py index 492cd9a..9884390 100644 --- a/planetstack/observer/event_loop.py +++ b/planetstack/observer/event_loop.py @@ -20,228 +20,228 @@ debug_mode = False logger = Logger(logfile='observer.log', level=logging.INFO) class StepNotReady(Exception): - pass + pass def toposort(g, steps): - reverse = {} - - for k,v in g.items(): - for rk in v: - try: - reverse[rk].append(k) - except: - reverse[rk]=k - - sources = [] - for k,v in g.items(): - if not reverse.has_key(k): - sources.append(k) - - - for k,v in reverse.iteritems(): - if (not v): - sources.append(k) - - order = [] - marked = [] - - while sources: - n = sources.pop() - try: - for m in g[n]: - if m not in marked: - sources.append(m) - marked.append(m) - except KeyError: - pass - order.append(n) - return order + reverse = {} + + for k,v in g.items(): + for rk in v: + try: + reverse[rk].append(k) + except: + reverse[rk]=k + + sources = [] + for k,v in g.items(): + if not reverse.has_key(k): + sources.append(k) + + + for k,v in reverse.iteritems(): + if (not v): + sources.append(k) + + order = [] + marked = [] + + while sources: + n = sources.pop() + try: + for m in g[n]: + if m not in marked: + sources.append(m) + marked.append(m) + except KeyError: + pass + order.append(n) + return order class PlanetStackObserver: - sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps] + sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps] - def __init__(self): - # The Condition object that gets signalled by Feefie events - self.load_sync_steps() - self.event_cond = threading.Condition() + def __init__(self): + # The Condition object that gets signalled by Feefie events + self.load_sync_steps() + self.event_cond = threading.Condition() self.driver = OpenStackDriver() - def wait_for_event(self, timeout): - self.event_cond.acquire() - self.event_cond.wait(timeout) - self.event_cond.release() - - def wake_up(self): - logger.info('Wake up routine called. Event cond %r'%self.event_cond) - self.event_cond.acquire() - self.event_cond.notify() - self.event_cond.release() - - def load_sync_steps(self): - dep_path = Config().observer_dependency_path - try: - # This contains dependencies between records, not sync steps - self.model_dependency_graph = json.loads(open(dep_path).read()) - except Exception,e: - raise e - - backend_path = Config().observer_backend_dependency_path - try: - # This contains dependencies between backend records - self.backend_dependency_graph = json.loads(open(backend_path).read()) - except Exception,e: - # We can work without a backend graph - self.backend_dependency_graph = {} - - provides_dict = {} - for s in self.sync_steps: - for m in s.provides: - try: - provides_dict[m.__name__].append(s.__name__) - except KeyError: - provides_dict[m.__name__]=[s.__name__] - - - step_graph = {} - for k,v in self.model_dependency_graph.iteritems(): - try: - for source in provides_dict[k]: - for m in v: - try: - for dest in provides_dict[m]: - # no deps, pass - try: - step_graph[source].append(dest) - except: - step_graph[source]=[dest] - except KeyError: - pass - - except KeyError: - pass - # no dependencies, pass - - import pdb - pdb.set_trace() - if (self.backend_dependency_graph): - backend_dict = {} - for s in sync_steps: - for m in s.serves: - backend_dict[m]=s.__name__ - - for k,v in backend_dependency_graph.iteritems(): - try: - source = backend_dict[k] - for m in v: - try: - dest = backend_dict[m] - except KeyError: - # no deps, pass - pass - step_graph[source]=dest - - except KeyError: - pass - # no dependencies, pass - - dependency_graph = step_graph - - self.ordered_steps = toposort(dependency_graph, self.sync_steps) - print "Order of steps=",self.ordered_steps - self.load_run_times() - - - def check_duration(self): - try: - if (duration > S.deadline): - logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration)) - except AttributeError: - # S doesn't have a deadline - pass - - def update_run_time(self, step): - self.last_run_times[step.name]=time.time() - - def check_schedule(self, step): - time_since_last_run = time.time() - self.last_run_times[step.name] - try: - if (time_since_last_run < step.requested_interval): - raise StepNotReady - except AttributeError: - logger.info('Step %s does not have requested_interval set'%step.name) - raise StepNotReady - - def load_run_times(self): - try: - jrun_times = open('/tmp/observer_run_times').read() - self.last_run_times = json.loads(jrun_times) - except: - self.last_run_times={} - for e in self.ordered_steps: - self.last_run_times[e.name]=0 - - - - def save_run_times(self): - run_times = json.dumps(self.last_run_times) - open('/tmp/observer_run_times','w').write(run_times) - - def check_class_dependency(self, step, failed_steps): - for failed_step in failed_steps: - if (failed_step in self.dependency_graph[step.name]): - raise StepNotReady - - def run(self): - if not self.driver.enabled or not self.driver.has_openstack: - return - - while True: - try: - logger.info('Waiting for event') - tBeforeWait = time.time() - self.wait_for_event(timeout=300) - logger.info('Observer woke up') - - # Set of whole steps that failed - failed_steps = [] - - # Set of individual objects within steps that failed - failed_step_objects = [] - - for S in self.ordered_steps: - start_time=time.time() - - sync_step = S(driver=self.driver) - sync_step.dependencies = self.dependencies[sync_step.name] - sync_step.debug_mode = debug_mode - - should_run = False - try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.name) - failed_steps.add(sync_step) - except: - failed_steps.add(sync_step) - - if (should_run): - try: - duration=time.time() - start_time - - # ********* This is the actual sync step - failed_objects = sync_step(failed=failed_step_objects) - - - check_deadline(sync_step, duration) - failed_step_objects.extend(failed_objects) - self.update_run_time(sync_step) - except: - failed_steps.add(S) - self.save_run_times() - except: - logger.log_exc("Exception in observer run loop") - traceback.print_exc() + def wait_for_event(self, timeout): + self.event_cond.acquire() + self.event_cond.wait(timeout) + self.event_cond.release() + + def wake_up(self): + logger.info('Wake up routine called. Event cond %r'%self.event_cond) + self.event_cond.acquire() + self.event_cond.notify() + self.event_cond.release() + + def load_sync_steps(self): + dep_path = Config().observer_backend_dependency_graph + try: + # This contains dependencies between records, not sync steps + self.model_dependency_graph = json.loads(open(dep_path).read()) + except Exception,e: + raise e + + backend_path = Config().observer_backend_dependency_graph + try: + # This contains dependencies between backend records + self.backend_dependency_graph = json.loads(open(backend_path).read()) + except Exception,e: + # We can work without a backend graph + self.backend_dependency_graph = {} + + provides_dict = {} + for s in self.sync_steps: + for m in s.provides: + try: + provides_dict[m.__name__].append(s.__name__) + except KeyError: + provides_dict[m.__name__]=[s.__name__] + + + step_graph = {} + for k,v in self.model_dependency_graph.iteritems(): + try: + for source in provides_dict[k]: + for m in v: + try: + for dest in provides_dict[m]: + # no deps, pass + try: + step_graph[source].append(dest) + except: + step_graph[source]=[dest] + except KeyError: + pass + + except KeyError: + pass + # no dependencies, pass + + #import pdb + #pdb.set_trace() + if (self.backend_dependency_graph): + backend_dict = {} + for s in self.sync_steps: + for m in s.serves: + backend_dict[m]=s.__name__ + + for k,v in backend_dependency_graph.iteritems(): + try: + source = backend_dict[k] + for m in v: + try: + dest = backend_dict[m] + except KeyError: + # no deps, pass + pass + step_graph[source]=dest + + except KeyError: + pass + # no dependencies, pass + + dependency_graph = step_graph + + self.ordered_steps = toposort(dependency_graph, self.sync_steps) + print "Order of steps=",self.ordered_steps + self.load_run_times() + + + def check_duration(self): + try: + if (duration > S.deadline): + logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration)) + except AttributeError: + # S doesn't have a deadline + pass + + def update_run_time(self, step): + self.last_run_times[step.name]=time.time() + + def check_schedule(self, step): + time_since_last_run = time.time() - self.last_run_times[step.name] + try: + if (time_since_last_run < step.requested_interval): + raise StepNotReady + except AttributeError: + logger.info('Step %s does not have requested_interval set'%step.name) + raise StepNotReady + + def load_run_times(self): + try: + jrun_times = open('/tmp/observer_run_times').read() + self.last_run_times = json.loads(jrun_times) + except: + self.last_run_times={} + for e in self.ordered_steps: + self.last_run_times[e.name]=0 + + + + def save_run_times(self): + run_times = json.dumps(self.last_run_times) + open('/tmp/observer_run_times','w').write(run_times) + + def check_class_dependency(self, step, failed_steps): + for failed_step in failed_steps: + if (failed_step in self.dependency_graph[step.name]): + raise StepNotReady + + def run(self): + if not self.driver.enabled or not self.driver.has_openstack: + return + + while True: + try: + logger.info('Waiting for event') + tBeforeWait = time.time() + self.wait_for_event(timeout=300) + logger.info('Observer woke up') + + # Set of whole steps that failed + failed_steps = [] + + # Set of individual objects within steps that failed + failed_step_objects = [] + + for S in self.ordered_steps: + start_time=time.time() + + sync_step = S(driver=self.driver) + sync_step.dependencies = self.dependencies[sync_step.name] + sync_step.debug_mode = debug_mode + + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.name) + failed_steps.add(sync_step) + except: + failed_steps.add(sync_step) + + if (should_run): + try: + duration=time.time() - start_time + + # ********* This is the actual sync step + failed_objects = sync_step(failed=failed_step_objects) + + + check_deadline(sync_step, duration) + failed_step_objects.extend(failed_objects) + self.update_run_time(sync_step) + except: + failed_steps.add(S) + self.save_run_times() + except: + logger.log_exc("Exception in observer run loop") + traceback.print_exc() diff --git a/planetstack/observer/event_manager.py b/planetstack/observer/event_manager.py index 857452b..de622f5 100644 --- a/planetstack/observer/event_manager.py +++ b/planetstack/observer/event_manager.py @@ -10,80 +10,80 @@ import base64 from fofum import Fofum import json -# decorator that marks dispatachable event methods +# decorator that marks dispatachable event methods def event(func): - setattr(func, 'event', func.__name__) - return func + setattr(func, 'event', func.__name__) + return func class EventHandler: - # This code is currently not in use. - def __init__(self): + # This code is currently not in use. + def __init__(self): pass - @staticmethod - def get_events(): - events = [] - for name in dir(EventHandler): - attribute = getattr(EventHandler, name) - if hasattr(attribute, 'event'): - events.append(getattr(attribute, 'event')) - return events - - def dispatch(self, event, *args, **kwds): - if hasattr(self, event): - return getattr(self, event)(*args, **kwds) - - + @staticmethod + def get_events(): + events = [] + for name in dir(EventHandler): + attribute = getattr(EventHandler, name) + if hasattr(attribute, 'event'): + events.append(getattr(attribute, 'event')) + return events + + def dispatch(self, event, *args, **kwds): + if hasattr(self, event): + return getattr(self, event)(*args, **kwds) + + class EventSender: - def __init__(self,user=None,clientid=None): - try: - clid = Config().feefie_client_id - user = Config().feefie_client_user - except: - clid = 'planetstack_core_team' - user = 'pl' + def __init__(self,user=None,clientid=None): + try: + clid = Config().feefie_client_id + user = Config().feefie_client_user + except: + clid = 'planetstack_core_team' + user = 'pl' - self.fofum = Fofum(user=user) - self.fofum.make(clid) + self.fofum = Fofum(user=user) + self.fofum.make(clid) - def fire(self,**args): - self.fofum.fire(json.dumps(args)) + def fire(self,**args): + self.fofum.fire(json.dumps(args)) class EventListener: - def __init__(self,wake_up=None): - self.handler = EventHandler() - self.wake_up = wake_up - - def handle_event(self, payload): - payload_dict = json.loads(payload) - - try: - deletion = payload_dict['deletion_flag'] - if (deletion): - model = payload_dict['model'] - pk = payload_dict['pk'] - - for deleter in deleters[model]: - deleter(pk) - except: - deletion = False - - if (not deletion and self.wake_up): - self.wake_up() - - - def run(self): - # This is our unique client id, to be used when firing and receiving events - # It needs to be generated once and placed in the config file - - try: - clid = Config().feefie_client_id - user = Config().feefie_client_user - except: - clid = 'planetstack_core_team' - user = 'pl' - - f = Fofum(user=user) - - listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event)) - listener_thread.start() + def __init__(self,wake_up=None): + self.handler = EventHandler() + self.wake_up = wake_up + + def handle_event(self, payload): + payload_dict = json.loads(payload) + + try: + deletion = payload_dict['deletion_flag'] + if (deletion): + model = payload_dict['model'] + pk = payload_dict['pk'] + + for deleter in deleters[model]: + deleter(pk) + except: + deletion = False + + if (not deletion and self.wake_up): + self.wake_up() + + + def run(self): + # This is our unique client id, to be used when firing and receiving events + # It needs to be generated once and placed in the config file + + try: + clid = Config().feefie_client_id + user = Config().feefie_client_user + except: + clid = 'planetstack_core_team' + user = 'pl' + + f = Fofum(user=user) + + listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event)) + listener_thread.start() diff --git a/planetstack/observer/steps/sync_networks.py b/planetstack/observer/steps/sync_networks.py index f87d241..656ae68 100644 --- a/planetstack/observer/steps/sync_networks.py +++ b/planetstack/observer/steps/sync_networks.py @@ -5,54 +5,54 @@ from observer.openstacksyncstep import OpenStackSyncStep from core.models.network import * class SyncNetworks(OpenStackSyncStep): - provides=[Network] - requested_interval = 0 + provides=[Network] + requested_interval = 0 - def save_network(self, network): - if not network.network_id: - if network.template.sharedNetworkName: - network.network_id = network.template.sharedNetworkId - (network.subnet_id, network.subnet) = self.driver.get_network_subnet(network.network_id) - else: - network_name = network.name + def save_network(self, network): + if not network.network_id: + if network.template.sharedNetworkName: + network.network_id = network.template.sharedNetworkId + (network.subnet_id, network.subnet) = self.driver.get_network_subnet(network.network_id) + else: + network_name = network.name - # create network - os_network = self.driver.create_network(network_name, shared=True) - network.network_id = os_network['id'] + # create network + os_network = self.driver.create_network(network_name, shared=True) + network.network_id = os_network['id'] - # create router - router = self.driver.create_router(network_name) - network.router_id = router['id'] + # create router + router = self.driver.create_router(network_name) + network.router_id = router['id'] - # create subnet - next_subnet = self.get_next_subnet() - cidr = str(next_subnet.cidr) - ip_version = next_subnet.version - start = str(next_subnet[2]) - end = str(next_subnet[-2]) - subnet = self.driver.create_subnet(name=network_name, - network_id = network.network_id, - cidr_ip = cidr, - ip_version = ip_version, - start = start, - end = end) - network.subnet = cidr - network.subnet_id = subnet['id'] + # create subnet + next_subnet = self.get_next_subnet() + cidr = str(next_subnet.cidr) + ip_version = next_subnet.version + start = str(next_subnet[2]) + end = str(next_subnet[-2]) + subnet = self.driver.create_subnet(name=network_name, + network_id = network.network_id, + cidr_ip = cidr, + ip_version = ip_version, + start = start, + end = end) + network.subnet = cidr + network.subnet_id = subnet['id'] # add subnet as interface to slice's router self.driver.add_router_interface(router['id'], subnet['id']) # add external route self.driver.add_external_route(subnet) - def sync_record(self, site): - if network.owner and network.owner.creator: - try: - # update manager context + def sync_record(self, site): + if network.owner and network.owner.creator: + try: + # update manager context real_driver = self.driver self.driver = self.driver.client_driver(network.owner.creator, network.owner.name) - self.save_network(network) + self.save_network(network) self.driver = real_driver - logger.info("saved network: %s" % (network)) - except Exception,e: - logger.log_exc("save network failed: %s" % network) - raise e + logger.info("saved network: %s" % (network)) + except Exception,e: + logger.log_exc("save network failed: %s" % network) + raise e diff --git a/planetstack/observer/steps/sync_slice_memberships.py b/planetstack/observer/steps/sync_slice_memberships.py index 66953f1..1ec3a96 100644 --- a/planetstack/observer/steps/sync_slice_memberships.py +++ b/planetstack/observer/steps/sync_slice_memberships.py @@ -5,10 +5,10 @@ from observer.openstacksyncstep import OpenStackSyncStep from core.models.slice import * class SyncSliceMemberships(OpenStackSyncStep): - requested_interval=0 - provides=[SliceMembership] - def sync_record(self, user): - if slice_memb.user.kuser_id and slice_memb.slice.tenant_id: - self.driver.add_user_role(slice_memb.user.kuser_id, - slice_memb.slice.tenant_id, - slice_memb.role.role_type) + requested_interval=0 + provides=[SliceRole] + def sync_record(self, user): + if slice_memb.user.kuser_id and slice_memb.slice.tenant_id: + self.driver.add_user_role(slice_memb.user.kuser_id, + slice_memb.slice.tenant_id, + slice_memb.role.role_type) diff --git a/planetstack/observer/steps/sync_sliver_ips.py b/planetstack/observer/steps/sync_sliver_ips.py index d231d13..50ec6ad 100644 --- a/planetstack/observer/steps/sync_sliver_ips.py +++ b/planetstack/observer/steps/sync_sliver_ips.py @@ -5,21 +5,21 @@ from observer.openstacksyncstep import OpenStackSyncStep from core.models.sliver import Sliver class SyncSliverIps(OpenStackSyncStep): - provides=[Sliver] - requested_interval=0 - def fetch_pending(self): - slivers = Sliver.objects.filter(ip=None) - return slivers + provides=[Sliver] + requested_interval=0 + def fetch_pending(self): + slivers = Sliver.objects.filter(ip=None) + return slivers - def sync_record(self, sliver): + def sync_record(self, sliver): driver = self.driver.client_driver(tenant=sliver.slice.name) - servers = driver.shell.nova.servers.findall(id=sliver.instance_id) - if not servers: - return - server = servers[0] - ips = server.addresses.get(sliver.slice.name, []) - if not ips: - return - sliver.ip = ips[0]['addr'] - sliver.save() - logger.info("saved sliver ip: %s %s" % (sliver, ips[0])) + servers = driver.shell.nova.servers.findall(id=sliver.instance_id) + if not servers: + return + server = servers[0] + ips = server.addresses.get(sliver.slice.name, []) + if not ips: + return + sliver.ip = ips[0]['addr'] + sliver.save() + logger.info("saved sliver ip: %s %s" % (sliver, ips[0])) diff --git a/planetstack/observer/steps/sync_users.py b/planetstack/observer/steps/sync_users.py index aa665be..dde8a24 100644 --- a/planetstack/observer/steps/sync_users.py +++ b/planetstack/observer/steps/sync_users.py @@ -5,32 +5,32 @@ from observer.openstacksyncstep import OpenStackSyncStep from core.models.user import User class SyncUsers(OpenStackSyncStep): - provides=[User] - requested_interval=0 - def sync_record(self, user): - name = user.email[:user.email.find('@')] - user_fields = {'name': name, - 'email': user.email, - 'password': hashlib.md5(user.password).hexdigest()[:6], - 'enabled': True} - if not user.kuser_id: - keystone_user = self.driver.create_user(**user_fields) - user.kuser_id = keystone_user.id - else: - self.driver.update_user(user.kuser_id, user_fields) + provides=[User] + requested_interval=0 + def sync_record(self, user): + name = user.email[:user.email.find('@')] + user_fields = {'name': name, + 'email': user.email, + 'password': hashlib.md5(user.password).hexdigest()[:6], + 'enabled': True} + if not user.kuser_id: + keystone_user = self.driver.create_user(**user_fields) + user.kuser_id = keystone_user.id + else: + self.driver.update_user(user.kuser_id, user_fields) - if user.site: - self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'user') - if user.is_admin: - self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'admin') - else: - # may have admin role so attempt to remove it - self.driver.delete_user_role(user.kuser_id, user.site.tenant_id, 'admin') + if user.site: + self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'user') + if user.is_admin: + self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'admin') + else: + # may have admin role so attempt to remove it + self.driver.delete_user_role(user.kuser_id, user.site.tenant_id, 'admin') - if user.public_key: + if user.public_key: driver = self.driver.client_driver(caller=user, tenant=user.site.login_base) key_fields = {'name': user.keyname, 'public_key': user.public_key} driver.create_keypair(**key_fields) - user.save() + user.save() diff --git a/planetstack/observer/syncstep.py b/planetstack/observer/syncstep.py index 9f32621..c8d3e42 100644 --- a/planetstack/observer/syncstep.py +++ b/planetstack/observer/syncstep.py @@ -3,60 +3,60 @@ import base64 from planetstack.config import Config class FailedDependency(Exception): - pass + pass class SyncStep: - """ A PlanetStack Sync step. - - Attributes: - psmodel Model name the step synchronizes - dependencies list of names of models that must be synchronized first if the current model depends on them - """ - slow=False - def get_prop(prop): - try: - sync_config_dir = Config().sync_config_dir - except: - sync_config_dir = '/etc/planetstack/sync' - prop_config_path = '/'.join(sync_config_dir,self.name,prop) - return open(prop_config_path).read().rstrip() - - def __init__(self, **args): - """Initialize a sync step - Keyword arguments: - name -- Name of the step - provides -- PlanetStack models sync'd by this step - """ - dependencies = [] + """ A PlanetStack Sync step. + + Attributes: + psmodel Model name the step synchronizes + dependencies list of names of models that must be synchronized first if the current model depends on them + """ + slow=False + def get_prop(prop): + try: + sync_config_dir = Config().sync_config_dir + except: + sync_config_dir = '/etc/planetstack/sync' + prop_config_path = '/'.join(sync_config_dir,self.name,prop) + return open(prop_config_path).read().rstrip() + + def __init__(self, **args): + """Initialize a sync step + Keyword arguments: + name -- Name of the step + provides -- PlanetStack models sync'd by this step + """ + dependencies = [] self.driver = args.get('driver') - try: - self.soft_deadline = int(self.get_prop('soft_deadline_seconds')) - except: - self.soft_deadline = 5 # 5 seconds - - return - - def fetch_pending(self): - return Sliver.objects.filter(ip=None) - - def check_dependencies(self, obj): - for dep in self.dependencies: - peer_object = getattr(obj, dep.name.lowercase()) - if (peer_object.pk==dep.pk): - raise DependencyFailed - - def call(self, failed=[]): - pending = self.fetch_pending() - for o in pending: - if (not self.depends_on(o, failed)): - try: - check_dependencies(o) # Raises exception if failed - self.sync_record(o) - o.enacted = datetime.now() # Is this the same timezone? XXX - o.save(update_fields=['enacted']) - except: - failed.append(o) - return failed - - def __call__(self): - return self.call() + try: + self.soft_deadline = int(self.get_prop('soft_deadline_seconds')) + except: + self.soft_deadline = 5 # 5 seconds + + return + + def fetch_pending(self): + return Sliver.objects.filter(ip=None) + + def check_dependencies(self, obj): + for dep in self.dependencies: + peer_object = getattr(obj, dep.name.lowercase()) + if (peer_object.pk==dep.pk): + raise DependencyFailed + + def call(self, failed=[]): + pending = self.fetch_pending() + for o in pending: + if (not self.depends_on(o, failed)): + try: + check_dependencies(o) # Raises exception if failed + self.sync_record(o) + o.enacted = datetime.now() # Is this the same timezone? XXX + o.save(update_fields=['enacted']) + except: + failed.append(o) + return failed + + def __call__(self): + return self.call() diff --git a/planetstack/plstackapi_config b/planetstack/plstackapi_config index deaf2e3..6e0b26c 100644 --- a/planetstack/plstackapi_config +++ b/planetstack/plstackapi_config @@ -29,4 +29,4 @@ default_flavor=m1.small default_security_group=default [observer] -pl_dependency_graph='/opt/planetstack/model-deps' +pl_dependency_graph=/opt/planetstack/model-deps