X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=5f34f8268c0da3fc35bf16d7fc21c782bd586bda;hb=8ee426bdd9f75fc977cb40281f95c6eebc6c44c9;hp=2c6557ffd99c709e43f2c32ac5ce1e61a7e09baa;hpb=d5b781271af50ba526332809bc632fe17ef6d5e5;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 2c6557ff..5f34f826 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -17,7 +16,8 @@ # # Author: Alina Quereilhac -from nepi.util import guid +from six import next + from nepi.util.parallel import ParallelRun from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat from nepi.execution.resource import ResourceFactory, ResourceAction, \ @@ -26,6 +26,7 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus from nepi.execution.trace import TraceAttr from nepi.util.serializer import ECSerializer, SFormats from nepi.util.plotter import ECPlotter, PFormats +from nepi.util.netgraph import NetGraph, TopologyType # TODO: use multiprocessing instead of threading # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode) @@ -34,12 +35,13 @@ import functools import logging import os import sys +import tempfile import time import threading import weakref class FailureLevel(object): - """ Describes the system failure state """ + """ Possible failure states for the experiment """ OK = 1 RM_FAILURE = 2 EC_FAILURE = 3 @@ -47,20 +49,20 @@ class FailureLevel(object): class FailureManager(object): """ The FailureManager is responsible for handling errors and deciding whether an experiment should be aborted or not - """ - def __init__(self, ec): - self._ec = weakref.ref(ec) + def __init__(self): + self._ec = None self._failure_level = FailureLevel.OK self._abort = False + def set_ec(self, ec): + self._ec = weakref.ref(ec) + @property def ec(self): """ Returns the ExperimentController associated to this FailureManager - """ - return self._ec() @property @@ -68,6 +70,15 @@ class FailureManager(object): return self._abort def eval_failure(self, guid): + """ Implements failure policy and sets the abort state of the + experiment based on the failure state and criticality of + the RM + + :param guid: Guid of the RM upon which the failure of the experiment + is evaluated + :type guid: int + + """ if self._failure_level == FailureLevel.OK: rm = self.ec.get_resource(guid) state = rm.state @@ -83,24 +94,37 @@ class FailureManager(object): self._failure_level = FailureLevel.EC_FAILURE class ECState(object): - """ Possible states for an ExperimentController + """ Possible states of the ExperimentController """ RUNNING = 1 FAILED = 2 - TERMINATED = 3 + RELEASED = 3 + TERMINATED = 4 + +# historical note: this class used to be in util/guid.py but is used only here +# FIXME: This class is not thread-safe. Should it be made thread-safe? +class GuidGenerator(object): + def __init__(self): + self._last_guid = 0 + + # historical note: this used to be called `next` + # which confused 2to3 - and me - while it has + # nothing to do at all with the iteration protocol + def generate(self, guid = None): + if guid == None: + guid = self._last_guid + 1 + + self._last_guid = self._last_guid if guid <= self._last_guid else guid + + return guid class ExperimentController(object): """ - .. class:: Class Args : - - :param exp_id: Human readable identifier for the experiment scenario. - :type exp_id: str - .. note:: An experiment, or scenario, is defined by a concrete set of resources, - behavior, configuration and interconnection of those resources. + and the behavior, configuration and interconnection of those resources. The Experiment Description (ED) is a detailed representation of a single experiment. It contains all the necessary information to allow repeating the experiment. NEPI allows to describe @@ -115,7 +139,7 @@ class ExperimentController(object): recreated (and re-run) by instantiating an EC and recreating the same experiment description. - In NEPI, an experiment is represented as a graph of interconnected + An experiment is represented as a graph of interconnected resources. A resource is a generic concept in the sense that any component taking part of an experiment, whether physical of virtual, is considered a resource. A resources could be a host, @@ -125,10 +149,9 @@ class ExperimentController(object): single resource. ResourceManagers are specific to a resource type (i.e. An RM to control a Linux application will not be the same as the RM used to control a ns-3 simulation). - To support a new type of resource in NEPI, a new RM must be - implemented. NEPI already provides a variety of - RMs to control basic resources, and new can be extended from - the existing ones. + To support a new type of resource, a new RM must be implemented. + NEPI already provides a variety of RMs to control basic resources, + and new can be extended from the existing ones. Through the EC interface the user can create ResourceManagers (RMs), configure them and interconnect them, to describe an experiment. @@ -150,7 +173,7 @@ class ExperimentController(object): exp_id, which can be re-used in different ExperimentController, and the run_id, which is unique to one ExperimentController instance, and is automatically generated by NEPI. - + """ @classmethod @@ -159,7 +182,36 @@ class ExperimentController(object): ec = serializer.load(filepath) return ec - def __init__(self, exp_id = None): + def __init__(self, exp_id = None, local_dir = None, persist = False, + fm = None, add_node_callback = None, add_edge_callback = None, + **kwargs): + """ ExperimentController entity to model an execute a network + experiment. + + :param exp_id: Human readable name to identify the experiment + :type exp_id: str + + :param local_dir: Path to local directory where to store experiment + related files + :type local_dir: str + + :param persist: Save an XML description of the experiment after + completion at local_dir + :type persist: bool + + :param fm: FailureManager object. If None is given, the default + FailureManager class will be used + :type fm: FailureManager + + :param add_node_callback: Callback to invoke for node instantiation + when automatic topology creation mode is used + :type add_node_callback: function + + :param add_edge_callback: Callback to invoke for edge instantiation + when automatic topology creation mode is used + :type add_edge_callback: function + + """ super(ExperimentController, self).__init__() # Logging @@ -177,8 +229,19 @@ class ExperimentController(object): # resources used, etc) self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex') + # Local path where to store experiment related files (results, etc) + if not local_dir: + local_dir = tempfile.gettempdir() # /tmp + + self._local_dir = local_dir + self._exp_dir = os.path.join(local_dir, self.exp_id) + self._run_dir = os.path.join(self.exp_dir, self.run_id) + + # If True persist the experiment controller in XML format, after completion + self._persist = persist + # generator of globally unique ids - self._guid_generator = guid.GuidGenerator() + self._guid_generator = GuidGenerator() # Resource managers self._resources = dict() @@ -195,17 +258,25 @@ class ExperimentController(object): self._groups = dict() # generator of globally unique id for groups - self._group_id_generator = guid.GuidGenerator() + self._group_id_generator = GuidGenerator() # Flag to stop processing thread self._stop = False # Entity in charge of managing system failures - self._fm = FailureManager(self) + if not fm: + self._fm = FailureManager() + self._fm.set_ec(self) # EC state self._state = ECState.RUNNING + # Automatically construct experiment description + self._netgraph = None + if add_node_callback or add_edge_callback or kwargs.get("topology"): + self._build_from_netgraph(add_node_callback, add_edge_callback, + **kwargs) + # The runner is a pool of threads used to parallelize # execution of tasks self._nthreads = 20 @@ -224,6 +295,14 @@ class ExperimentController(object): """ return self._logger + @property + def fm(self): + """ Returns the failure manager + + """ + + return self._fm + @property def failure_level(self): """ Returns the level of FAILURE of th experiment @@ -261,7 +340,45 @@ class ExperimentController(object): """ return self._nthreads - + @property + def local_dir(self): + """ Root local directory for experiment files + + """ + return self._local_dir + + @property + def exp_dir(self): + """ Local directory to store results and other files related to the + experiment. + + """ + return self._exp_dir + + @property + def run_dir(self): + """ Local directory to store results and other files related to the + experiment run. + + """ + return self._run_dir + + @property + def persist(self): + """ If True, persists the ExperimentController to XML format upon + experiment completion + + """ + return self._persist + + @property + def netgraph(self): + """ Return NetGraph instance if experiment description was automatically + generated + + """ + return self._netgraph + @property def abort(self): """ Returns True if the experiment has failed and should be interrupted, @@ -394,8 +511,16 @@ class ExperimentController(object): return sec def save(self, dirpath = None, format = SFormats.XML): + if dirpath == None: + dirpath = self.run_dir + + try: + os.makedirs(dirpath) + except OSError: + pass + serializer = ECSerializer() - path = serializer.save(self, dirpath = None, format = format) + path = serializer.save(self, dirpath, format = format) return path def get_task(self, tid): @@ -422,7 +547,7 @@ class ExperimentController(object): return rm def get_resources_by_type(self, rtype): - """ Returns a registered ResourceManager by its guid + """ Returns the ResourceManager objects of type rtype :param rtype: Resource type :type rtype: string @@ -431,8 +556,8 @@ class ExperimentController(object): """ rms = [] - for guid, rm in self._resources.iteritems(): - if rm.get_rtype() == type: + for guid, rm in self._resources.items(): + if rm.get_rtype() == rtype: rms.append(rm) return rms @@ -441,17 +566,32 @@ class ExperimentController(object): @property def resources(self): - """ Returns the set() of guids of all the ResourceManager + """ Returns the guids of all ResourceManagers :return: Set of all RM guids - :rtype: set + :rtype: list """ - keys = self._resources.keys() + keys = list(self._resources.keys()) return keys - def register_resource(self, rtype, guid = None): + def filter_resources(self, rtype): + """ Returns the guids of all ResourceManagers of type rtype + + :param rtype: Resource type + :type rtype: string + + :rtype: list of guids + + """ + rms = [] + for guid, rm in self._resources.items(): + if rm.get_rtype() == rtype: + rms.append(rm.guid) + return rms + + def register_resource(self, rtype, guid = None, **keywords): """ Registers a new ResourceManager of type 'rtype' in the experiment This method will assign a new 'guid' for the RM, if no guid @@ -465,7 +605,8 @@ class ExperimentController(object): """ # Get next available guid - guid = self._guid_generator.next(guid) + # xxx_next_hiccup + guid = self._guid_generator.generate(guid) # Instantiate RM rm = ResourceFactory.create(rtype, self, guid) @@ -473,6 +614,18 @@ class ExperimentController(object): # Store RM self._resources[guid] = rm + ### so we can do something like + # node = ec.register_resource("linux::Node", + # username = user, + # hostname = host) + ### instead of + # node = ec.register_resource("linux::Node") + # ec.set(node, "username", user) + # ec.set(node, "hostname", host) + + for name, value in keywords.items(): + self.set(guid, name, value) + return guid def get_attributes(self, guid): @@ -851,7 +1004,7 @@ class ExperimentController(object): if not guids: # If no guids list was passed, all 'NEW' RMs will be deployed guids = [] - for guid, rm in self._resources.iteritems(): + for guid, rm in self._resources.items(): if rm.state == ResourceState.NEW: guids.append(guid) @@ -863,7 +1016,8 @@ class ExperimentController(object): new_group = False if not group: new_group = True - group = self._group_id_generator.next() + # xxx_next_hiccup + group = self._group_id_generator.generate() if group not in self._groups: self._groups[group] = [] @@ -930,6 +1084,9 @@ class ExperimentController(object): :type guids: list """ + if self._state == ECState.RELEASED: + return + if isinstance(guids, int): guids = [guids] @@ -942,9 +1099,15 @@ class ExperimentController(object): self.wait_released(guids) + if self.persist: + self.save() + for guid in guids: if self.get(guid, "hardRelease"): - self.remove_resource(guid) + self.remove_resource(guid)\ + + # Mark the EC state as RELEASED + self._state = ECState.RELEASED def shutdown(self): """ Releases all resources and stops the ExperimentController @@ -1055,7 +1218,7 @@ class ExperimentController(object): try: self._cond.acquire() - task = self._scheduler.next() + task = next(self._scheduler) if not task: # No task to execute. Wait for a new task to be scheduled. @@ -1125,3 +1288,19 @@ class ExperimentController(object): self._cond.notify() self._cond.release() + def _build_from_netgraph(self, add_node_callback, add_edge_callback, + **kwargs): + """ Automates experiment description using a NetGraph instance. + """ + self._netgraph = NetGraph(**kwargs) + + if add_node_callback: + ### Add resources to the EC + for nid in self.netgraph.nodes(): + add_node_callback(self, nid) + + if add_edge_callback: + #### Add connections between resources + for nid1, nid2 in self.netgraph.edges(): + add_edge_callback(self, nid1, nid2) +