2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import tempfile
import threading
import time
import traceback
import weakref

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
from nepi.util.serializer import ECSerializer, SFormats
from nepi.util.plotter import ECPlotter, PFormats
from nepi.util.netgraph import NetGraph, TopologyType
# TODO: use multiprocessing instead of threading
# TODO: allow reconnecting to a running experiment instance (reconnect mode vs. deploy mode)
class FailureLevel(object):
    """ Possible failure states for the experiment.

    Levels increase with severity: OK means no failure was detected,
    RM_FAILURE means a critical ResourceManager failed, and EC_FAILURE
    means the ExperimentController itself failed.
    """
    OK = 1
    RM_FAILURE = 2
    EC_FAILURE = 3
class FailureManager(object):
    """ The FailureManager is responsible for handling errors
    and deciding whether an experiment should be aborted or not.

    The failure level only escalates: once it has left FailureLevel.OK,
    eval_failure() no longer re-evaluates RMs.
    """

    def __init__(self):
        # Weak reference to the owning ExperimentController, set by
        # set_ec(). Being weak, it does not keep the EC alive.
        self._ec = None
        self._failure_level = FailureLevel.OK

    def set_ec(self, ec):
        """ Associates this FailureManager with an ExperimentController

        :param ec: ExperimentController that owns this FailureManager
        :type ec: ExperimentController

        """
        self._ec = weakref.ref(ec)

    @property
    def ec(self):
        """ Returns the ExperimentController associated to this FailureManager,
        or None if set_ec() was never called or the EC no longer exists

        """
        if self._ec is None:
            return None
        return self._ec()

    @property
    def abort(self):
        """ Returns True if the experiment should be interrupted, i.e. when
        either a critical RM failure or an EC failure has been recorded

        """
        return self._failure_level != FailureLevel.OK

    def eval_failure(self, guid):
        """ Implements failure policy and sets the abort state of the
        experiment based on the failure state and criticality of
        the RM

        :param guid: Guid of the RM upon which the failure of the experiment
            is evaluated
        :type guid: int

        """
        if self._failure_level == FailureLevel.OK:
            rm = self.ec.get_resource(guid)
            state = rm.state
            critical = rm.get("critical")

            # Only failures of RMs flagged as critical abort the experiment
            if state == ResourceState.FAILED and critical:
                self._failure_level = FailureLevel.RM_FAILURE

                self.ec.logger.debug("RM critical failure occurred on guid %d."
                    " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)

    def set_ec_failure(self):
        """ Records a failure of the ExperimentController itself """
        self._failure_level = FailureLevel.EC_FAILURE
class ECState(object):
    """ Possible states of the ExperimentController

    RUNNING: the EC is processing tasks.
    FAILED: an unexpected error occurred in the EC task processing loop.
    RELEASED: all requested resources have been released.
    TERMINATED: the EC has been shut down.
    """
    RUNNING = 1
    FAILED = 2
    RELEASED = 3
    TERMINATED = 4
class ExperimentController(object):
    """ Entity responsible for managing an experiment run.

    An experiment, or scenario, is defined by a concrete set of resources,
    and the behavior, configuration and interconnection of those resources.
    The Experiment Description (ED) is a detailed representation of a
    single experiment. It contains all the necessary information to
    allow repeating the experiment. NEPI allows experiments to be described
    by registering components (resources), configuring them
    and interconnecting them.

    The same experiment (scenario) can be executed many times, generating
    different results. We call an experiment execution (instance) a 'run'.

    The ExperimentController (EC) is the entity responsible for
    managing an experiment run. The same scenario can be
    recreated (and re-run) by instantiating an EC and recreating
    the same experiment description.

    An experiment is represented as a graph of interconnected
    resources. A resource is a generic concept in the sense that any
    component taking part in an experiment, whether physical or
    virtual, is considered a resource. A resource could be a host,
    a virtual machine, an application, a simulator or an IP address.

    A ResourceManager (RM) is the entity responsible for managing a
    single resource. ResourceManagers are specific to a resource
    type (i.e. an RM to control a Linux application will not be
    the same as the RM used to control an ns-3 simulation).
    To support a new type of resource, a new RM must be implemented.
    NEPI already provides a variety of RMs to control basic resources,
    and new ones can be extended from the existing ones.

    Through the EC interface the user can create ResourceManagers (RMs),
    configure them and interconnect them, to describe an experiment.
    Describing an experiment through the EC does not run the experiment.
    Only when the 'deploy()' method is invoked on the EC will the EC take
    actions to transform the 'described' experiment into a 'running'
    experiment.

    While the experiment is running, it is possible to continue to
    create/configure/connect RMs, and to deploy them to involve new
    resources in the experiment (this is known as 'interactive' deployment).

    An experiment in NEPI is identified by a string id,
    which is either given by the user, or automatically generated by NEPI.
    The purpose of this identifier is to separate files and results that
    belong to different experiment scenarios.
    However, since the same 'experiment' can be run many times, the
    experiment id is not enough to identify an experiment instance (run).
    For this reason, the ExperimentController has two identifiers: the
    exp_id, which can be re-used in different ExperimentControllers,
    and the run_id, which is unique to one ExperimentController instance,
    and is automatically generated by NEPI.

    """

    @classmethod
    def load(cls, filepath, format=SFormats.XML):
        """ Creates an ExperimentController from a serialized description

        :param filepath: Path to the file holding the serialized EC
        :type filepath: str

        :param format: Serialization format of the file
        :type format: SFormats

        :rtype: ExperimentController

        """
        serializer = ECSerializer()
        # Forward 'format' so that non-default formats are honoured
        ec = serializer.load(filepath, format=format)
        return ec

    def __init__(self, exp_id=None, local_dir=None, persist=False,
            fm=None, add_node_callback=None, add_edge_callback=None,
            **kwargs):
        """ ExperimentController entity to model and execute a network
        experiment.

        :param exp_id: Human readable name to identify the experiment.
            If not given, a random identifier is generated.
        :type exp_id: str

        :param local_dir: Path to local directory where to store experiment
            related files. Defaults to the system temporary directory.
        :type local_dir: str

        :param persist: Save an XML description of the experiment after
            completion at local_dir
        :type persist: bool

        :param fm: FailureManager object. If None is given, the default
            FailureManager class will be used
        :type fm: FailureManager

        :param add_node_callback: Callback to invoke for node instantiation
            when automatic topology creation mode is used
        :type add_node_callback: function

        :param add_edge_callback: Callback to invoke for edge instantiation
            when automatic topology creation mode is used
        :type add_edge_callback: function

        Any extra keyword argument is forwarded to NetGraph when automatic
        topology creation mode is used.

        """
        super(ExperimentController, self).__init__()

        # Logging
        self._logger = logging.getLogger("ExperimentController")

        # Run identifier. It identifies a concrete execution instance (run)
        # of an experiment.
        # Since a same experiment (same configuration) can be executed many
        # times, this run_id permits to separate result files generated on
        # different experiment executions
        self._run_id = tsformat()

        # Experiment identifier. Usually assigned by the user
        # Identifies the experiment scenario (i.e. configuration,
        # resources used, etc). The random fallback is 16 hex characters
        # built in a way that works on both Python 2 and Python 3.
        self._exp_id = exp_id or "exp-%s" % "".join(
                "%02x" % b for b in bytearray(os.urandom(8)))

        # Local path where to store experiment related files (results, etc)
        if not local_dir:
            local_dir = tempfile.gettempdir() # /tmp

        self._local_dir = local_dir
        self._exp_dir = os.path.join(local_dir, self.exp_id)
        self._run_dir = os.path.join(self.exp_dir, self.run_id)

        # If True persist the experiment controller in XML format, after completion
        self._persist = persist

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers, indexed by guid
        self._resources = dict()

        # Scheduler. It is a queue that holds tasks scheduled for
        # execution, and yields the next task to be executed
        # ordered by execution and arrival time
        self._scheduler = HeapScheduler()

        # Tasks scheduled with track=True, indexed by task id
        self._tasks = dict()

        # RM groups (for deployment)
        self._groups = dict()

        # generator of globally unique id for groups
        self._group_id_generator = guid.GuidGenerator()

        # Flag to stop processing thread
        self._stop = False

        # Entity in charge of managing system failures. A user-provided
        # FailureManager is honoured; otherwise the default one is used.
        self._fm = fm if fm is not None else FailureManager()
        self._fm.set_ec(self)

        # EC state
        self._state = ECState.RUNNING

        # Automatically construct experiment description
        self._netgraph = None
        if add_node_callback or add_edge_callback or kwargs.get("topology"):
            self._build_from_netgraph(add_node_callback, add_edge_callback,
                    **kwargs)

        # The runner is a pool of threads used to parallelize
        # execution of tasks. It is created by the processing thread.
        self._nthreads = 20
        self._runner = None

        # Event processing thread
        self._cond = threading.Condition()
        self._thread = threading.Thread(target=self._process)
        self._thread.daemon = True
        self._thread.start()

    @property
    def logger(self):
        """ Returns the logger instance of the Experiment Controller

        """
        return self._logger

    @property
    def fm(self):
        """ Returns the failure manager

        """
        return self._fm

    @property
    def failure_level(self):
        """ Returns the level of FAILURE of the experiment

        """
        return self._fm._failure_level

    @property
    def ecstate(self):
        """ Returns the state of the Experiment Controller

        """
        return self._state

    @property
    def exp_id(self):
        """ Returns the experiment id assigned by the user

        """
        return self._exp_id

    @property
    def run_id(self):
        """ Returns the experiment instance (run) identifier (automatically
        generated)

        """
        return self._run_id

    @property
    def nthreads(self):
        """ Returns the number of processing threads used

        """
        return self._nthreads

    @property
    def local_dir(self):
        """ Root local directory for experiment files

        """
        return self._local_dir

    @property
    def exp_dir(self):
        """ Local directory to store results and other files related to the
        experiment.

        """
        return self._exp_dir

    @property
    def run_dir(self):
        """ Local directory to store results and other files related to the
        experiment run.

        """
        return self._run_dir

    @property
    def persist(self):
        """ If True, persists the ExperimentController to XML format upon
        experiment completion

        """
        return self._persist

    @property
    def netgraph(self):
        """ Return NetGraph instance if experiment description was automatically
        generated, None otherwise

        """
        return self._netgraph

    @property
    def abort(self):
        """ Returns True if the experiment has failed and should be interrupted,
        False otherwise.

        """
        return self._fm.abort

    def inform_failure(self, guid):
        """ Reports a failure in a RM to the EC for evaluation

        :param guid: Resource id
        :type guid: int

        """
        return self._fm.eval_failure(guid)

    def wait_finished(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= STOPPED (i.e. STOPPED, FAILED or
        RELEASED ), or until a failure in the experiment occurs
        (i.e. abort == True)

        :param guids: List of guids
        :type guids: list

        """
        def quit():
            return self.abort

        return self.wait(guids, state=ResourceState.STOPPED, quit=quit)

    def wait_started(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= STARTED, or until a failure in the
        experiment occurs (i.e. abort == True)

        :param guids: List of guids
        :type guids: list

        """
        def quit():
            return self.abort

        return self.wait(guids, state=ResourceState.STARTED, quit=quit)

    def wait_released(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state == RELEASED, or until the EC fails

        :param guids: List of guids
        :type guids: list

        """
        # RM failures (abort) must not interrupt the release; only a
        # failure of the EC itself does.
        def quit():
            return self._state == ECState.FAILED

        return self.wait(guids, state=ResourceState.RELEASED, quit=quit)

    def wait_deployed(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= READY, or until a failure in the
        experiment occurs (i.e. abort == True)

        :param guids: List of guids
        :type guids: list

        """
        def quit():
            return self.abort

        return self.wait(guids, state=ResourceState.READY, quit=quit)

    def wait(self, guids, state, quit):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= 'state', or until the 'quit' callback
        yields True

        :param guids: List of guids (a single guid is also accepted)
        :type guids: list

        :param state: Minimum state to wait for
        :type state: ResourceState

        :param quit: Callable with no arguments; waiting stops as soon as
            it returns True
        :type quit: function

        """
        if isinstance(guids, int):
            guids = [guids]

        # Make a copy to avoid modifying the original guids list
        guids = list(guids)

        while True:
            # If there are no more guids to wait for
            # or the quit function returns True, exit the loop
            if len(guids) == 0 or quit():
                break

            # If a guid reached one of the target states, remove it from list
            guid = guids.pop()
            rm = self.get_resource(guid)
            rstate = rm.state

            if rstate >= state:
                self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
                    rm.get_rtype(), guid, rstate, state))
            else:
                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
                    guid, rstate, state))

                # Not there yet: put it back and poll again later
                guids.append(guid)
                time.sleep(0.5)

    def plot(self, dirpath=None, format=PFormats.FIGURE, show=False):
        """ Plots the experiment graph

        :param dirpath: Directory where to store the plot
        :param format: Plot format
        :param show: If True, displays the plot

        :return: Path of the generated plot, as returned by ECPlotter

        """
        plotter = ECPlotter()
        fpath = plotter.plot(self, dirpath=dirpath, format=format,
                show=show)
        return fpath

    def serialize(self, format=SFormats.XML):
        """ Returns a serialized description of the experiment

        :param format: Serialization format
        :type format: SFormats

        """
        serializer = ECSerializer()
        # Serialize the EC (previously this wrongly called 'load')
        sec = serializer.serialize(self, format=format)
        return sec

    def save(self, dirpath=None, format=SFormats.XML):
        """ Saves a serialized description of the experiment to disk

        :param dirpath: Directory where to store the file. Defaults to
            the run directory.
        :type dirpath: str

        :param format: Serialization format
        :type format: SFormats

        :return: Path of the saved file, as returned by ECSerializer

        """
        if dirpath is None:
            dirpath = self.run_dir

        try:
            os.makedirs(dirpath)
        except OSError:
            # An existing directory is fine; any other failure
            # (e.g. permissions) must not be silently ignored.
            if not os.path.isdir(dirpath):
                raise

        serializer = ECSerializer()
        path = serializer.save(self, dirpath, format=format)
        return path

    def get_task(self, tid):
        """ Returns a task by its id

        Only tasks scheduled with track=True can be retrieved.

        :param tid: Id of the task
        :type tid: int

        :rtype: Task, or None if unknown

        """
        return self._tasks.get(tid)

    def get_resource(self, guid):
        """ Returns a registered ResourceManager by its guid

        :param guid: Id of the resource
        :type guid: int

        :rtype: ResourceManager, or None if no RM has that guid

        """
        rm = self._resources.get(guid)
        return rm

    def get_resources_by_type(self, rtype):
        """ Returns the ResourceManager objects of type rtype

        :param rtype: Resource type
        :type rtype: string

        :rtype: list of ResourceManagers

        """
        return [rm for rm in self._resources.values()
                if rm.get_rtype() == rtype]

    def remove_resource(self, guid):
        """ Unregisters the RM with guid 'guid' from the EC

        :param guid: Guid of the RM
        :type guid: int

        """
        del self._resources[guid]

    @property
    def resources(self):
        """ Returns the guids of all ResourceManagers

        :return: List of all RM guids (a copy, safe to iterate while
            removing resources)
        :rtype: list

        """
        keys = list(self._resources.keys())
        return keys

    def filter_resources(self, rtype):
        """ Returns the guids of all ResourceManagers of type rtype

        :param rtype: Resource type
        :type rtype: string

        :rtype: list of guids

        """
        return [guid for guid, rm in self._resources.items()
                if rm.get_rtype() == rtype]

    def register_resource(self, rtype, guid=None):
        """ Registers a new ResourceManager of type 'rtype' in the experiment

        This method will assign a new 'guid' for the RM, if no guid
        is specified.

        :param rtype: Type of the RM
        :type rtype: str

        :param guid: Optional guid to assign to the RM
        :type guid: int

        :return: Guid of the RM
        :rtype: int

        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate RM
        rm = ResourceFactory.create(rtype, self, guid)

        # Store RM
        self._resources[guid] = rm

        return guid

    def get_attributes(self, guid):
        """ Returns all the attributes of the RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :return: List of attributes
        :rtype: list

        """
        rm = self.get_resource(guid)
        return rm.get_attributes()

    def get_attribute(self, guid, name):
        """ Returns the attribute 'name' of the RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the attribute
        :type name: str

        :return: The attribute with name 'name'
        :rtype: Attribute

        """
        rm = self.get_resource(guid)
        return rm.get_attribute(name)

    def register_connection(self, guid1, guid2):
        """ Registers a connection between a RM with guid 'guid1'
        and another RM with guid 'guid2'.

        The order in which the two guids are provided is not
        important, since the connection relationship is symmetric.

        :param guid1: First guid to connect
        :type guid1: int

        :param guid2: Second guid to connect
        :type guid2: int

        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)

        rm1.register_connection(guid2)
        rm2.register_connection(guid1)

    def register_condition(self, guids1, action, guids2, state,
            time=None):
        """ Registers an action START, STOP or DEPLOY for all RM on list
        guids1 to occur at time 'time' after all elements in list guids2
        have reached state 'state'.

        :param guids1: List of guids of RMs subjected to action
        :type guids1: list

        :param action: Action to perform (either START, STOP or DEPLOY)
        :type action: ResourceAction

        :param guids2: List of guids of RMs to be waited for
        :type guids2: list

        :param state: State to wait for on RMs of list guids2 (STARTED,
            STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after guids2 has reached status
        :type time: string

        """
        if isinstance(guids1, int):
            guids1 = [guids1]
        if isinstance(guids2, int):
            guids2 = [guids2]

        for guid1 in guids1:
            rm = self.get_resource(guid1)
            rm.register_condition(action, guids2, state, time)

    def enable_trace(self, guid, name):
        """ Enables a trace to be collected during the experiment run

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the trace
        :type name: str

        """
        rm = self.get_resource(guid)
        rm.enable_trace(name)

    def trace_enabled(self, guid, name):
        """ Returns True if the trace of name 'name' is enabled

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the trace
        :type name: str

        """
        rm = self.get_resource(guid)
        return rm.trace_enabled(name)

    def trace(self, guid, name, attr=TraceAttr.ALL, block=512, offset=0):
        """ Returns information on a collected trace, the trace stream or
        blocks (chunks) of the trace stream

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the trace
        :type name: str

        :param attr: Can be one of:
            - TraceAttr.ALL (complete trace content),
            - TraceAttr.STREAM (block in bytes to read starting
              at offset),
            - TraceAttr.PATH (full path to the trace file),
            - TraceAttr.SIZE (size of trace file).
        :type attr: str

        :param block: Number of bytes to retrieve from trace, when attr is
            TraceAttr.STREAM
        :type block: int

        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
        :type offset: int

        """
        rm = self.get_resource(guid)
        return rm.trace(name, attr, block, offset)

    def get_traces(self, guid):
        """ Returns the list of the trace names of the RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :return: List of trace names
        :rtype: list

        """
        rm = self.get_resource(guid)
        return rm.get_traces()

    def discover(self, guid):
        """ Discovers an available resource matching the criteria defined
        by the RM with guid 'guid', and associates that resource to the RM

        Not all RM types require (or are capable of) performing resource
        discovery. For the RM types which are not capable of doing so,
        invoking this method does not have any consequences.

        :param guid: Guid of the RM
        :type guid: int

        """
        rm = self.get_resource(guid)
        return rm.discover()

    def provision(self, guid):
        """ Provisions the resource associated to the RM with guid 'guid'.

        Provisioning means making a resource 'accessible' to the user.
        Not all RM types require (or are capable of) performing resource
        provisioning. For the RM types which are not capable of doing so,
        invoking this method does not have any consequences.

        :param guid: Guid of the RM
        :type guid: int

        """
        rm = self.get_resource(guid)
        return rm.provision()

    def get(self, guid, name):
        """ Returns the value of the attribute with name 'name' on the
        RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the attribute
        :type name: str

        :return: The value of the attribute with name 'name'

        """
        rm = self.get_resource(guid)
        return rm.get(name)

    def set(self, guid, name, value):
        """ Modifies the value of the attribute with name 'name' on the
        RM with guid 'guid'.

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the attribute
        :type name: str

        :param value: Value of the attribute

        """
        rm = self.get_resource(guid)
        return rm.set(name, value)

    def get_global(self, rtype, name):
        """ Returns the value of the global attribute with name 'name' on the
        RMs of rtype 'rtype'.

        :param rtype: Resource type
        :type rtype: str

        :param name: Name of the attribute
        :type name: str

        :return: The value of the attribute with name 'name'

        """
        rclass = ResourceFactory.get_resource_type(rtype)
        return rclass.get_global(name)

    def set_global(self, rtype, name, value):
        """ Modifies the value of the global attribute with name 'name' on the
        RMs with rtype 'rtype'.

        :param rtype: Resource type
        :type rtype: str

        :param name: Name of the attribute
        :type name: str

        :param value: Value of the attribute

        """
        rclass = ResourceFactory.get_resource_type(rtype)
        return rclass.set_global(name, value)

    def state(self, guid, hr=False):
        """ Returns the state of a resource

        :param guid: Resource guid
        :type guid: integer

        :param hr: Human readable. Forces return of a
            status string instead of a number
        :type hr: boolean

        """
        rm = self.get_resource(guid)
        state = rm.state

        if hr:
            return ResourceState2str.get(state)

        return state

    def stop(self, guid):
        """ Stops the RM with guid 'guid'

        Stopping a RM means that the resource it controls will
        no longer take part in the experiment.

        :param guid: Guid of the RM
        :type guid: int

        """
        rm = self.get_resource(guid)
        return rm.stop()

    def start(self, guid):
        """ Starts the RM with guid 'guid'

        Starting a RM means that the resource it controls will
        begin taking part in the experiment.

        :param guid: Guid of the RM
        :type guid: int

        """
        rm = self.get_resource(guid)
        return rm.start()

    def get_start_time(self, guid):
        """ Returns the start time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.start_time

    def get_stop_time(self, guid):
        """ Returns the stop time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.stop_time

    def get_discover_time(self, guid):
        """ Returns the discover time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.discover_time

    def get_provision_time(self, guid):
        """ Returns the provision time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.provision_time

    def get_ready_time(self, guid):
        """ Returns the deployment time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.ready_time

    def get_release_time(self, guid):
        """ Returns the release time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.release_time

    def get_failed_time(self, guid):
        """ Returns the time failure occurred for the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.failed_time

    def set_with_conditions(self, name, value, guids1, guids2, state,
            time=None):
        """ Modifies the value of attribute with name 'name' on all RMs
        on the guids1 list when time 'time' has elapsed since all
        elements in guids2 list have reached state 'state'.

        :param name: Name of attribute to set in RM
        :type name: string

        :param value: Value of attribute to set in RM

        :param guids1: List of guids of RMs whose attribute is set
        :type guids1: list

        :param guids2: List of guids of RMs to be waited for
        :type guids2: list

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after guids2 has reached status
        :type time: string

        """
        if isinstance(guids1, int):
            guids1 = [guids1]
        if isinstance(guids2, int):
            guids2 = [guids2]

        for guid in guids1:
            rm = self.get_resource(guid)
            rm.set_with_conditions(name, value, guids2, state, time)

    def deploy(self, guids=None, wait_all_ready=True, group=None):
        """ Deploys all ResourceManagers in the guids list.

        If the argument 'guids' is not given, all RMs with state NEW
        are deployed.

        :param guids: List of guids of RMs to deploy
        :type guids: list

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs
        :type wait_all_ready: bool

        :param group: Id of deployment group in which to deploy RMs.
            If None, a new deployment group is created.
        :type group: int

        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        if isinstance(guids, int):
            guids = [guids]

        if not guids:
            # If no guids list was passed, all 'NEW' RMs will be deployed
            guids = [guid for guid, rm in self._resources.items()
                    if rm.state == ResourceState.NEW]

        # Create deployment group
        # New guids can be added to a same deployment group later on
        new_group = False
        if group is None:
            new_group = True
            group = self._group_id_generator.next()

        if group not in self._groups:
            self._groups[group] = []

        self._groups[group].extend(guids)

        def wait_all_and_start(group):
            # Function that checks if all resources are READY
            # before scheduling a start_with_conditions for each RM

            # Get all guids in group
            group_guids = self._groups[group]

            if any(self.state(guid) < ResourceState.READY
                    for guid in group_guids):
                # Not every RM is READY yet: check again later
                callback = functools.partial(wait_all_and_start, group)
                self.schedule("1s", callback)
                return

            # If all resources are ready, we schedule the start
            for guid in group_guids:
                rm = self.get_resource(guid)
                self.schedule("0s", rm.start_with_conditions)

                if rm.conditions.get(ResourceAction.STOP):
                    # Only if the RM has STOP conditions we
                    # schedule a stop. Otherwise the RM will stop immediately
                    self.schedule("0s", rm.stop_with_conditions)

        if wait_all_ready and new_group:
            # Schedule a function to check that all resources are
            # READY, and only then schedule the start.
            # This aims at reducing the number of tasks looping in the
            # scheduler.
            # Instead of having many start tasks, we will have only one for
            # the whole group.
            callback = functools.partial(wait_all_and_start, group)
            self.schedule("0s", callback)

        for guid in guids:
            rm = self.get_resource(guid)
            rm.deployment_group = group
            self.schedule("0s", rm.deploy_with_conditions)

            if not wait_all_ready:
                self.schedule("0s", rm.start_with_conditions)

                if rm.conditions.get(ResourceAction.STOP):
                    # Only if the RM has STOP conditions we
                    # schedule a stop. Otherwise the RM will stop immediately
                    self.schedule("0s", rm.stop_with_conditions)

    def release(self, guids=None):
        """ Releases all ResourceManagers in the guids list.

        If the argument 'guids' is not given, all RMs registered
        in the experiment are released.

        :param guids: List of RM guids
        :type guids: list

        """
        if self._state == ECState.RELEASED:
            return

        if isinstance(guids, int):
            guids = [guids]

        if not guids:
            guids = self.resources

        for guid in guids:
            rm = self.get_resource(guid)
            self.schedule("0s", rm.release)

        self.wait_released(guids)

        if self.persist:
            self.save()

        # RMs flagged with 'hardRelease' are also unregistered from the EC
        for guid in guids:
            if self.get(guid, "hardRelease"):
                self.remove_resource(guid)

        # Mark the EC state as RELEASED
        self._state = ECState.RELEASED

    def shutdown(self):
        """ Releases all resources and stops the ExperimentController

        :raises RuntimeError: if the EC is in FAILED state

        """
        # If there was a major failure we can't exit gracefully
        if self._state == ECState.FAILED:
            raise RuntimeError("EC failure. Can not exit gracefully")

        # Remove all pending tasks from the scheduler queue
        for tid in list(self._scheduler.pending):
            self._scheduler.remove(tid)

        # Remove pending tasks from the workers queue. The runner is only
        # created once the processing thread has started.
        if self._runner is not None:
            self._runner.empty()

        self.release()

        # Mark the EC state as TERMINATED
        self._state = ECState.TERMINATED

        # Stop processing thread
        self._stop = True

        # Notify condition to wake up the processing thread
        self._notify()

        if self._thread.is_alive():
            self._thread.join()

    def schedule(self, date, callback, track=False):
        """ Schedules a callback to be executed at time 'date'.

        :param date: string containing execution time for the task.
            It can be expressed as an absolute time, using
            timestamp format, or as a relative time given as a number
            followed by a unit among h, m, s, ms, us (e.g. "0s", "1s")
        :type date: str

        :param callback: code to be executed for the task. Must be a
            Python callable taking no arguments (use functools.partial
            to bind arguments)
        :type callback: function

        :param track: if set to True, the task will be retrievable with
            the get_task() method
        :type track: bool

        :return: The Id of the task
        :rtype: int

        """
        timestamp = stabsformat(date)
        task = Task(timestamp, callback)
        task = self._scheduler.schedule(task)

        if track:
            self._tasks[task.id] = task

        # Notify condition to wake up the processing thread
        self._notify()

        return task.id

    def _process(self):
        """ Process scheduled tasks.

        .. note::

        Tasks are scheduled by invoking the schedule method with a target
        callback and an execution time.
        The schedule method creates a new Task object with that callback
        and execution time, and pushes it into the '_scheduler' queue.
        The execution time and the order of arrival of tasks are used
        to order the tasks in the queue.

        The _process method is executed in an independent thread held by
        the ExperimentController for as long as the experiment is running.
        This method takes tasks from the '_scheduler' queue in a loop
        and processes them in parallel using multithreading.
        The environment variable NEPI_NTHREADS can be used to control
        the number of threads used to process tasks. The default value is
        20.

        To execute tasks in parallel, a ParallelRunner (PR) object is used.
        This object keeps a pool of threads (workers), and a queue of tasks
        scheduled for 'immediate' execution.

        On each iteration, the '_process' loop will take the next task that
        is scheduled for 'future' execution from the '_scheduler' queue,
        and if the current time is >= the execution time of that task,
        it will push that task into the PR for 'immediate execution'.
        As soon as a worker is free, the PR will assign the next task to
        that worker.

        Upon receiving a task to execute, each PR worker (thread) will
        invoke the _execute method of the EC, passing the task as
        argument.
        The _execute method will then invoke task.callback inside a
        try/except block. If an exception is raised by the task.callback,
        it will be trapped by the try block, logged to standard error
        (usually the console), and the task will be marked as failed.

        """
        self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
        self._runner = ParallelRun(maxthreads=self.nthreads)
        self._runner.start()

        while not self._stop:
            try:
                task = self._next_due_task()

                if task:
                    # Process tasks in parallel
                    self._runner.put(self._execute, task)
            except Exception:
                err = traceback.format_exc()
                self.logger.error("Error while processing tasks in the EC: %s" % err)

                # Set the EC to FAILED state
                self._state = ECState.FAILED

                # Set the FailureManager failure level to EC failure
                self._fm.set_ec_failure()

        self.logger.debug("Exiting the task processing loop ... ")

        self._runner.sync()
        self._runner.destroy()

    def _next_due_task(self):
        """ Returns the next task whose execution time has arrived, or None.

        Runs with the condition lock held; the 'with' statement guarantees
        the lock is released even if the scheduler raises. When no task is
        due yet, it blocks until a new task is scheduled, the next task's
        timeout expires, or the EC is stopping, and then returns None so
        the caller re-evaluates the queue.

        """
        with self._cond:
            task = self._scheduler.next()

            if not task:
                # Checking _stop while holding the lock avoids missing the
                # notification sent by shutdown() right before we wait.
                if not self._stop:
                    # No task to execute. Wait for a new task to be scheduled.
                    self._cond.wait()
                return None

            # The task timestamp is in the future. Wait for timeout
            # or until another task is scheduled.
            now = tnow()
            if now < task.timestamp:
                # Calculate timeout in seconds
                timeout = tdiffsec(task.timestamp, now)

                # Re-schedule task with the same timestamp
                self._scheduler.schedule(task)

                if not self._stop:
                    # Wait timeout or until a new task awakes the condition
                    self._cond.wait(timeout)
                return None

            return task

    def _execute(self, task):
        """ Executes a single task.

        :param task: Object containing the callback to execute
        :type task: Task

        On success, task.result holds the callback's return value and the
        status is DONE; on error, task.result holds the formatted traceback
        and the status is ERROR.

        """
        try:
            # Invoke callback
            task.result = task.callback()
            task.status = TaskStatus.DONE
        except Exception:
            err = traceback.format_exc()
            task.result = err
            task.status = TaskStatus.ERROR

            self.logger.error("Error occurred while executing task: %s" % err)

    def _notify(self):
        """ Awakes the processing thread if it is blocked waiting
        for new tasks to arrive

        """
        with self._cond:
            self._cond.notify()

    def _build_from_netgraph(self, add_node_callback, add_edge_callback,
            **kwargs):
        """ Automates experiment description using a NetGraph instance.

        :param add_node_callback: Invoked as callback(ec, nid) for each node
        :param add_edge_callback: Invoked as callback(ec, nid1, nid2) for
            each edge
        :param kwargs: Forwarded to the NetGraph constructor

        """
        self._netgraph = NetGraph(**kwargs)

        if add_node_callback:
            ### Add resources to the EC
            for nid in self.netgraph.nodes():
                add_node_callback(self, nid)

        if add_edge_callback:
            #### Add connections between resources
            for nid1, nid2 in self.netgraph.edges():
                add_edge_callback(self, nid1, nid2)