From e1509e86fb337d86fb377248340181f786821b12 Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Wed, 9 Oct 2013 12:38:04 -0400 Subject: [PATCH] sync nodes --- planetstack/observer/event_loop.py | 482 +++++++++--------- planetstack/observer/steps/__init__.py | 1 + .../observer/steps/garbage_collector.py | 22 +- planetstack/observer/steps/sync_nodes.py | 42 ++ planetstack/plstackapi_config | 1 + 5 files changed, 306 insertions(+), 242 deletions(-) create mode 100644 planetstack/observer/steps/sync_nodes.py diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py index 7903ce4..b177409 100644 --- a/planetstack/observer/event_loop.py +++ b/planetstack/observer/event_loop.py @@ -20,185 +20,185 @@ debug_mode = False logger = Logger(logfile='observer.log', level=logging.INFO) class StepNotReady(Exception): - pass + pass def toposort(g, steps=None): - if (not steps): - keys = set(g.keys()) - values = set({}) - for v in g.values(): - values=values | set(v) - - steps=list(keys|values) - reverse = {} - - for k,v in g.items(): - for rk in v: - try: - reverse[rk].append(k) - except: - reverse[rk]=k - - sources = [] - for k,v in g.items(): - if not reverse.has_key(k): - sources.append(k) - - - for k,v in reverse.iteritems(): - if (not v): - sources.append(k) - - order = [] - marked = [] - - while sources: - n = sources.pop() - try: - for m in g[n]: - if m not in marked: - sources.append(m) - marked.append(m) - except KeyError: - pass - if (n in steps): - order.append(n) - - order.reverse() - order.extend(set(steps)-set(order)) - return order + if (not steps): + keys = set(g.keys()) + values = set({}) + for v in g.values(): + values=values | set(v) + + steps=list(keys|values) + reverse = {} + + for k,v in g.items(): + for rk in v: + try: + reverse[rk].append(k) + except: + reverse[rk]=k + + sources = [] + for k,v in g.items(): + if not reverse.has_key(k): + sources.append(k) + + + for k,v in reverse.iteritems(): + if (not v): + sources.append(k) + + order = [] + marked = [] + + while sources: + n = sources.pop() + try: + for m in g[n]: + if m not in marked: + sources.append(m) + marked.append(m) + except KeyError: + pass + if (n in steps): + order.append(n) + + order.reverse() + order.extend(set(steps)-set(order)) + return order class PlanetStackObserver: - sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,GarbageCollector] - - def __init__(self): - # The Condition object that gets signalled by Feefie events - self.step_lookup = {} - self.load_sync_steps() - self.event_cond = threading.Condition() - self.driver = OpenStackDriver() - - def wait_for_event(self, timeout): - self.event_cond.acquire() - self.event_cond.wait(timeout) - self.event_cond.release() - - def wake_up(self): - logger.info('Wake up routine called. Event cond %r'%self.event_cond) - self.event_cond.acquire() - self.event_cond.notify() - self.event_cond.release() - - def load_sync_steps(self): - dep_path = Config().observer_backend_dependency_graph - try: - # This contains dependencies between records, not sync steps - self.model_dependency_graph = json.loads(open(dep_path).read()) - except Exception,e: - raise e - - try: - backend_path = Config().observer_pl_dependency_graph - # This contains dependencies between backend records - self.backend_dependency_graph = json.loads(open(backend_path).read()) - except Exception,e: - # We can work without a backend graph - self.backend_dependency_graph = {} - - provides_dict = {} - for s in self.sync_steps: - self.step_lookup[s.__name__] = s - for m in s.provides: - try: - provides_dict[m.__name__].append(s.__name__) - except KeyError: - provides_dict[m.__name__]=[s.__name__] - - - step_graph = {} - for k,v in self.model_dependency_graph.iteritems(): - try: - for source in provides_dict[k]: - for m in v: - try: - for dest in provides_dict[m]: - # no deps, pass - try: - step_graph[source].append(dest) - except: - step_graph[source]=[dest] - except KeyError: - pass - - except KeyError: - pass - # no dependencies, pass - - #import pdb - #pdb.set_trace() - if (self.backend_dependency_graph): - backend_dict = {} - for s in self.sync_steps: - for m in s.serves: - backend_dict[m]=s.__name__ - - for k,v in backend_dependency_graph.iteritems(): - try: - source = backend_dict[k] - for m in v: - try: - dest = backend_dict[m] - except KeyError: - # no deps, pass - pass - step_graph[source]=dest - - except KeyError: - pass - # no dependencies, pass - - dependency_graph = step_graph - - self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps)) - print "Order of steps=",self.ordered_steps - self.load_run_times() - - - def check_duration(self, step, duration): - try: - if (duration > step.deadline): - logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration)) - except AttributeError: - # S doesn't have a deadline - pass - - def update_run_time(self, step): - self.last_run_times[step.__name__]=time.time() - - def check_schedule(self, step): - time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0) - try: - if (time_since_last_run < step.requested_interval): - raise StepNotReady - except AttributeError: - logger.info('Step %s does not have requested_interval set'%step.__name__) - raise StepNotReady - - def load_run_times(self): - try: - jrun_times = open('/tmp/observer_run_times').read() - self.last_run_times = json.loads(jrun_times) - except: - self.last_run_times={} - for e in self.ordered_steps: - self.last_run_times[e]=0 - - - def save_run_times(self): - run_times = json.dumps(self.last_run_times) - open('/tmp/observer_run_times','w').write(run_times) - - def check_class_dependency(self, step, failed_steps): + sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,GarbageCollector] + + def __init__(self): + # The Condition object that gets signalled by Feefie events + self.step_lookup = {} + self.load_sync_steps() + self.event_cond = threading.Condition() + self.driver = OpenStackDriver() + + def wait_for_event(self, timeout): + self.event_cond.acquire() + self.event_cond.wait(timeout) + self.event_cond.release() + + def wake_up(self): + logger.info('Wake up routine called. Event cond %r'%self.event_cond) + self.event_cond.acquire() + self.event_cond.notify() + self.event_cond.release() + + def load_sync_steps(self): + dep_path = Config().observer_backend_dependency_graph + try: + # This contains dependencies between records, not sync steps + self.model_dependency_graph = json.loads(open(dep_path).read()) + except Exception,e: + raise e + + try: + backend_path = Config().observer_pl_dependency_graph + # This contains dependencies between backend records + self.backend_dependency_graph = json.loads(open(backend_path).read()) + except Exception,e: + # We can work without a backend graph + self.backend_dependency_graph = {} + + provides_dict = {} + for s in self.sync_steps: + self.step_lookup[s.__name__] = s + for m in s.provides: + try: + provides_dict[m.__name__].append(s.__name__) + except KeyError: + provides_dict[m.__name__]=[s.__name__] + + + step_graph = {} + for k,v in self.model_dependency_graph.iteritems(): + try: + for source in provides_dict[k]: + for m in v: + try: + for dest in provides_dict[m]: + # no deps, pass + try: + step_graph[source].append(dest) + except: + step_graph[source]=[dest] + except KeyError: + pass + + except KeyError: + pass + # no dependencies, pass + + #import pdb + #pdb.set_trace() + if (self.backend_dependency_graph): + backend_dict = {} + for s in self.sync_steps: + for m in s.serves: + backend_dict[m]=s.__name__ + + for k,v in backend_dependency_graph.iteritems(): + try: + source = backend_dict[k] + for m in v: + try: + dest = backend_dict[m] + except KeyError: + # no deps, pass + pass + step_graph[source]=dest + + except KeyError: + pass + # no dependencies, pass + + dependency_graph = step_graph + + self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps)) + print "Order of steps=",self.ordered_steps + self.load_run_times() + + + def check_duration(self, step, duration): + try: + if (duration > step.deadline): + logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration)) + except AttributeError: + # S doesn't have a deadline + pass + + def update_run_time(self, step): + self.last_run_times[step.__name__]=time.time() + + def check_schedule(self, step): + time_since_last_run = time.time() - self.last_run_times.get(step.__name__, 0) + try: + if (time_since_last_run < step.requested_interval): + raise StepNotReady + except AttributeError: + logger.info('Step %s does not have requested_interval set'%step.__name__) + raise StepNotReady + + def load_run_times(self): + try: + jrun_times = open('/tmp/observer_run_times').read() + self.last_run_times = json.loads(jrun_times) + except: + self.last_run_times={} + for e in self.ordered_steps: + self.last_run_times[e]=0 + + + def save_run_times(self): + run_times = json.dumps(self.last_run_times) + open('/tmp/observer_run_times','w').write(run_times) + + def check_class_dependency(self, step, failed_steps): step.dependenices = [] for obj in step.provides: step.dependenices.extend(self.model_dependency_graph.get(obj.__name__, [])) @@ -206,69 +206,69 @@ class PlanetStackObserver: if (failed_step in step.dependencies): raise StepNotReady - def run(self): - if not self.driver.enabled or not self.driver.has_openstack: - return - while True: - try: - logger.info('Waiting for event') - tBeforeWait = time.time() - self.wait_for_event(timeout=30) - logger.info('Observer woke up') - - # Set of whole steps that failed - failed_steps = [] - - # Set of individual objects within steps that failed - failed_step_objects = [] - - for S in self.ordered_steps: - step = self.step_lookup[S] - start_time=time.time() - - sync_step = step(driver=self.driver) - sync_step.__name__ = step.__name__ - sync_step.dependencies = [] - try: - mlist = sync_step.provides - - for m in mlist: - sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) - except KeyError: - pass - sync_step.debug_mode = debug_mode - - should_run = False - try: - # Various checks that decide whether - # this step runs or not - self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed - self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour - should_run = True - except StepNotReady: - logging.info('Step not ready: %s'%sync_step.__name__) - failed_steps.append(sync_step) - except: - failed_steps.append(sync_step) - - if (should_run): - try: - duration=time.time() - start_time - - # ********* This is the actual sync step - #import pdb - #pdb.set_trace() - failed_objects = sync_step(failed=failed_step_objects) - - - self.check_duration(sync_step, duration) - if failed_objects: - failed_step_objects.extend(failed_objects) - self.update_run_time(sync_step) - except: - raise - failed_steps.append(S) - self.save_run_times() - except: - logger.log_exc("Exception in observer run loop") - traceback.print_exc() + def run(self): + if not self.driver.enabled or not self.driver.has_openstack: + return + while True: + try: + logger.info('Waiting for event') + tBeforeWait = time.time() + self.wait_for_event(timeout=30) + logger.info('Observer woke up') + + # Set of whole steps that failed + failed_steps = [] + + # Set of individual objects within steps that failed + failed_step_objects = [] + + for S in self.ordered_steps: + step = self.step_lookup[S] + start_time=time.time() + + sync_step = step(driver=self.driver) + sync_step.__name__ = step.__name__ + sync_step.dependencies = [] + try: + mlist = sync_step.provides + + for m in mlist: + sync_step.dependencies.extend(self.model_dependency_graph[m.__name__]) + except KeyError: + pass + sync_step.debug_mode = debug_mode + + should_run = False + try: + # Various checks that decide whether + # this step runs or not + self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed + self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour + should_run = True + except StepNotReady: + logging.info('Step not ready: %s'%sync_step.__name__) + failed_steps.append(sync_step) + except: + failed_steps.append(sync_step) + + if (should_run): + try: + duration=time.time() - start_time + + # ********* This is the actual sync step + #import pdb + #pdb.set_trace() + failed_objects = sync_step(failed=failed_step_objects) + + + self.check_duration(sync_step, duration) + if failed_objects: + failed_step_objects.extend(failed_objects) + self.update_run_time(sync_step) + except: + raise + failed_steps.append(S) + self.save_run_times() + except: + logger.log_exc("Exception in observer run loop") + traceback.print_exc() diff --git a/planetstack/observer/steps/__init__.py b/planetstack/observer/steps/__init__.py index 7954426..50405a8 100644 --- a/planetstack/observer/steps/__init__.py +++ b/planetstack/observer/steps/__init__.py @@ -9,4 +9,5 @@ from .sync_sliver_ips import SyncSliverIps from .sync_slivers import SyncSlivers from .sync_users import SyncUsers from .sync_roles import SyncRoles +from .sync_nodes import SyncNodes from .garbage_collector import GarbageCollector diff --git a/planetstack/observer/steps/garbage_collector.py b/planetstack/observer/steps/garbage_collector.py index 5d434a0..a45db8c 100644 --- a/planetstack/observer/steps/garbage_collector.py +++ b/planetstack/observer/steps/garbage_collector.py @@ -16,7 +16,7 @@ class GarbageCollector(OpenStackSyncStep): def call(self, **args): try: - #self.sync_roles() + #self.gc_roles() self.gc_tenants() self.gc_users() self.gc_user_tenant_roles() @@ -208,3 +208,23 @@ class GarbageCollector(OpenStackSyncStep): def gc_external_routes(self): pass + + def gc_nodes(self): + # collect local nodes + nodes = Node.objects.all() + nodes_dict = {} + for node in nodes: + nodes_dict[node.name] = node + + # collect nova nodes: + compute_nodes = self.client.nova.hypervisors.list() + compute_nodes_dict = {} + for compute_node in compute_nodes: + compute_nodes_dict[compute_node.hypervisor_hostname] = compute_node + + # remove old nodes + old_node_names = set(nodes_dict.keys()).difference(compute_nodes_dict.keys()) + Node.objects.filter(name__in=old_node_names).delete() + + def gc_images(self): + pass diff --git a/planetstack/observer/steps/sync_nodes.py b/planetstack/observer/steps/sync_nodes.py new file mode 100644 index 0000000..a1f0803 --- /dev/null +++ b/planetstack/observer/steps/sync_nodes.py @@ -0,0 +1,42 @@ +import os +import base64 +import random +from datetime import datetime +from django.db.models import F, Q +from planetstack.config import Config +from observer.openstacksyncstep import OpenStackSyncStep +from core.models.node import Node +from core.models.deployment import Deployment +from core.models.site import Site + +class SyncNodes(OpenStackSyncStep): + provides=[Node] + requested_interval=0 + + def fetch_pending(self): + config = Config() + deployment = Deployment.objects.filter(name=config.plc_deployment)[0] + login_bases = ['princeton', 'stanford', 'gt', 'uw', 'mpisws'] + sites = Site.objects.filter(login_base__in=login_bases) + + # collect local nodes + nodes = Node.objects.all() + node_hostnames = [node.name for node in nodes] + + # collect nova nodes + # generate list of new nodes + new_nodes = [] + compute_nodes = self.driver.shell.nova.hypervisors.list() + for compute_node in compute_nodes: + if compute_node.hypervisor_hostname not in node_hostnames: + # pick a random site to add the node to for now + site_index = random.randint(0, len(sites)) + node = Node(name=compute_node.hypervisor_hostname, + site=sites[site_index], deployment=deployment) + new_nodes.append(node) + + return new_nodes + + def sync_record(self, node): + node.save() + diff --git a/planetstack/plstackapi_config b/planetstack/plstackapi_config index 6e0b26c..a716460 100644 --- a/planetstack/plstackapi_config +++ b/planetstack/plstackapi_config @@ -1,5 +1,6 @@ [plc] name=plc +deployment=plc [db] name=planetstack -- 2.43.0