X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=58ece229e25b4fe8076376c46587806c607a2742;hb=d8144cd833c3a8e82d9580655787b491e768e4f8;hp=2c6557ffd99c709e43f2c32ac5ce1e61a7e09baa;hpb=d5b781271af50ba526332809bc632fe17ef6d5e5;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 2c6557ff..58ece229 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -26,6 +26,7 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus from nepi.execution.trace import TraceAttr from nepi.util.serializer import ECSerializer, SFormats from nepi.util.plotter import ECPlotter, PFormats +from nepi.util.netgraph import NetGraph, TopologyType # TODO: use multiprocessing instead of threading # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode) @@ -34,6 +35,7 @@ import functools import logging import os import sys +import tempfile import time import threading import weakref @@ -88,7 +90,8 @@ class ECState(object): """ RUNNING = 1 FAILED = 2 - TERMINATED = 3 + RELEASED = 3 + TERMINATED = 4 class ExperimentController(object): """ @@ -100,7 +103,7 @@ class ExperimentController(object): .. note:: An experiment, or scenario, is defined by a concrete set of resources, - behavior, configuration and interconnection of those resources. + and the behavior, configuration and interconnection of those resources. The Experiment Description (ED) is a detailed representation of a single experiment. It contains all the necessary information to allow repeating the experiment. NEPI allows to describe @@ -115,7 +118,7 @@ class ExperimentController(object): recreated (and re-run) by instantiating an EC and recreating the same experiment description. - In NEPI, an experiment is represented as a graph of interconnected + An experiment is represented as a graph of interconnected resources. A resource is a generic concept in the sense that any component taking part of an experiment, whether physical of virtual, is considered a resource. A resources could be a host, @@ -125,10 +128,9 @@ class ExperimentController(object): single resource. ResourceManagers are specific to a resource type (i.e. An RM to control a Linux application will not be the same as the RM used to control a ns-3 simulation). - To support a new type of resource in NEPI, a new RM must be - implemented. NEPI already provides a variety of - RMs to control basic resources, and new can be extended from - the existing ones. + To support a new type of resource, a new RM must be implemented. + NEPI already provides a variety of RMs to control basic resources, + and new can be extended from the existing ones. Through the EC interface the user can create ResourceManagers (RMs), configure them and interconnect them, to describe an experiment. @@ -150,7 +152,7 @@ class ExperimentController(object): exp_id, which can be re-used in different ExperimentController, and the run_id, which is unique to one ExperimentController instance, and is automatically generated by NEPI. - + """ @classmethod @@ -159,7 +161,31 @@ class ExperimentController(object): ec = serializer.load(filepath) return ec - def __init__(self, exp_id = None): + def __init__(self, exp_id = None, local_dir = None, persist = False, + add_node_callback = None, add_edge_callback = None, **kwargs): + """ ExperimentController entity to model an execute a network + experiment. + + :param exp_id: Human readable name to identify the experiment + :type exp_id: str + + :param local_dir: Path to local directory where to store experiment + related files + :type local_dir: str + + :param persist: Save an XML description of the experiment after + completion at local_dir + :type persist: bool + + :param add_node_callback: Callback to invoke for node instantiation + when automatic topology creation mode is used + :type add_node_callback: function + + :param add_edge_callback: Callback to invoke for edge instantiation + when automatic topology creation mode is used + :type add_edge_callback: function + + """ super(ExperimentController, self).__init__() # Logging @@ -177,6 +203,17 @@ class ExperimentController(object): # resources used, etc) self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex') + # Local path where to store experiment related files (results, etc) + if not local_dir: + local_dir = tempfile.gettempdir() # /tmp + + self._local_dir = local_dir + self._exp_dir = os.path.join(local_dir, self.exp_id) + self._run_dir = os.path.join(self.exp_dir, self.run_id) + + # If True persist the experiment controller in XML format, after completion + self._persist = persist + # generator of globally unique ids self._guid_generator = guid.GuidGenerator() @@ -206,6 +243,12 @@ class ExperimentController(object): # EC state self._state = ECState.RUNNING + # Automatically construct experiment description + self._netgraph = None + if add_node_callback or add_edge_callback or kwargs.get("topology"): + self._build_from_netgraph(add_node_callback, add_edge_callback, + **kwargs) + # The runner is a pool of threads used to parallelize # execution of tasks self._nthreads = 20 @@ -261,7 +304,45 @@ class ExperimentController(object): """ return self._nthreads - + @property + def local_dir(self): + """ Root local directory for experiment files + + """ + return self._local_dir + + @property + def exp_dir(self): + """ Local directory to store results and other files related to the + experiment. + + """ + return self._exp_dir + + @property + def run_dir(self): + """ Local directory to store results and other files related to the + experiment run. + + """ + return self._run_dir + + @property + def persist(self): + """ If True, persists the ExperimentController to XML format upon + experiment completion + + """ + return self._persist + + @property + def netgraph(self): + """ Return NetGraph instance if experiment description was automatically + generated + + """ + return self._netgraph + @property def abort(self): """ Returns True if the experiment has failed and should be interrupted, @@ -394,8 +475,16 @@ class ExperimentController(object): return sec def save(self, dirpath = None, format = SFormats.XML): + if dirpath == None: + dirpath = self.run_dir + + try: + os.makedirs(dirpath) + except OSError: + pass + serializer = ECSerializer() - path = serializer.save(self, dirpath = None, format = format) + path = serializer.save(self, dirpath, format = format) return path def get_task(self, tid): @@ -422,7 +511,7 @@ class ExperimentController(object): return rm def get_resources_by_type(self, rtype): - """ Returns a registered ResourceManager by its guid + """ Returns the ResourceManager objects of type rtype :param rtype: Resource type :type rtype: string @@ -432,7 +521,7 @@ class ExperimentController(object): """ rms = [] for guid, rm in self._resources.iteritems(): - if rm.get_rtype() == type: + if rm.get_rtype() == rtype: rms.append(rm) return rms @@ -441,16 +530,31 @@ class ExperimentController(object): @property def resources(self): - """ Returns the set() of guids of all the ResourceManager + """ Returns the guids of all ResourceManagers :return: Set of all RM guids - :rtype: set + :rtype: list """ keys = self._resources.keys() return keys + def filter_resources(self, rtype): + """ Returns the guids of all ResourceManagers of type rtype + + :param rtype: Resource type + :type rtype: string + + :rtype: list of guids + + """ + rms = [] + for guid, rm in self._resources.iteritems(): + if rm.get_rtype() == rtype: + rms.append(rm.guid) + return rms + def register_resource(self, rtype, guid = None): """ Registers a new ResourceManager of type 'rtype' in the experiment @@ -930,6 +1034,9 @@ class ExperimentController(object): :type guids: list """ + if self._state == ECState.RELEASED: + return + if isinstance(guids, int): guids = [guids] @@ -942,9 +1049,15 @@ class ExperimentController(object): self.wait_released(guids) + if self.persist: + self.save() + for guid in guids: if self.get(guid, "hardRelease"): - self.remove_resource(guid) + self.remove_resource(guid)\ + + # Mark the EC state as RELEASED + self._state = ECState.RELEASED def shutdown(self): """ Releases all resources and stops the ExperimentController @@ -1125,3 +1238,19 @@ class ExperimentController(object): self._cond.notify() self._cond.release() + def _build_from_netgraph(self, add_node_callback, add_edge_callback, + **kwargs): + """ Automates experiment description using a NetGraph instance. + """ + self._netgraph = NetGraph(**kwargs) + + if add_node_callback: + ### Add resources to the EC + for nid in self.netgraph.nodes(): + add_node_callback(self, nid) + + if add_edge_callback: + #### Add connections between resources + for nid1, nid2 in self.netgraph.edges(): + add_edge_callback(self, nid1, nid2) +