Merge branch 'master' of ssh://git.planet-lab.org/git/plstackapi origin/observer3.0
author Sapan Bhatia <gwsapan@gmail.com>
Wed, 30 Apr 2014 05:15:48 +0000 (01:15 -0400)
committer Sapan Bhatia <gwsapan@gmail.com>
Wed, 30 Apr 2014 05:15:48 +0000 (01:15 -0400)
Conflicts:
planetstack/observer/event_loop.py

planetstack/observer/event_loop.py

@@@ -26,285 -25,269 +26,285 @@@ debug_mode = False
  logger = Logger(level=logging.INFO)
  
  class StepNotReady(Exception):
-       pass
+     pass
  
  class NoOpDriver:
-       def __init__(self):
-                self.enabled = True
+     def __init__(self):
+          self.enabled = True
  
  class PlanetStackObserver:
 -    #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
 -    sync_steps = []
 -
 -    def __init__(self):
 -        # The Condition object that gets signalled by Feefie events
 -        self.step_lookup = {}
 -        self.load_sync_step_modules()
 -        self.load_sync_steps()
 -        self.event_cond = threading.Condition()
 -
 -
 -        self.driver_kind = getattr(Config(), "observer_driver", "openstack")
 -        if self.driver_kind=="openstack":
 -            self.driver = OpenStackDriver()
 -        else:
 -            self.driver = NoOpDriver()
 -
 -    def wait_for_event(self, timeout):
 -        logger.info('Waiting for event')
 -        self.event_cond.acquire()
 -        self.event_cond.wait(timeout)
 -        self.event_cond.release()
 -
 -    def wake_up(self):
 -        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
 -        self.event_cond.acquire()
 -        self.event_cond.notify()
 -        self.event_cond.release()
 -
 -    def load_sync_step_modules(self, step_dir=None):
 -        if step_dir is None:
 -            if hasattr(Config(), "observer_steps_dir"):
 -                step_dir = Config().observer_steps_dir
 -            else:
 -                step_dir = "/opt/planetstack/observer/steps"
 -
 -        for fn in os.listdir(step_dir):
 -            pathname = os.path.join(step_dir,fn)
 -            if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
 -                module = imp.load_source(fn[:-3],pathname)
 -                for classname in dir(module):
 -                    c = getattr(module, classname, None)
 -
 -                    # make sure 'c' is a descendant of SyncStep and has a
 -                    # provides field (this eliminates the abstract base classes
 -                    # since they don't have a provides)
 -
 -                    if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
 -                        self.sync_steps.append(c)
 -        logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))
 -        # print 'loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps])
 -
 -    def load_sync_steps(self):
 -        dep_path = Config().observer_dependency_graph
 -        logger.info('Loading model dependency graph from %s' % dep_path)
 -        try:
 -            # This contains dependencies between records, not sync steps
 -            self.model_dependency_graph = json.loads(open(dep_path).read())
 -        except Exception,e:
 -            raise e
 -
 -        try:
 -            backend_path = Config().observer_pl_dependency_graph
 -            logger.info('Loading backend dependency graph from %s' % backend_path)
 -            # This contains dependencies between backend records
 -            self.backend_dependency_graph = json.loads(open(backend_path).read())
 -        except Exception,e:
 -            logger.info('Backend dependency graph not loaded')
 -            # We can work without a backend graph
 -            self.backend_dependency_graph = {}
 -
 -        provides_dict = {}
 -        for s in self.sync_steps:
 -            self.step_lookup[s.__name__] = s 
 -            for m in s.provides:
 -                try:
 -                    provides_dict[m.__name__].append(s.__name__)
 -                except KeyError:
 -                    provides_dict[m.__name__]=[s.__name__]
 -
 -                
 -        step_graph = {}
 -        for k,v in self.model_dependency_graph.iteritems():
 -            try:
 -                for source in provides_dict[k]:
 -                    for m in v:
 -                        try:
 -                            for dest in provides_dict[m]:
 -                                # no deps, pass
 -                                try:
 -                                    if (dest not in step_graph[source]):
 -                                        step_graph[source].append(dest)
 -                                except:
 -                                    step_graph[source]=[dest]
 -                        except KeyError:
 -                            pass
 -                    
 -            except KeyError:
 -                pass
 -                # no dependencies, pass
 -        
 -        #import pdb
 -        #pdb.set_trace()
 -        if (self.backend_dependency_graph):
 -            backend_dict = {}
 -            for s in self.sync_steps:
 -                for m in s.serves:
 -                    backend_dict[m]=s.__name__
 -                    
 -            for k,v in self.backend_dependency_graph.iteritems():
 -                try:
 -                    source = backend_dict[k]
 -                    for m in v:
 -                        try:
 -                            dest = backend_dict[m]
 -                        except KeyError:
 -                            # no deps, pass
 -                            pass
 -                        step_graph[source]=dest
 -                        
 -                except KeyError:
 -                    pass
 -                    # no dependencies, pass
 -
 -        dependency_graph = step_graph
 -
 -        self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps))
 -        print "Order of steps=",self.ordered_steps
 -        self.load_run_times()
 -        
 -
 -    def check_duration(self, step, duration):
 -        try:
 -            if (duration > step.deadline):
 -                logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.__name__,duration))
 -        except AttributeError:
 -            # S doesn't have a deadline
 -            pass
 -
 -    def update_run_time(self, step):
 -        self.last_run_times[step.__name__]=time.time()
 -
 -    def check_schedule(self, step):
 -        time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0)
 -        try:
 -            if (time_since_last_run < step.requested_interval):
 -                raise StepNotReady
 -        except AttributeError:
 -            logger.info('Step %s does not have requested_interval set'%step.__name__)
 -            raise StepNotReady
 -    
 -    def load_run_times(self):
 -        try:
 -            jrun_times = open('/tmp/observer_run_times').read()
 -            self.last_run_times = json.loads(jrun_times)
 -        except:
 -            self.last_run_times={}
 -            for e in self.ordered_steps:
 -                self.last_run_times[e]=0
 -
 -
 -    def save_run_times(self):
 -        run_times = json.dumps(self.last_run_times)
 -        open('/tmp/observer_run_times','w').write(run_times)
 -
 -    def check_class_dependency(self, step, failed_steps):
 -        step.dependencies = []
 -        for obj in step.provides:
 -            step.dependencies.extend(self.model_dependency_graph.get(obj.__name__, []))
 -        for failed_step in failed_steps:
 -            if (failed_step in step.dependencies):
 -                raise StepNotReady
 -
 -
 -    def run_steps(self):
 -        try:
 -            logger.info('Observer run steps')
 -
 -            # Set of whole steps that failed
 -            failed_steps = []
 -
 -            # Set of individual objects within steps that failed
 -            failed_step_objects = set()
 -
 -            for S in self.ordered_steps:
 -                step = self.step_lookup[S]
 -                start_time=time.time()
 -                
 -                sync_step = step(driver=self.driver)
 -                sync_step.__name__ = step.__name__
 -                sync_step.dependencies = []
 -                try:
 -                    mlist = sync_step.provides
 -                    
 -                    for m in mlist:
 -                        sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
 -                except KeyError:
 -                    pass
 -                sync_step.debug_mode = debug_mode
 -
 -                should_run = False
 -                try:
 -                    # Various checks that decide whether
 -                    # this step runs or not
 -                    self.check_class_dependency(sync_step, failed_steps) # don't run Slices if Sites failed
 -                    self.check_schedule(sync_step) # don't run sync_network_routes if time since last run < 1 hour
 -                    should_run = True
 -                except StepNotReady:
 -                    logging.info('Step not ready: %s'%sync_step.__name__)
 -                    failed_steps.append(sync_step)
 -                except:
 -                    failed_steps.append(sync_step)
 -
 -                if (should_run):
 -                    try:
 -                        duration=time.time() - start_time
 -
 -                        logger.info('Executing step %s' % sync_step.__name__)
 -
 -                        # ********* This is the actual sync step
 -                        #import pdb
 -                        #pdb.set_trace()
 -                        failed_objects = sync_step(failed=list(failed_step_objects))
 -
 -
 -                        self.check_duration(sync_step, duration)
 -                        if failed_objects:
 -                            failed_step_objects.update(failed_objects)
 -                        self.update_run_time(sync_step)
 -                    except:
 -                        failed_steps.append(S)
 -            self.save_run_times()
 -        except:
 -            logger.log_exc("Exception in observer run loop")
 -            traceback.print_exc()
 -
 -    def run(self):
 -        try:
 -            logger.info('Observer start run loop')
 -            if not self.driver.enabled:
 -                return
 -            if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
 -                return
 -
 -            while True:
 -                try:  
 -                    self.wait_for_event(timeout=30)       
 -                except: 
 -                    logger.log_exc("Exception in observer wait for event") 
 -                    traceback.print_exc()
 -
 -                try: 
 -                    self.run_steps()            
 -                except: 
 -                    logger.log_exc("Exception in observer run steps")
 -                    traceback.print_exc()
 -        except:
 -            logger.log_exc("Exception in observer run loop")
 -            traceback.print_exc()
 +      #sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
 +      sync_steps = []
 +
 +      def __init__(self):
 +              # The Condition object that gets signalled by Feefie events
 +              self.step_lookup = {}
 +              self.load_sync_step_modules()
 +              self.load_sync_steps()
 +              self.event_cond = threading.Condition()
 +
 +              self.driver_kind = getattr(Config(), "observer_driver", "openstack")
 +              if self.driver_kind=="openstack":
 +                      self.driver = OpenStackDriver()
 +              else:
 +                      self.driver = NoOpDriver()
 +
 +      def wait_for_event(self, timeout):
 +              self.event_cond.acquire()
 +              self.event_cond.wait(timeout)
 +              self.event_cond.release()
 +
 +      def wake_up(self):
 +              logger.info('Wake up routine called. Event cond %r'%self.event_cond)
 +              self.event_cond.acquire()
 +              self.event_cond.notify()
 +              self.event_cond.release()
 +
 +      def load_sync_step_modules(self, step_dir=None):
 +              if step_dir is None:
 +                      if hasattr(Config(), "observer_steps_dir"):
 +                              step_dir = Config().observer_steps_dir
 +                      else:
 +                              step_dir = "/opt/planetstack/observer/steps"
 +
 +              for fn in os.listdir(step_dir):
 +                      pathname = os.path.join(step_dir,fn)
 +                      if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
 +                              module = imp.load_source(fn[:-3],pathname)
 +                              for classname in dir(module):
 +                                      c = getattr(module, classname, None)
 +
 +                                      # make sure 'c' is a descendant of SyncStep and has a
 +                                      # provides field (this eliminates the abstract base classes
 +                                      # since they don't have a provides)
 +
 +                                      if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
 +                                              self.sync_steps.append(c)
 +              logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))
 +              # print 'loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps])
 +
 +      def load_sync_steps(self):
 +              dep_path = Config().observer_dependency_graph
 +              logger.info('Loading model dependency graph from %s' % dep_path)
 +              try:
 +                      # This contains dependencies between records, not sync steps
 +                      self.model_dependency_graph = json.loads(open(dep_path).read())
 +              except Exception,e:
 +                      raise e
 +
 +              try:
 +                      backend_path = Config().observer_pl_dependency_graph
 +                      logger.info('Loading backend dependency graph from %s' % backend_path)
 +                      # This contains dependencies between backend records
 +                      self.backend_dependency_graph = json.loads(open(backend_path).read())
 +              except Exception,e:
 +                      logger.info('Backend dependency graph not loaded')
 +                      # We can work without a backend graph
 +                      self.backend_dependency_graph = {}
 +
 +              provides_dict = {}
 +              for s in self.sync_steps:
 +                      self.step_lookup[s.__name__] = s 
 +                      for m in s.provides:
 +                              try:
 +                                      provides_dict[m.__name__].append(s.__name__)
 +                              except KeyError:
 +                                      provides_dict[m.__name__]=[s.__name__]
 +
 +                              
 +              step_graph = {}
 +              for k,v in self.model_dependency_graph.iteritems():
 +                      try:
 +                              for source in provides_dict[k]:
 +                                      for m in v:
 +                                              try:
 +                                                      for dest in provides_dict[m]:
 +                                                              # no deps, pass
 +                                                              try:
 +                                                                      if (dest not in step_graph[source]):
 +                                                                              step_graph[source].append(dest)
 +                                                              except:
 +                                                                      step_graph[source]=[dest]
 +                                              except KeyError:
 +                                                      pass
 +                                      
 +                      except KeyError:
 +                              pass
 +                              # no dependencies, pass
 +              
 +              #import pdb
 +              #pdb.set_trace()
 +              if (self.backend_dependency_graph):
 +                      backend_dict = {}
 +                      for s in self.sync_steps:
 +                              for m in s.serves:
 +                                      backend_dict[m]=s.__name__
 +                                      
 +                      for k,v in self.backend_dependency_graph.iteritems():
 +                              try:
 +                                      source = backend_dict[k]
 +                                      for m in v:
 +                                              try:
 +                                                      dest = backend_dict[m]
 +                                              except KeyError:
 +                                                      # no deps, pass
 +                                                      pass
 +                                              step_graph[source]=dest
 +                                              
 +                              except KeyError:
 +                                      pass
 +                                      # no dependencies, pass
 +
 +              dependency_graph = step_graph
 +
 +              self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps))
 +              print "Order of steps=",self.ordered_steps
 +              self.load_run_times()
 +              
 +
 +      def check_duration(self, step, duration):
 +              try:
 +                      if (duration > step.deadline):
 +                              logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.__name__,duration))
 +              except AttributeError:
 +                      # S doesn't have a deadline
 +                      pass
 +
 +      def update_run_time(self, step, deletion):
 +              if (not deletion):
 +                      self.last_run_times[step.__name__]=time.time()
 +              else:
 +                      self.last_deletion_run_times[step.__name__]=time.time()
 +
 +
 +      def check_schedule(self, step, deletion):
 +              last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times
 +
 +              time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
 +              try:
 +                      if (time_since_last_run < step.requested_interval):
 +                              raise StepNotReady
 +              except AttributeError:
 +                      logger.info('Step %s does not have requested_interval set'%step.__name__)
 +                      raise StepNotReady
 +      
 +      def load_run_times(self):
 +              try:
 +                      jrun_times = open('/tmp/observer_run_times').read()
 +                      self.last_run_times = json.loads(jrun_times)
 +              except:
 +                      self.last_run_times={}
 +                      for e in self.ordered_steps:
 +                              self.last_run_times[e]=0
 +              try:
 +                      jrun_times = open('/tmp/observer_deletion_run_times').read()
 +                      self.last_deletion_run_times = json.loads(jrun_times)
 +              except:
 +                      self.last_deletion_run_times={}
 +                      for e in self.ordered_steps:
 +                              self.last_deletion_run_times[e]=0
 +
 +
 +
 +      def save_run_times(self):
 +              run_times = json.dumps(self.last_run_times)
 +              open('/tmp/observer_run_times','w').write(run_times)
 +
 +              deletion_run_times = json.dumps(self.last_deletion_run_times)
 +              open('/tmp/observer_deletion_run_times','w').write(deletion_run_times)
 +
 +      def check_class_dependency(self, step, failed_steps):
 +              step.dependencies = []
 +              for obj in step.provides:
 +                      step.dependencies.extend(self.model_dependency_graph.get(obj.__name__, []))
 +              for failed_step in failed_steps:
 +                      if (failed_step in step.dependencies):
 +                              raise StepNotReady
 +
 +      def run(self):
 +              if not self.driver.enabled:
 +                      return
 +              if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
 +                      return
 +
 +              while True:
 +                      try:
 +                              error_map_file = getattr(Config(), "error_map_path", "/opt/planetstack/error_map.txt")
 +                              error_mapper = ErrorMapper(error_map_file)
 +
 +                              logger.info('Waiting for event')
 +                              tBeforeWait = time.time()
 +                              self.wait_for_event(timeout=30)
 +                              logger.info('Observer woke up')
 +
 +                              # Two passes. One for sync, the other for deletion.
 +                              for deletion in (False,True):
 +                                      logger.info('Deletion pass...' if deletion else 'Creation pass...')
 +                                      # Set of whole steps that failed
 +                                      failed_steps = []
 +
 +                                      # Set of individual objects within steps that failed
 +                                      failed_step_objects = set()
 +
 +                                      ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps)
 +
 +                                      for S in ordered_steps:
 +                                              step = self.step_lookup[S]
 +                                              start_time=time.time()
 +                                              
 +                                              sync_step = step(driver=self.driver,error_map=error_mapper)
 +                                              sync_step.__name__ = step.__name__
 +                                              sync_step.dependencies = []
 +                                              try:
 +                                                      mlist = sync_step.provides
 +                                                      
 +                                                      for m in mlist:
 +                                                              sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
 +                                              except KeyError:
 +                                                      pass
 +                                              sync_step.debug_mode = debug_mode
 +
 +                                              should_run = False
 +                                              try:
 +                                                      # Various checks that decide whether
 +                                                      # this step runs or not
 +                                                      self.check_class_dependency(sync_step, failed_steps) # don't run Slices if Sites failed
 +                                                      self.check_schedule(sync_step,deletion) # don't run sync_network_routes if time since last run < 1 hour
 +                                                      should_run = True
 +                                              except StepNotReady:
 +                                                      logging.info('Step not ready: %s'%sync_step.__name__)
 +                                                      failed_steps.append(sync_step)
 +                                              except Exception,e:
 +                                                      logging.error('%r',e)
 +                                                      logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
 +                                                      failed_steps.append(sync_step)
 +
 +                                              if (should_run):
 +                                                      try:
 +                                                              duration=time.time() - start_time
 +
 +                                                              logger.info('Executing step %s' % sync_step.__name__)
 +
 +                                                              # ********* This is the actual sync step
 +                                                              #import pdb
 +                                                              #pdb.set_trace()
 +                                                              failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
 +
 +
 +                                                              self.check_duration(sync_step, duration)
 +                                                              if failed_objects:
 +                                                                      failed_step_objects.update(failed_objects)
 +
 +                                                              self.update_run_time(sync_step, deletion)
 +                                                      except Exception,e:
 +                                                              logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
 +                                                              logger.log_exc(e)
 +                                                              failed_steps.append(S)
 +                              self.save_run_times()
 +                      except Exception, e:
 +                              logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
 +                              logger.log_exc("Exception in observer run loop")
 +                              traceback.print_exc()
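
For reference, the scheduling pattern that the incoming observer3.0 side introduces above can be sketched in isolation. The snippet below is a minimal, self-contained illustration rather than the PlanetStack code: build_step_graph, toposort, run_passes, DummyStep and the Site/Slice/Sliver model names are hypothetical stand-ins. It shows the same idea as event_loop.py: each step's provides list maps model-level dependencies onto a step graph, the graph is topologically sorted, and the loop then makes a creation pass in dependency order followed by a deletion pass in reverse order.

# Illustrative sketch only (hypothetical names, not the PlanetStack API).

class DummyStep(object):
    """Stand-in for a SyncStep: knows which models it provides and can run."""
    def __init__(self, name, provides):
        self.name = name
        self.provides = provides

    def __call__(self, deletion=False):
        print("%s pass: %s" % ("deletion" if deletion else "creation", self.name))
        return []  # list of objects that failed; empty means success


def build_step_graph(steps, model_dependency_graph):
    """Translate model-level dependencies into step-level edges.

    model_dependency_graph maps a model name to the model names it depends on;
    a step providing model X therefore depends on the steps providing X's deps.
    """
    provides = {}
    for s in steps:
        for model in s.provides:
            provides.setdefault(model, []).append(s.name)

    step_graph = {}
    for model, deps in model_dependency_graph.items():
        for source in provides.get(model, []):
            for dep_model in deps:
                for dest in provides.get(dep_model, []):
                    step_graph.setdefault(source, []).append(dest)
    return step_graph


def toposort(graph, nodes):
    """Depth-first topological sort; a node's dependencies come before it."""
    order, seen = [], set()

    def visit(node):
        if node in seen:
            return
        seen.add(node)
        for dep in graph.get(node, []):
            visit(dep)
        order.append(node)

    for node in nodes:
        visit(node)
    return order


def run_passes(ordered_steps, step_lookup):
    """Creation pass in dependency order, then deletion pass in reverse order."""
    for deletion in (False, True):
        steps = ordered_steps if not deletion else list(reversed(ordered_steps))
        failed_steps = []
        for name in steps:
            try:
                step_lookup[name](deletion=deletion)
            except Exception:
                failed_steps.append(name)
        if failed_steps:
            print("failed steps: %s" % ", ".join(failed_steps))


if __name__ == "__main__":
    # Hypothetical models: Slice depends on Site, Sliver depends on Slice.
    model_deps = {"Slice": ["Site"], "Sliver": ["Slice"]}
    steps = [DummyStep("SyncSlivers", ["Sliver"]),
             DummyStep("SyncSites", ["Site"]),
             DummyStep("SyncSlices", ["Slice"])]
    lookup = dict((s.name, s) for s in steps)
    graph = build_step_graph(steps, model_deps)
    ordered = toposort(graph, [s.name for s in steps])
    run_passes(ordered, lookup)

The real observer additionally checks each step's requested_interval, skips steps whose prerequisite steps failed, and persists last-run times under /tmp/observer_run_times and /tmp/observer_deletion_run_times; those details are omitted here to keep the sketch short.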