# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-from nepi.util import guid
+from six import next
+
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
from nepi.execution.trace import TraceAttr
from nepi.util.serializer import ECSerializer, SFormats
from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType
# TODO: use multiprocessing instead of threading
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
import weakref
class FailureLevel(object):
- """ Describes the system failure state """
+ """ Possible failure states for the experiment """
OK = 1
RM_FAILURE = 2
EC_FAILURE = 3
class FailureManager(object):
""" The FailureManager is responsible for handling errors
and deciding whether an experiment should be aborted or not
-
"""
- def __init__(self, ec):
- self._ec = weakref.ref(ec)
+ def __init__(self):
+ self._ec = None
self._failure_level = FailureLevel.OK
self._abort = False
+ def set_ec(self, ec):
+ self._ec = weakref.ref(ec)
+
@property
def ec(self):
""" Returns the ExperimentController associated to this FailureManager
-
"""
-
return self._ec()
@property
return self._abort
def eval_failure(self, guid):
+ """ Implements failure policy and sets the abort state of the
+ experiment based on the failure state and criticality of
+ the RM
+
+ :param guid: Guid of the RM upon which the failure of the experiment
+ is evaluated
+ :type guid: int
+
+ """
if self._failure_level == FailureLevel.OK:
rm = self.ec.get_resource(guid)
state = rm.state
self._failure_level = FailureLevel.EC_FAILURE
class ECState(object):
- """ Possible states for an ExperimentController
+ """ Possible states of the ExperimentController
"""
RUNNING = 1
FAILED = 2
- TERMINATED = 3
+ RELEASED = 3
+ TERMINATED = 4
+
+# historical note: this class used to be in util/guid.py but is used only here
+# FIXME: This class is not thread-safe. Should it be made thread-safe?
+class GuidGenerator(object):
+ def __init__(self):
+ self._last_guid = 0
+
+ # historical note: this used to be called `next`
+ # which confused 2to3 - and me - while it has
+ # nothing to do at all with the iteration protocol
+ def generate(self, guid = None):
+ if guid == None:
+ guid = self._last_guid + 1
+
+ self._last_guid = self._last_guid if guid <= self._last_guid else guid
+
+ return guid
class ExperimentController(object):
"""
- .. class:: Class Args :
-
- :param exp_id: Human readable identifier for the experiment scenario.
- :type exp_id: str
-
.. note::
An experiment, or scenario, is defined by a concrete set of resources,
- behavior, configuration and interconnection of those resources.
+ and the behavior, configuration and interconnection of those resources.
The Experiment Description (ED) is a detailed representation of a
single experiment. It contains all the necessary information to
allow repeating the experiment. NEPI allows to describe
recreated (and re-run) by instantiating an EC and recreating
the same experiment description.
- In NEPI, an experiment is represented as a graph of interconnected
+ An experiment is represented as a graph of interconnected
resources. A resource is a generic concept in the sense that any
component taking part of an experiment, whether physical of
virtual, is considered a resource. A resources could be a host,
single resource. ResourceManagers are specific to a resource
type (i.e. An RM to control a Linux application will not be
the same as the RM used to control a ns-3 simulation).
- To support a new type of resource in NEPI, a new RM must be
- implemented. NEPI already provides a variety of
- RMs to control basic resources, and new can be extended from
- the existing ones.
+ To support a new type of resource, a new RM must be implemented.
+ NEPI already provides a variety of RMs to control basic resources,
+ and new can be extended from the existing ones.
Through the EC interface the user can create ResourceManagers (RMs),
configure them and interconnect them, to describe an experiment.
ec = serializer.load(filepath)
return ec
- def __init__(self, exp_id = None, local_dir = None, persist = False):
- """ ExperimentController entity to model an execute a network experiment.
+ def __init__(self, exp_id = None, local_dir = None, persist = False,
+ fm = None, add_node_callback = None, add_edge_callback = None,
+ **kwargs):
+ """ ExperimentController entity to model an execute a network
+ experiment.
:param exp_id: Human readable name to identify the experiment
- :type name: str
+ :type exp_id: str
:param local_dir: Path to local directory where to store experiment
related files
- :type name: str
+ :type local_dir: str
:param persist: Save an XML description of the experiment after
completion at local_dir
- :type name: bool
+ :type persist: bool
- """
+ :param fm: FailureManager object. If None is given, the default
+ FailureManager class will be used
+ :type fm: FailureManager
+
+ :param add_node_callback: Callback to invoke for node instantiation
+ when automatic topology creation mode is used
+ :type add_node_callback: function
+ :param add_edge_callback: Callback to invoke for edge instantiation
+ when automatic topology creation mode is used
+ :type add_edge_callback: function
+
+ """
super(ExperimentController, self).__init__()
# Logging
# Local path where to store experiment related files (results, etc)
if not local_dir:
- local_dir = tempfile.mkdtemp()
+ local_dir = tempfile.gettempdir() # /tmp
self._local_dir = local_dir
self._exp_dir = os.path.join(local_dir, self.exp_id)
self._persist = persist
# generator of globally unique ids
- self._guid_generator = guid.GuidGenerator()
+ self._guid_generator = GuidGenerator()
# Resource managers
self._resources = dict()
self._groups = dict()
# generator of globally unique id for groups
- self._group_id_generator = guid.GuidGenerator()
+ self._group_id_generator = GuidGenerator()
# Flag to stop processing thread
self._stop = False
# Entity in charge of managing system failures
- self._fm = FailureManager(self)
+ if not fm:
+ self._fm = FailureManager()
+ self._fm.set_ec(self)
# EC state
self._state = ECState.RUNNING
+ # Automatically construct experiment description
+ self._netgraph = None
+ if add_node_callback or add_edge_callback or kwargs.get("topology"):
+ self._build_from_netgraph(add_node_callback, add_edge_callback,
+ **kwargs)
+
# The runner is a pool of threads used to parallelize
# execution of tasks
self._nthreads = 20
"""
return self._logger
+ @property
+ def fm(self):
+ """ Returns the failure manager
+
+ """
+
+ return self._fm
+
@property
def failure_level(self):
""" Returns the level of FAILURE of th experiment
@property
def persist(self):
- """ If Trie persist the ExperimentController to XML format upon completion
+ """ If True, persists the ExperimentController to XML format upon
+ experiment completion
"""
return self._persist
+ @property
+ def netgraph(self):
+ """ Return NetGraph instance if experiment description was automatically
+ generated
+
+ """
+ return self._netgraph
+
@property
def abort(self):
""" Returns True if the experiment has failed and should be interrupted,
return sec
def save(self, dirpath = None, format = SFormats.XML):
+ if dirpath == None:
+ dirpath = self.run_dir
+
+ try:
+ os.makedirs(dirpath)
+ except OSError:
+ pass
+
serializer = ECSerializer()
- path = serializer.save(self, dirpath = None, format = format)
+ path = serializer.save(self, dirpath, format = format)
return path
def get_task(self, tid):
return rm
def get_resources_by_type(self, rtype):
- """ Returns a registered ResourceManager by its guid
+ """ Returns the ResourceManager objects of type rtype
:param rtype: Resource type
:type rtype: string
"""
rms = []
- for guid, rm in self._resources.iteritems():
- if rm.get_rtype() == type:
+ for guid, rm in self._resources.items():
+ if rm.get_rtype() == rtype:
rms.append(rm)
return rms
@property
def resources(self):
- """ Returns the set() of guids of all the ResourceManager
+ """ Returns the guids of all ResourceManagers
:return: Set of all RM guids
- :rtype: set
+ :rtype: list
"""
- keys = self._resources.keys()
+ keys = list(self._resources.keys())
return keys
+ def filter_resources(self, rtype):
+ """ Returns the guids of all ResourceManagers of type rtype
+
+ :param rtype: Resource type
+ :type rtype: string
+
+ :rtype: list of guids
+
+ """
+ rms = []
+ for guid, rm in self._resources.items():
+ if rm.get_rtype() == rtype:
+ rms.append(rm.guid)
+ return rms
+
def register_resource(self, rtype, guid = None):
""" Registers a new ResourceManager of type 'rtype' in the experiment
"""
# Get next available guid
- guid = self._guid_generator.next(guid)
+ # xxx_next_hiccup
+ guid = self._guid_generator.generate(guid)
# Instantiate RM
rm = ResourceFactory.create(rtype, self, guid)
if not guids:
# If no guids list was passed, all 'NEW' RMs will be deployed
guids = []
- for guid, rm in self._resources.iteritems():
+ for guid, rm in self._resources.items():
if rm.state == ResourceState.NEW:
guids.append(guid)
new_group = False
if not group:
new_group = True
- group = self._group_id_generator.next()
+ # xxx_next_hiccup
+ group = self._group_id_generator.generate()
if group not in self._groups:
self._groups[group] = []
:type guids: list
"""
+ if self._state == ECState.RELEASED:
+ return
+
if isinstance(guids, int):
guids = [guids]
self.wait_released(guids)
if self.persist:
- self.save(dirpath = self.run_dir)
+ self.save()
for guid in guids:
if self.get(guid, "hardRelease"):
- self.remove_resource(guid)
+ self.remove_resource(guid)\
+
+ # Mark the EC state as RELEASED
+ self._state = ECState.RELEASED
def shutdown(self):
""" Releases all resources and stops the ExperimentController
try:
self._cond.acquire()
- task = self._scheduler.next()
+ task = next(self._scheduler)
if not task:
# No task to execute. Wait for a new task to be scheduled.
self._cond.notify()
self._cond.release()
+ def _build_from_netgraph(self, add_node_callback, add_edge_callback,
+ **kwargs):
+ """ Automates experiment description using a NetGraph instance.
+ """
+ self._netgraph = NetGraph(**kwargs)
+
+ if add_node_callback:
+ ### Add resources to the EC
+ for nid in self.netgraph.nodes():
+ add_node_callback(self, nid)
+
+ if add_edge_callback:
+ #### Add connections between resources
+ for nid1, nid2 in self.netgraph.edges():
+ add_edge_callback(self, nid1, nid2)
+