2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
43 class FailureLevel(object):
44 """ Describes the system failure state """
49 class FailureManager(object):
50 """ The FailureManager is responsible for handling errors
51 and deciding whether an experiment should be aborted or not
55 def __init__(self, ec):
56 self._ec = weakref.ref(ec)
57 self._failure_level = FailureLevel.OK
62 """ Returns the ExperimentController associated to this FailureManager
72 def eval_failure(self, guid):
73 if self._failure_level == FailureLevel.OK:
74 rm = self.ec.get_resource(guid)
76 critical = rm.get("critical")
78 if state == ResourceState.FAILED and critical:
79 self._failure_level = FailureLevel.RM_FAILURE
81 self.ec.logger.debug("RM critical failure occurred on guid %d." \
82 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
84 def set_ec_failure(self):
85 self._failure_level = FailureLevel.EC_FAILURE
87 class ECState(object):
88 """ Possible states for an ExperimentController
96 class ExperimentController(object):
98 .. class:: Class Args :
100 :param exp_id: Human readable identifier for the experiment scenario.
105 An experiment, or scenario, is defined by a concrete set of resources,
106 and the behavior, configuration and interconnection of those resources.
107 The Experiment Description (ED) is a detailed representation of a
108 single experiment. It contains all the necessary information to
109 allow repeating the experiment. NEPI allows to describe
110 experiments by registering components (resources), configuring them
111 and interconnecting them.
113 The same experiment (scenario) can be executed many times, generating
114 different results. We call an experiment execution (instance) a 'run'.
116 The ExperimentController (EC) is the entity responsible for
117 managing an experiment run. The same scenario can be
118 recreated (and re-run) by instantiating an EC and recreating
119 the same experiment description.
121 An experiment is represented as a graph of interconnected
122 resources. A resource is a generic concept in the sense that any
123 component taking part in an experiment, whether physical or
124 virtual, is considered a resource. A resource could be a host,
125 a virtual machine, an application, a simulator, or an IP address.
127 A ResourceManager (RM), is the entity responsible for managing a
128 single resource. ResourceManagers are specific to a resource
129 type (i.e. An RM to control a Linux application will not be
130 the same as the RM used to control a ns-3 simulation).
131 To support a new type of resource, a new RM must be implemented.
132 NEPI already provides a variety of RMs to control basic resources,
133 and new ones can be extended from the existing ones.
135 Through the EC interface the user can create ResourceManagers (RMs),
136 configure them and interconnect them, to describe an experiment.
137 Describing an experiment through the EC does not run the experiment.
138 Only when the 'deploy()' method is invoked on the EC, the EC will take
139 actions to transform the 'described' experiment into a 'running' experiment.
141 While the experiment is running, it is possible to continue to
142 create/configure/connect RMs, and to deploy them to involve new
143 resources in the experiment (this is known as 'interactive' deployment).
145 An experiment in NEPI is identified by a string id,
146 which is either given by the user, or automatically generated by NEPI.
147 The purpose of this identifier is to separate files and results that
148 belong to different experiment scenarios.
149 However, since a same 'experiment' can be run many times, the experiment
150 id is not enough to identify an experiment instance (run).
151 For this reason, the ExperimentController has two identifiers, the
152 exp_id, which can be re-used in different ExperimentControllers,
153 and the run_id, which is unique to one ExperimentController instance, and
154 is automatically generated by NEPI.
159 def load(cls, filepath, format = SFormats.XML):
160 serializer = ECSerializer()
161 ec = serializer.load(filepath)
164 def __init__(self, exp_id = None, local_dir = None, persist = False,
165 add_node_callback = None, add_edge_callback = None, **kwargs):
166 """ ExperimentController entity to model and execute a network
169 :param exp_id: Human readable name to identify the experiment
172 :param local_dir: Path to local directory where to store experiment
176 :param persist: Save an XML description of the experiment after
177 completion at local_dir
180 :param add_node_callback: Callback to invoke for node instantiation
181 when automatic topology creation mode is used
182 :type add_node_callback: function
184 :param add_edge_callback: Callback to invoke for edge instantiation
185 when automatic topology creation mode is used
186 :type add_edge_callback: function
189 super(ExperimentController, self).__init__()
192 self._logger = logging.getLogger("ExperimentController")
194 # Run identifier. It identifies a concrete execution instance (run)
196 # Since a same experiment (same configuration) can be executed many
197 # times, this run_id permits to separate result files generated on
198 # different experiment executions
199 self._run_id = tsformat()
201 # Experiment identifier. Usually assigned by the user
202 # Identifies the experiment scenario (i.e. configuration,
203 # resources used, etc)
204 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
206 # Local path where to store experiment related files (results, etc)
208 local_dir = tempfile.gettempdir() # /tmp
210 self._local_dir = local_dir
211 self._exp_dir = os.path.join(local_dir, self.exp_id)
212 self._run_dir = os.path.join(self.exp_dir, self.run_id)
214 # If True persist the experiment controller in XML format, after completion
215 self._persist = persist
217 # generator of globally unique ids
218 self._guid_generator = guid.GuidGenerator()
221 self._resources = dict()
223 # Scheduler. It is a queue that holds tasks scheduled for
224 # execution, and yields the next task to be executed
225 # ordered by execution and arrival time
226 self._scheduler = HeapScheduler()
231 # RM groups (for deployment)
232 self._groups = dict()
234 # generator of globally unique id for groups
235 self._group_id_generator = guid.GuidGenerator()
237 # Flag to stop processing thread
240 # Entity in charge of managing system failures
241 self._fm = FailureManager(self)
244 self._state = ECState.RUNNING
246 # Automatically construct experiment description
247 self._netgraph = None
248 if add_node_callback or add_edge_callback or kwargs.get("topology"):
249 self._build_from_netgraph(add_node_callback, add_edge_callback,
252 # The runner is a pool of threads used to parallelize
257 # Event processing thread
258 self._cond = threading.Condition()
259 self._thread = threading.Thread(target = self._process)
260 self._thread.setDaemon(True)
265 """ Returns the logger instance of the Experiment Controller
271 def failure_level(self):
272 """ Returns the level of FAILURE of the experiment
276 return self._fm._failure_level
280 """ Returns the state of the Experiment Controller
287 """ Returns the experiment id assigned by the user
294 """ Returns the experiment instance (run) identifier (automatically
302 """ Returns the number of processing nthreads used
305 return self._nthreads
309 """ Root local directory for experiment files
312 return self._local_dir
316 """ Local directory to store results and other files related to the
324 """ Local directory to store results and other files related to the
332 """ If True, persists the ExperimentController to XML format upon
333 experiment completion
340 """ Return NetGraph instance if experiment description was automatically
344 return self._netgraph
348 """ Returns True if the experiment has failed and should be interrupted,
352 return self._fm.abort
def inform_failure(self, guid):
    """ Notifies the EC that the RM identified by 'guid' has failed.

    The report is handed over to the FailureManager, which evaluates it
    through its eval_failure() method.

    :param guid: Guid of the RM that failed
    :type guid: int

    :return: Whatever FailureManager.eval_failure() returns

    """
    failure_manager = self._fm
    return failure_manager.eval_failure(guid)
364 def wait_finished(self, guids):
365 """ Blocking method that waits until all RMs in the 'guids' list
366 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
367 RELEASED ), or until a failure in the experiment occurs
370 :param guids: List of guids
378 return self.wait(guids, state = ResourceState.STOPPED,
381 def wait_started(self, guids):
382 """ Blocking method that waits until all RMs in the 'guids' list
383 have reached a state >= STARTED, or until a failure in the
384 experiment occurs (i.e. abort == True)
386 :param guids: List of guids
394 return self.wait(guids, state = ResourceState.STARTED,
397 def wait_released(self, guids):
398 """ Blocking method that waits until all RMs in the 'guids' list
399 have reached a state == RELEASED, or until the EC fails
401 :param guids: List of guids
407 return self._state == ECState.FAILED
409 return self.wait(guids, state = ResourceState.RELEASED,
412 def wait_deployed(self, guids):
413 """ Blocking method that waits until all RMs in the 'guids' list
414 have reached a state >= READY, or until a failure in the
415 experiment occurs (i.e. abort == True)
417 :param guids: List of guids
425 return self.wait(guids, state = ResourceState.READY,
428 def wait(self, guids, state, quit):
429 """ Blocking method that waits until all RMs in the 'guids' list
430 have reached a state >= 'state', or until the 'quit' callback
433 :param guids: List of guids
437 if isinstance(guids, int):
440 # Make a copy to avoid modifying the original guids list
444 # If there are no more guids to wait for
445 # or the quit function returns True, exit the loop
446 if len(guids) == 0 or quit():
449 # If a guid reached one of the target states, remove it from list
451 rm = self.get_resource(guid)
455 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
456 rm.get_rtype(), guid, rstate, state))
459 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
460 guid, rstate, state))
466 def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
467 plotter = ECPlotter()
468 fpath = plotter.plot(self, dirpath = dirpath, format= format,
472 def serialize(self, format = SFormats.XML):
473 serializer = ECSerializer()
474 sec = serializer.load(self, format = format)
477 def save(self, dirpath = None, format = SFormats.XML):
479 dirpath = self.run_dir
486 serializer = ECSerializer()
487 path = serializer.save(self, dirpath, format = format)
def get_task(self, tid):
    """ Looks up a tracked task by its id.

    :param tid: Id of the task

    :return: The task stored under 'tid' in the tracked-task table.
        NOTE(review): presumably None when the id is unknown, assuming
        the table is a plain dict -- confirm against __init__.

    """
    tracked = self._tasks
    return tracked.get(tid)
501 def get_resource(self, guid):
502 """ Returns a registered ResourceManager by its guid
504 :param guid: Id of the resource
507 :rtype: ResourceManager
510 rm = self._resources.get(guid)
513 def get_resources_by_type(self, rtype):
514 """ Returns the ResourceManager objects of type rtype
516 :param rtype: Resource type
519 :rtype: list of ResourceManagers
523 for guid, rm in self._resources.iteritems():
524 if rm.get_rtype() == rtype:
def remove_resource(self, guid):
    """ Unregisters the RM identified by 'guid' from the EC.

    :param guid: Guid of the RM to forget
    :type guid: int

    :raises KeyError: If no RM is registered under 'guid'

    """
    # pop() without a default raises KeyError on a missing key, exactly
    # like the 'del' statement would; the popped RM is discarded.
    self._resources.pop(guid)
533 """ Returns the guids of all ResourceManagers
535 :return: Set of all RM guids
539 keys = self._resources.keys()
543 def filter_resources(self, rtype):
544 """ Returns the guids of all ResourceManagers of type rtype
546 :param rtype: Resource type
549 :rtype: list of guids
553 for guid, rm in self._resources.iteritems():
554 if rm.get_rtype() == rtype:
558 def register_resource(self, rtype, guid = None):
559 """ Registers a new ResourceManager of type 'rtype' in the experiment
561 This method will assign a new 'guid' for the RM, if no guid
564 :param rtype: Type of the RM
567 :return: Guid of the RM
571 # Get next available guid
572 guid = self._guid_generator.next(guid)
575 rm = ResourceFactory.create(rtype, self, guid)
578 self._resources[guid] = rm
def get_attributes(self, guid):
    """ Lists every attribute of the RM identified by 'guid'.

    :param guid: Guid of the RM
    :type guid: int

    :return: The attributes reported by the RM's get_attributes() method

    """
    return self.get_resource(guid).get_attributes()
def get_attribute(self, guid, name):
    """ Fetches the attribute called 'name' from the RM identified
    by 'guid'.

    :param guid: Guid of the RM
    :type guid: int

    :param name: Name of the attribute
    :type name: str

    :return: The attribute returned by the RM's get_attribute() method

    """
    return self.get_resource(guid).get_attribute(name)
def register_connection(self, guid1, guid2):
    """ Connects the RM identified by 'guid1' with the RM identified
    by 'guid2'.

    The connection relationship is symmetric, so the order in which
    the two guids are given does not matter: each RM is told about
    the other one.

    :param guid1: Guid of the first RM to connect
    :type guid1: int

    :param guid2: Guid of the second RM to connect
    :type guid2: int

    """
    # Both RMs are resolved before either of them is modified.
    endpoints = [
        (self.get_resource(guid1), guid2),
        (self.get_resource(guid2), guid1),
    ]

    for rm, peer_guid in endpoints:
        rm.register_connection(peer_guid)
631 def register_condition(self, guids1, action, guids2, state,
633 """ Registers an action START, STOP or DEPLOY for all RM on list
634 guids1 to occur at time 'time' after all elements in list guids2
635 have reached state 'state'.
637 :param guids1: List of guids of RMs subjected to action
640 :param action: Action to perform (either START, STOP or DEPLOY)
641 :type action: ResourceAction
643 :param guids2: List of guids of RMs to be waited for
646 :param state: State to wait for on RMs of list guids2 (STARTED,
648 :type state: ResourceState
650 :param time: Time to wait after guids2 has reached status
654 if isinstance(guids1, int):
656 if isinstance(guids2, int):
660 rm = self.get_resource(guid1)
661 rm.register_condition(action, guids2, state, time)
def enable_trace(self, guid, name):
    """ Turns on collection of the trace called 'name' on the RM
    identified by 'guid', so it is gathered during the experiment run.

    :param guid: Guid of the RM
    :type guid: int

    :param name: Name of the trace
    :type name: str

    """
    self.get_resource(guid).enable_trace(name)
def trace_enabled(self, guid, name):
    """ Tells whether the trace called 'name' is enabled on the RM
    identified by 'guid'.

    :param guid: Guid of the RM
    :type guid: int

    :param name: Name of the trace
    :type name: str

    :return: The answer given by the RM's trace_enabled() method

    """
    return self.get_resource(guid).trace_enabled(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """ Retrieves information about a collected trace: its full content,
    a chunk of its stream, its path or its size.

    :param guid: Guid of the RM that owns the trace
    :type guid: int

    :param name: Name of the trace
    :type name: str

    :param attr: Selects what is returned. One of:
        - TraceAttr.ALL (complete trace content),
        - TraceAttr.STREAM (a block of bytes read from the trace),
        - TraceAttr.PATH (full path to the trace file),
        - TraceAttr.SIZE (size of the trace file).

    :param block: Number of bytes to retrieve from the trace, when attr
        is TraceAttr.STREAM
    :type block: int

    :param offset: Number of 'blocks' to skip, when attr is
        TraceAttr.STREAM
    :type offset: int

    :return: Whatever the RM's trace() method returns for 'attr'

    """
    return self.get_resource(guid).trace(name, attr, block, offset)
def get_traces(self, guid):
    """ Lists the names of the traces of the RM identified by 'guid'.

    :param guid: Guid of the RM
    :type guid: int

    :return: The trace names reported by the RM's get_traces() method

    """
    return self.get_resource(guid).get_traces()
725 def discover(self, guid):
726 """ Discovers an available resource matching the criteria defined
727 by the RM with guid 'guid', and associates that resource to the RM
729 Not all RM types require (or are capable of) performing resource
730 discovery. For the RM types which are not capable of doing so,
731 invoking this method does not have any consequences.
733 :param guid: Guid of the RM
737 rm = self.get_resource(guid)
def provision(self, guid):
    """ Provisions the resource controlled by the RM identified
    by 'guid'.

    Provisioning a resource means making it 'accessible' to the user.
    Some RM types neither need nor support provisioning; for those,
    calling this method has no consequences.

    :param guid: Guid of the RM
    :type guid: int

    :return: Whatever the RM's provision() method returns

    """
    return self.get_resource(guid).provision()
755 def get(self, guid, name):
756 """ Returns the value of the attribute with name 'name' on the
759 :param guid: Guid of the RM
762 :param name: Name of the attribute
765 :return: The value of the attribute with name 'name'
768 rm = self.get_resource(guid)
771 def set(self, guid, name, value):
772 """ Modifies the value of the attribute with name 'name' on the
775 :param guid: Guid of the RM
778 :param name: Name of the attribute
781 :param value: Value of the attribute
784 rm = self.get_resource(guid)
def get_global(self, rtype, name):
    """ Reads the global attribute called 'name' shared by the RMs
    of type 'rtype'.

    :param rtype: Resource type whose global attribute is read
    :type rtype: str

    :param name: Name of the attribute
    :type name: str

    :return: The value returned by the get_global() method of the class
        that ResourceFactory associates to 'rtype'

    """
    return ResourceFactory.get_resource_type(rtype).get_global(name)
def set_global(self, rtype, name, value):
    """ Changes the global attribute called 'name' shared by the RMs
    of type 'rtype'.

    :param rtype: Resource type whose global attribute is modified
    :type rtype: str

    :param name: Name of the attribute
    :type name: str

    :param value: New value of the attribute

    :return: Whatever the set_global() method of the class that
        ResourceFactory associates to 'rtype' returns

    """
    return ResourceFactory.get_resource_type(rtype).set_global(name, value)
819 def state(self, guid, hr = False):
820 """ Returns the state of a resource
822 :param guid: Resource guid
825 :param hr: Human readable. Forces return of a
826 status string instead of a number
830 rm = self.get_resource(guid)
834 return ResourceState2str.get(state)
838 def stop(self, guid):
839 """ Stops the RM with guid 'guid'
841 Stopping a RM means that the resource it controls will
842 no longer take part of the experiment.
844 :param guid: Guid of the RM
848 rm = self.get_resource(guid)
851 def start(self, guid):
852 """ Starts the RM with guid 'guid'
854 Starting a RM means that the resource it controls will
855 begin taking part of the experiment.
857 :param guid: Guid of the RM
861 rm = self.get_resource(guid)
864 def get_start_time(self, guid):
865 """ Returns the start time of the RM as a timestamp """
866 rm = self.get_resource(guid)
869 def get_stop_time(self, guid):
870 """ Returns the stop time of the RM as a timestamp """
871 rm = self.get_resource(guid)
def get_discover_time(self, guid):
    """ Gives the discover time of the RM identified by 'guid',
    as a timestamp (the RM's 'discover_time' attribute).
    """
    return self.get_resource(guid).discover_time
def get_provision_time(self, guid):
    """ Gives the provision time of the RM identified by 'guid',
    as a timestamp (the RM's 'provision_time' attribute).
    """
    return self.get_resource(guid).provision_time
884 def get_ready_time(self, guid):
885 """ Returns the deployment time of the RM as a timestamp """
886 rm = self.get_resource(guid)
def get_release_time(self, guid):
    """ Gives the release time of the RM identified by 'guid',
    as a timestamp (the RM's 'release_time' attribute).
    """
    return self.get_resource(guid).release_time
def get_failed_time(self, guid):
    """ Gives the time at which the failure occurred for the RM
    identified by 'guid', as a timestamp (the RM's 'failed_time'
    attribute).
    """
    return self.get_resource(guid).failed_time
899 def set_with_conditions(self, name, value, guids1, guids2, state,
901 """ Modifies the value of attribute with name 'name' on all RMs
902 on the guids1 list when time 'time' has elapsed since all
903 elements in guids2 list have reached state 'state'.
905 :param name: Name of attribute to set in RM
908 :param value: Value of attribute to set in RM
911 :param guids1: List of guids of RMs subjected to action
914 :param action: Action to register (either START or STOP)
915 :type action: ResourceAction
917 :param guids2: List of guids of RMs to be waited for
920 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
921 :type state: ResourceState
923 :param time: Time to wait after guids2 has reached status
927 if isinstance(guids1, int):
929 if isinstance(guids2, int):
933 rm = self.get_resource(guid)
934 rm.set_with_conditions(name, value, guids2, state, time)
936 def deploy(self, guids = None, wait_all_ready = True, group = None):
937 """ Deploys all ResourceManagers in the guids list.
939 If the argument 'guids' is not given, all RMs with state NEW
942 :param guids: List of guids of RMs to deploy
945 :param wait_all_ready: Wait until all RMs are ready in
946 order to start the RMs
949 :param group: Id of deployment group in which to deploy RMs
953 self.logger.debug(" ------- DEPLOY START ------ ")
956 # If no guids list was passed, all 'NEW' RMs will be deployed
958 for guid, rm in self._resources.iteritems():
959 if rm.state == ResourceState.NEW:
962 if isinstance(guids, int):
965 # Create deployment group
966 # New guids can be added to a same deployment group later on
970 group = self._group_id_generator.next()
972 if group not in self._groups:
973 self._groups[group] = []
975 self._groups[group].extend(guids)
977 def wait_all_and_start(group):
978 # Function that checks if all resources are READY
979 # before scheduling a start_with_conditions for each RM
982 # Get all guids in group
983 guids = self._groups[group]
986 if self.state(guid) < ResourceState.READY:
991 callback = functools.partial(wait_all_and_start, group)
992 self.schedule("1s", callback)
994 # If all resources are ready, we schedule the start
996 rm = self.get_resource(guid)
997 self.schedule("0s", rm.start_with_conditions)
999 if rm.conditions.get(ResourceAction.STOP):
1000 # Only if the RM has STOP conditions we
1001 # schedule a stop. Otherwise the RM will stop immediately
1002 self.schedule("0s", rm.stop_with_conditions)
1004 if wait_all_ready and new_group:
1005 # Schedule a function to check that all resources are
1006 # READY, and only then schedule the start.
1007 # This aims at reducing the number of tasks looping in the
1009 # Instead of having many start tasks, we will have only one for
1011 callback = functools.partial(wait_all_and_start, group)
1012 self.schedule("0s", callback)
1015 rm = self.get_resource(guid)
1016 rm.deployment_group = group
1017 self.schedule("0s", rm.deploy_with_conditions)
1019 if not wait_all_ready:
1020 self.schedule("0s", rm.start_with_conditions)
1022 if rm.conditions.get(ResourceAction.STOP):
1023 # Only if the RM has STOP conditions we
1024 # schedule a stop. Otherwise the RM will stop immediately
1025 self.schedule("0s", rm.stop_with_conditions)
1027 def release(self, guids = None):
1028 """ Releases all ResourceManagers in the guids list.
1030 If the argument 'guids' is not given, all RMs registered
1031 in the experiment are released.
1033 :param guids: List of RM guids
1037 if self._state == ECState.RELEASED:
1040 if isinstance(guids, int):
1044 guids = self.resources
1047 rm = self.get_resource(guid)
1048 self.schedule("0s", rm.release)
1050 self.wait_released(guids)
1056 if self.get(guid, "hardRelease"):
1057 self.remove_resource(guid)\
1059 # Mark the EC state as RELEASED
1060 self._state = ECState.RELEASED
1063 """ Releases all resources and stops the ExperimentController
1066 # If there was a major failure we can't exit gracefully
1067 if self._state == ECState.FAILED:
1068 raise RuntimeError("EC failure. Can not exit gracefully")
1070 # Remove all pending tasks from the scheduler queue
1071 for tid in list(self._scheduler.pending):
1072 self._scheduler.remove(tid)
1074 # Remove pending tasks from the workers queue
1075 self._runner.empty()
1079 # Mark the EC state as TERMINATED
1080 self._state = ECState.TERMINATED
1082 # Stop processing thread
1085 # Notify condition to wake up the processing thread
1088 if self._thread.is_alive():
1091 def schedule(self, date, callback, track = False):
1092 """ Schedules a callback to be executed at time 'date'.
1094 :param date: string containing execution time for the task.
1095 It can be expressed as an absolute time, using
1096 timestamp format, or as a relative time matching
1097 ^\d+.\d+(h|m|s|ms|us)$
1099 :param callback: code to be executed for the task. Must be a
1100 Python function, and receives args and kwargs
1103 :param track: if set to True, the task will be retrievable with
1104 the get_task() method
1106 :return : The Id of the task
1110 timestamp = stabsformat(date)
1111 task = Task(timestamp, callback)
1112 task = self._scheduler.schedule(task)
1115 self._tasks[task.id] = task
1117 # Notify condition to wake up the processing thread
1123 """ Process scheduled tasks.
1127 Tasks are scheduled by invoking the schedule method with a target
1128 callback and an execution time.
1129 The schedule method creates a new Task object with that callback
1130 and execution time, and pushes it into the '_scheduler' queue.
1131 The execution time and the order of arrival of tasks are used
1132 to order the tasks in the queue.
1134 The _process method is executed in an independent thread held by
1135 the ExperimentController for as long as the experiment is running.
1136 This method takes tasks from the '_scheduler' queue in a loop
1137 and processes them in parallel using multithreading.
1138 The environmental variable NEPI_NTHREADS can be used to control
1139 the number of threads used to process tasks. The default value is
1142 To execute tasks in parallel, a ParallelRun (PR) object is used.
1143 This object keeps a pool of threads (workers), and a queue of tasks
1144 scheduled for 'immediate' execution.
1146 On each iteration, the '_process' loop will take the next task that
1147 is scheduled for 'future' execution from the '_scheduler' queue,
1148 and if the execution time of that task is <= the current time,
1149 it will push that task into the PR for 'immediate execution'.
1150 As soon as a worker is free, the PR will assign the next task to
1153 Upon receiving a task to execute, each PR worker (thread) will
1154 invoke the _execute method of the EC, passing the task as
1156 The _execute method will then invoke task.callback inside a
1157 try/except block. If an exception is raised by the task.callback,
1158 it will be trapped by the try block, logged to standard error
1159 (usually the console), and the task will be marked as failed.
1163 self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1164 self._runner = ParallelRun(maxthreads = self.nthreads)
1165 self._runner.start()
1167 while not self._stop:
1169 self._cond.acquire()
1171 task = self._scheduler.next()
1174 # No task to execute. Wait for a new task to be scheduled.
1177 # The task timestamp is in the future. Wait for timeout
1178 # or until another task is scheduled.
1180 if now < task.timestamp:
1181 # Calculate timeout in seconds
1182 timeout = tdiffsec(task.timestamp, now)
1184 # Re-schedule task with the same timestamp
1185 self._scheduler.schedule(task)
1189 # Wait timeout or until a new task awakes the condition
1190 self._cond.wait(timeout)
1192 self._cond.release()
1195 # Process tasks in parallel
1196 self._runner.put(self._execute, task)
1199 err = traceback.format_exc()
1200 self.logger.error("Error while processing tasks in the EC: %s" % err)
1202 # Set the EC to FAILED state
1203 self._state = ECState.FAILED
1205 # Set the FailureManager failure level to EC failure
1206 self._fm.set_ec_failure()
1208 self.logger.debug("Exiting the task processing loop ... ")
1211 self._runner.destroy()
1213 def _execute(self, task):
1214 """ Executes a single task.
1216 :param task: Object containing the callback to execute
1222 task.result = task.callback()
1223 task.status = TaskStatus.DONE
1226 err = traceback.format_exc()
1228 task.status = TaskStatus.ERROR
1230 self.logger.error("Error occurred while executing task: %s" % err)
1233 """ Awakes the processing thread if it is blocked waiting
1234 for new tasks to arrive
1237 self._cond.acquire()
1239 self._cond.release()
1241 def _build_from_netgraph(self, add_node_callback, add_edge_callback,
1243 """ Automates experiment description using a NetGraph instance.
1245 self._netgraph = NetGraph(**kwargs)
1247 if add_node_callback:
1248 ### Add resources to the EC
1249 for nid in self.netgraph.nodes():
1250 add_node_callback(self, nid)
1252 if add_edge_callback:
1253 #### Add connections between resources
1254 for nid1, nid2 in self.netgraph.edges():
1255 add_edge_callback(self, nid1, nid2)