2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
21 from nepi.util import guid
22 from nepi.util.parallel import ParallelRun
23 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
24 from nepi.execution.resource import ResourceFactory, ResourceAction, \
25 ResourceState, ResourceState2str
26 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
27 from nepi.execution.trace import TraceAttr
28 from nepi.util.serializer import ECSerializer, SFormats
29 from nepi.util.plotter import ECPlotter, PFormats
30 from nepi.util.netgraph import NetGraph, TopologyType
32 # TODO: use multiprocessing instead of threading
33 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
44 class FailureLevel(object):
45 """ Possible failure states for the experiment """
50 class FailureManager(object):
51 """ The FailureManager is responsible for handling errors
52 and deciding whether an experiment should be aborted or not
57 self._failure_level = FailureLevel.OK
61 self._ec = weakref.ref(ec)
65 """ Returns the ExperimentController associated to this FailureManager
73 def eval_failure(self, guid):
74 """ Implements failure policy and sets the abort state of the
75 experiment based on the failure state and criticality of
78 :param guid: Guid of the RM upon which the failure of the experiment
83 if self._failure_level == FailureLevel.OK:
84 rm = self.ec.get_resource(guid)
86 critical = rm.get("critical")
88 if state == ResourceState.FAILED and critical:
89 self._failure_level = FailureLevel.RM_FAILURE
91 self.ec.logger.debug("RM critical failure occurred on guid %d." \
92 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
def set_ec_failure(self):
    """ Flags a failure of the ExperimentController itself.

    Moves the failure level tracked by this FailureManager to
    FailureLevel.EC_FAILURE, as opposed to FailureLevel.RM_FAILURE,
    which is set when a critical RM fails.
    """
    ec_level = FailureLevel.EC_FAILURE
    self._failure_level = ec_level
97 class ECState(object):
98 """ Possible states of the ExperimentController
106 class ExperimentController(object):
110 An experiment, or scenario, is defined by a concrete set of resources,
111 and the behavior, configuration and interconnection of those resources.
112 The Experiment Description (ED) is a detailed representation of a
113 single experiment. It contains all the necessary information to
114 allow repeating the experiment. NEPI allows to describe
115 experiments by registering components (resources), configuring them
116 and interconnecting them.
118 A same experiment (scenario) can be executed many times, generating
119 different results. We call an experiment execution (instance) a 'run'.
121 The ExperimentController (EC), is the entity responsible of
122 managing an experiment run. The same scenario can be
123 recreated (and re-run) by instantiating an EC and recreating
124 the same experiment description.
126 An experiment is represented as a graph of interconnected
127 resources. A resource is a generic concept in the sense that any
128 component taking part of an experiment, whether physical of
129 virtual, is considered a resource. A resources could be a host,
130 a virtual machine, an application, a simulator, a IP address.
132 A ResourceManager (RM), is the entity responsible for managing a
133 single resource. ResourceManagers are specific to a resource
134 type (i.e. An RM to control a Linux application will not be
135 the same as the RM used to control a ns-3 simulation).
136 To support a new type of resource, a new RM must be implemented.
137 NEPI already provides a variety of RMs to control basic resources,
138 and new can be extended from the existing ones.
140 Through the EC interface the user can create ResourceManagers (RMs),
141 configure them and interconnect them, to describe an experiment.
142 Describing an experiment through the EC does not run the experiment.
143 Only when the 'deploy()' method is invoked on the EC, the EC will take
144 actions to transform the 'described' experiment into a 'running' experiment.
146 While the experiment is running, it is possible to continue to
147 create/configure/connect RMs, and to deploy them to involve new
148 resources in the experiment (this is known as 'interactive' deployment).
150 An experiments in NEPI is identified by a string id,
151 which is either given by the user, or automatically generated by NEPI.
152 The purpose of this identifier is to separate files and results that
153 belong to different experiment scenarios.
154 However, since a same 'experiment' can be run many times, the experiment
155 id is not enough to identify an experiment instance (run).
156 For this reason, the ExperimentController has two identifier, the
157 exp_id, which can be re-used in different ExperimentController,
158 and the run_id, which is unique to one ExperimentController instance, and
159 is automatically generated by NEPI.
164 def load(cls, filepath, format = SFormats.XML):
165 serializer = ECSerializer()
166 ec = serializer.load(filepath)
169 def __init__(self, exp_id = None, local_dir = None, persist = False,
170 fm = None, add_node_callback = None, add_edge_callback = None,
172 """ ExperimentController entity to model an execute a network
175 :param exp_id: Human readable name to identify the experiment
178 :param local_dir: Path to local directory where to store experiment
182 :param persist: Save an XML description of the experiment after
183 completion at local_dir
186 :param fm: FailureManager object. If None is given, the default
187 FailureManager class will be used
188 :type fm: FailureManager
190 :param add_node_callback: Callback to invoke for node instantiation
191 when automatic topology creation mode is used
192 :type add_node_callback: function
194 :param add_edge_callback: Callback to invoke for edge instantiation
195 when automatic topology creation mode is used
196 :type add_edge_callback: function
199 super(ExperimentController, self).__init__()
202 self._logger = logging.getLogger("ExperimentController")
204 # Run identifier. It identifies a concrete execution instance (run)
206 # Since a same experiment (same configuration) can be executed many
207 # times, this run_id permits to separate result files generated on
208 # different experiment executions
209 self._run_id = tsformat()
211 # Experiment identifier. Usually assigned by the user
212 # Identifies the experiment scenario (i.e. configuration,
213 # resources used, etc)
214 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
216 # Local path where to store experiment related files (results, etc)
218 local_dir = tempfile.gettempdir() # /tmp
220 self._local_dir = local_dir
221 self._exp_dir = os.path.join(local_dir, self.exp_id)
222 self._run_dir = os.path.join(self.exp_dir, self.run_id)
224 # If True persist the experiment controller in XML format, after completion
225 self._persist = persist
227 # generator of globally unique ids
228 self._guid_generator = guid.GuidGenerator()
231 self._resources = dict()
233 # Scheduler. It a queue that holds tasks scheduled for
234 # execution, and yields the next task to be executed
235 # ordered by execution and arrival time
236 self._scheduler = HeapScheduler()
241 # RM groups (for deployment)
242 self._groups = dict()
244 # generator of globally unique id for groups
245 self._group_id_generator = guid.GuidGenerator()
247 # Flag to stop processing thread
250 # Entity in charge of managing system failures
252 self._fm = FailureManager()
253 self._fm.set_ec(self)
256 self._state = ECState.RUNNING
258 # Automatically construct experiment description
259 self._netgraph = None
260 if add_node_callback or add_edge_callback or kwargs.get("topology"):
261 self._build_from_netgraph(add_node_callback, add_edge_callback,
264 # The runner is a pool of threads used to parallelize
269 # Event processing thread
270 self._cond = threading.Condition()
271 self._thread = threading.Thread(target = self._process)
272 self._thread.setDaemon(True)
277 """ Returns the logger instance of the Experiment Controller
284 """ Returns the failure manager
291 def failure_level(self):
292 """ Returns the level of FAILURE of th experiment
296 return self._fm._failure_level
300 """ Returns the state of the Experiment Controller
307 """ Returns the experiment id assigned by the user
314 """ Returns the experiment instance (run) identifier (automatically
322 """ Returns the number of processing nthreads used
325 return self._nthreads
329 """ Root local directory for experiment files
332 return self._local_dir
336 """ Local directory to store results and other files related to the
344 """ Local directory to store results and other files related to the
352 """ If True, persists the ExperimentController to XML format upon
353 experiment completion
360 """ Return NetGraph instance if experiment description was automatically
364 return self._netgraph
368 """ Returns True if the experiment has failed and should be interrupted,
372 return self._fm.abort
374 def inform_failure(self, guid):
375 """ Reports a failure in a RM to the EC for evaluation
377 :param guid: Resource id
382 return self._fm.eval_failure(guid)
384 def wait_finished(self, guids):
385 """ Blocking method that waits until all RMs in the 'guids' list
386 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
387 RELEASED ), or until a failure in the experiment occurs
390 :param guids: List of guids
398 return self.wait(guids, state = ResourceState.STOPPED,
401 def wait_started(self, guids):
402 """ Blocking method that waits until all RMs in the 'guids' list
403 have reached a state >= STARTED, or until a failure in the
404 experiment occurs (i.e. abort == True)
406 :param guids: List of guids
414 return self.wait(guids, state = ResourceState.STARTED,
417 def wait_released(self, guids):
418 """ Blocking method that waits until all RMs in the 'guids' list
419 have reached a state == RELEASED, or until the EC fails
421 :param guids: List of guids
427 return self._state == ECState.FAILED
429 return self.wait(guids, state = ResourceState.RELEASED,
432 def wait_deployed(self, guids):
433 """ Blocking method that waits until all RMs in the 'guids' list
434 have reached a state >= READY, or until a failure in the
435 experiment occurs (i.e. abort == True)
437 :param guids: List of guids
445 return self.wait(guids, state = ResourceState.READY,
448 def wait(self, guids, state, quit):
449 """ Blocking method that waits until all RMs in the 'guids' list
450 have reached a state >= 'state', or until the 'quit' callback
453 :param guids: List of guids
457 if isinstance(guids, int):
460 # Make a copy to avoid modifying the original guids list
464 # If there are no more guids to wait for
465 # or the quit function returns True, exit the loop
466 if len(guids) == 0 or quit():
469 # If a guid reached one of the target states, remove it from list
471 rm = self.get_resource(guid)
475 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
476 rm.get_rtype(), guid, rstate, state))
479 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
480 guid, rstate, state))
486 def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
487 plotter = ECPlotter()
488 fpath = plotter.plot(self, dirpath = dirpath, format= format,
492 def serialize(self, format = SFormats.XML):
493 serializer = ECSerializer()
494 sec = serializer.load(self, format = format)
497 def save(self, dirpath = None, format = SFormats.XML):
499 dirpath = self.run_dir
506 serializer = ECSerializer()
507 path = serializer.save(self, dirpath, format = format)
510 def get_task(self, tid):
511 """ Returns a task by its id
513 :param tid: Id of the task
519 return self._tasks.get(tid)
521 def get_resource(self, guid):
522 """ Returns a registered ResourceManager by its guid
524 :param guid: Id of the resource
527 :rtype: ResourceManager
530 rm = self._resources.get(guid)
533 def get_resources_by_type(self, rtype):
534 """ Returns the ResourceManager objects of type rtype
536 :param rtype: Resource type
539 :rtype: list of ResourceManagers
543 for guid, rm in self._resources.items():
544 if rm.get_rtype() == rtype:
def remove_resource(self, guid):
    """ Unregisters the ResourceManager stored under 'guid'.

    :param guid: Guid of the RM to drop from the experiment
    :type guid: int

    Raises KeyError when no RM is registered under 'guid'.
    """
    registry = self._resources
    registry.pop(guid)
553 """ Returns the guids of all ResourceManagers
555 :return: Set of all RM guids
559 keys = list(self._resources.keys())
563 def filter_resources(self, rtype):
564 """ Returns the guids of all ResourceManagers of type rtype
566 :param rtype: Resource type
569 :rtype: list of guids
573 for guid, rm in self._resources.items():
574 if rm.get_rtype() == rtype:
578 def register_resource(self, rtype, guid = None):
579 """ Registers a new ResourceManager of type 'rtype' in the experiment
581 This method will assign a new 'guid' for the RM, if no guid
584 :param rtype: Type of the RM
587 :return: Guid of the RM
591 # Get next available guid
593 guid = self._guid_generator.next(guid)
596 rm = ResourceFactory.create(rtype, self, guid)
599 self._resources[guid] = rm
603 def get_attributes(self, guid):
604 """ Returns all the attributes of the RM with guid 'guid'
606 :param guid: Guid of the RM
609 :return: List of attributes
613 rm = self.get_resource(guid)
614 return rm.get_attributes()
616 def get_attribute(self, guid, name):
617 """ Returns the attribute 'name' of the RM with guid 'guid'
619 :param guid: Guid of the RM
622 :param name: Name of the attribute
625 :return: The attribute with name 'name'
629 rm = self.get_resource(guid)
630 return rm.get_attribute(name)
632 def register_connection(self, guid1, guid2):
633 """ Registers a connection between a RM with guid 'guid1'
634 and another RM with guid 'guid2'.
636 The order of the in which the two guids are provided is not
637 important, since the connection relationship is symmetric.
639 :param guid1: First guid to connect
640 :type guid1: ResourceManager
642 :param guid2: Second guid to connect
643 :type guid: ResourceManager
646 rm1 = self.get_resource(guid1)
647 rm2 = self.get_resource(guid2)
649 rm1.register_connection(guid2)
650 rm2.register_connection(guid1)
652 def register_condition(self, guids1, action, guids2, state,
654 """ Registers an action START, STOP or DEPLOY for all RM on list
655 guids1 to occur at time 'time' after all elements in list guids2
656 have reached state 'state'.
658 :param guids1: List of guids of RMs subjected to action
661 :param action: Action to perform (either START, STOP or DEPLOY)
662 :type action: ResourceAction
664 :param guids2: List of guids of RMs to we waited for
667 :param state: State to wait for on RMs of list guids2 (STARTED,
669 :type state: ResourceState
671 :param time: Time to wait after guids2 has reached status
675 if isinstance(guids1, int):
677 if isinstance(guids2, int):
681 rm = self.get_resource(guid1)
682 rm.register_condition(action, guids2, state, time)
684 def enable_trace(self, guid, name):
685 """ Enables a trace to be collected during the experiment run
687 :param name: Name of the trace
691 rm = self.get_resource(guid)
692 rm.enable_trace(name)
694 def trace_enabled(self, guid, name):
695 """ Returns True if the trace of name 'name' is enabled
697 :param name: Name of the trace
701 rm = self.get_resource(guid)
702 return rm.trace_enabled(name)
704 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
705 """ Returns information on a collected trace, the trace stream or
706 blocks (chunks) of the trace stream
708 :param name: Name of the trace
711 :param attr: Can be one of:
712 - TraceAttr.ALL (complete trace content),
713 - TraceAttr.STREAM (block in bytes to read starting
715 - TraceAttr.PATH (full path to the trace file),
716 - TraceAttr.SIZE (size of trace file).
719 :param block: Number of bytes to retrieve from trace, when attr is
723 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
729 rm = self.get_resource(guid)
730 return rm.trace(name, attr, block, offset)
732 def get_traces(self, guid):
733 """ Returns the list of the trace names of the RM with guid 'guid'
735 :param guid: Guid of the RM
738 :return: List of trace names
742 rm = self.get_resource(guid)
743 return rm.get_traces()
746 def discover(self, guid):
747 """ Discovers an available resource matching the criteria defined
748 by the RM with guid 'guid', and associates that resource to the RM
750 Not all RM types require (or are capable of) performing resource
751 discovery. For the RM types which are not capable of doing so,
752 invoking this method does not have any consequences.
754 :param guid: Guid of the RM
758 rm = self.get_resource(guid)
761 def provision(self, guid):
762 """ Provisions the resource associated to the RM with guid 'guid'.
764 Provisioning means making a resource 'accessible' to the user.
765 Not all RM types require (or are capable of) performing resource
766 provisioning. For the RM types which are not capable of doing so,
767 invoking this method does not have any consequences.
769 :param guid: Guid of the RM
773 rm = self.get_resource(guid)
774 return rm.provision()
776 def get(self, guid, name):
777 """ Returns the value of the attribute with name 'name' on the
780 :param guid: Guid of the RM
783 :param name: Name of the attribute
786 :return: The value of the attribute with name 'name'
789 rm = self.get_resource(guid)
792 def set(self, guid, name, value):
793 """ Modifies the value of the attribute with name 'name' on the
796 :param guid: Guid of the RM
799 :param name: Name of the attribute
802 :param value: Value of the attribute
805 rm = self.get_resource(guid)
808 def get_global(self, rtype, name):
809 """ Returns the value of the global attribute with name 'name' on the
810 RMs of rtype 'rtype'.
812 :param guid: Guid of the RM
815 :param name: Name of the attribute
818 :return: The value of the attribute with name 'name'
821 rclass = ResourceFactory.get_resource_type(rtype)
822 return rclass.get_global(name)
824 def set_global(self, rtype, name, value):
825 """ Modifies the value of the global attribute with name 'name' on the
826 RMs of with rtype 'rtype'.
828 :param guid: Guid of the RM
831 :param name: Name of the attribute
834 :param value: Value of the attribute
837 rclass = ResourceFactory.get_resource_type(rtype)
838 return rclass.set_global(name, value)
840 def state(self, guid, hr = False):
841 """ Returns the state of a resource
843 :param guid: Resource guid
846 :param hr: Human readable. Forces return of a
847 status string instead of a number
851 rm = self.get_resource(guid)
855 return ResourceState2str.get(state)
859 def stop(self, guid):
860 """ Stops the RM with guid 'guid'
862 Stopping a RM means that the resource it controls will
863 no longer take part of the experiment.
865 :param guid: Guid of the RM
869 rm = self.get_resource(guid)
872 def start(self, guid):
873 """ Starts the RM with guid 'guid'
875 Starting a RM means that the resource it controls will
876 begin taking part of the experiment.
878 :param guid: Guid of the RM
882 rm = self.get_resource(guid)
885 def get_start_time(self, guid):
886 """ Returns the start time of the RM as a timestamp """
887 rm = self.get_resource(guid)
890 def get_stop_time(self, guid):
891 """ Returns the stop time of the RM as a timestamp """
892 rm = self.get_resource(guid)
def get_discover_time(self, guid):
    """ Returns, as a timestamp, when the RM with guid 'guid' was
    discovered (its 'discover_time' attribute).

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).discover_time
def get_provision_time(self, guid):
    """ Returns, as a timestamp, when the RM with guid 'guid' was
    provisioned (its 'provision_time' attribute).

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).provision_time
905 def get_ready_time(self, guid):
906 """ Returns the deployment time of the RM as a timestamp """
907 rm = self.get_resource(guid)
def get_release_time(self, guid):
    """ Returns, as a timestamp, when the RM with guid 'guid' was
    released (its 'release_time' attribute).

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).release_time
def get_failed_time(self, guid):
    """ Returns, as a timestamp, when the RM with guid 'guid' failed
    (its 'failed_time' attribute).

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).failed_time
920 def set_with_conditions(self, name, value, guids1, guids2, state,
922 """ Modifies the value of attribute with name 'name' on all RMs
923 on the guids1 list when time 'time' has elapsed since all
924 elements in guids2 list have reached state 'state'.
926 :param name: Name of attribute to set in RM
929 :param value: Value of attribute to set in RM
932 :param guids1: List of guids of RMs subjected to action
935 :param action: Action to register (either START or STOP)
936 :type action: ResourceAction
938 :param guids2: List of guids of RMs to we waited for
941 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
942 :type state: ResourceState
944 :param time: Time to wait after guids2 has reached status
948 if isinstance(guids1, int):
950 if isinstance(guids2, int):
954 rm = self.get_resource(guid)
955 rm.set_with_conditions(name, value, guids2, state, time)
957 def deploy(self, guids = None, wait_all_ready = True, group = None):
958 """ Deploys all ResourceManagers in the guids list.
960 If the argument 'guids' is not given, all RMs with state NEW
963 :param guids: List of guids of RMs to deploy
966 :param wait_all_ready: Wait until all RMs are ready in
967 order to start the RMs
970 :param group: Id of deployment group in which to deploy RMs
974 self.logger.debug(" ------- DEPLOY START ------ ")
977 # If no guids list was passed, all 'NEW' RMs will be deployed
979 for guid, rm in self._resources.items():
980 if rm.state == ResourceState.NEW:
983 if isinstance(guids, int):
986 # Create deployment group
987 # New guids can be added to a same deployment group later on
992 group = self._group_id_generator.next()
994 if group not in self._groups:
995 self._groups[group] = []
997 self._groups[group].extend(guids)
999 def wait_all_and_start(group):
1000 # Function that checks if all resources are READY
1001 # before scheduling a start_with_conditions for each RM
1004 # Get all guids in group
1005 guids = self._groups[group]
1008 if self.state(guid) < ResourceState.READY:
1013 callback = functools.partial(wait_all_and_start, group)
1014 self.schedule("1s", callback)
1016 # If all resources are ready, we schedule the start
1018 rm = self.get_resource(guid)
1019 self.schedule("0s", rm.start_with_conditions)
1021 if rm.conditions.get(ResourceAction.STOP):
1022 # Only if the RM has STOP conditions we
1023 # schedule a stop. Otherwise the RM will stop immediately
1024 self.schedule("0s", rm.stop_with_conditions)
1026 if wait_all_ready and new_group:
1027 # Schedule a function to check that all resources are
1028 # READY, and only then schedule the start.
1029 # This aims at reducing the number of tasks looping in the
1031 # Instead of having many start tasks, we will have only one for
1033 callback = functools.partial(wait_all_and_start, group)
1034 self.schedule("0s", callback)
1037 rm = self.get_resource(guid)
1038 rm.deployment_group = group
1039 self.schedule("0s", rm.deploy_with_conditions)
1041 if not wait_all_ready:
1042 self.schedule("0s", rm.start_with_conditions)
1044 if rm.conditions.get(ResourceAction.STOP):
1045 # Only if the RM has STOP conditions we
1046 # schedule a stop. Otherwise the RM will stop immediately
1047 self.schedule("0s", rm.stop_with_conditions)
1049 def release(self, guids = None):
1050 """ Releases all ResourceManagers in the guids list.
1052 If the argument 'guids' is not given, all RMs registered
1053 in the experiment are released.
1055 :param guids: List of RM guids
1059 if self._state == ECState.RELEASED:
1062 if isinstance(guids, int):
1066 guids = self.resources
1069 rm = self.get_resource(guid)
1070 self.schedule("0s", rm.release)
1072 self.wait_released(guids)
1078 if self.get(guid, "hardRelease"):
1079 self.remove_resource(guid)\
1081 # Mark the EC state as RELEASED
1082 self._state = ECState.RELEASED
1085 """ Releases all resources and stops the ExperimentController
1088 # If there was a major failure we can't exit gracefully
1089 if self._state == ECState.FAILED:
1090 raise RuntimeError("EC failure. Can not exit gracefully")
1092 # Remove all pending tasks from the scheduler queue
1093 for tid in list(self._scheduler.pending):
1094 self._scheduler.remove(tid)
1096 # Remove pending tasks from the workers queue
1097 self._runner.empty()
1101 # Mark the EC state as TERMINATED
1102 self._state = ECState.TERMINATED
1104 # Stop processing thread
1107 # Notify condition to wake up the processing thread
1110 if self._thread.is_alive():
1113 def schedule(self, date, callback, track = False):
1114 """ Schedules a callback to be executed at time 'date'.
1116 :param date: string containing execution time for the task.
1117 It can be expressed as an absolute time, using
1118 timestamp format, or as a relative time matching
1119 ^\d+.\d+(h|m|s|ms|us)$
1121 :param callback: code to be executed for the task. Must be a
1122 Python function, and receives args and kwargs
1125 :param track: if set to True, the task will be retrievable with
1126 the get_task() method
1128 :return : The Id of the task
1132 timestamp = stabsformat(date)
1133 task = Task(timestamp, callback)
1134 task = self._scheduler.schedule(task)
1137 self._tasks[task.id] = task
1139 # Notify condition to wake up the processing thread
1145 """ Process scheduled tasks.
1149 Tasks are scheduled by invoking the schedule method with a target
1150 callback and an execution time.
1151 The schedule method creates a new Task object with that callback
1152 and execution time, and pushes it into the '_scheduler' queue.
1153 The execution time and the order of arrival of tasks are used
1154 to order the tasks in the queue.
1156 The _process method is executed in an independent thread held by
1157 the ExperimentController for as long as the experiment is running.
1158 This method takes tasks from the '_scheduler' queue in a loop
1159 and processes them in parallel using multithreading.
1160 The environmental variable NEPI_NTHREADS can be used to control
1161 the number of threads used to process tasks. The default value is
1164 To execute tasks in parallel, a ParallelRunner (PR) object is used.
1165 This object keeps a pool of threads (workers), and a queue of tasks
1166 scheduled for 'immediate' execution.
1168 On each iteration, the '_process' loop will take the next task that
1169 is scheduled for 'future' execution from the '_scheduler' queue,
1170 and if the execution time of that task is >= to the current time,
1171 it will push that task into the PR for 'immediate execution'.
1172 As soon as a worker is free, the PR will assign the next task to
1175 Upon receiving a task to execute, each PR worker (thread) will
1176 invoke the _execute method of the EC, passing the task as
1178 The _execute method will then invoke task.callback inside a
1179 try/except block. If an exception is raised by the tasks.callback,
1180 it will be trapped by the try block, logged to standard error
1181 (usually the console), and the task will be marked as failed.
1185 self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1186 self._runner = ParallelRun(maxthreads = self.nthreads)
1187 self._runner.start()
1189 while not self._stop:
1191 self._cond.acquire()
1193 task = next(self._scheduler)
1196 # No task to execute. Wait for a new task to be scheduled.
1199 # The task timestamp is in the future. Wait for timeout
1200 # or until another task is scheduled.
1202 if now < task.timestamp:
1203 # Calculate timeout in seconds
1204 timeout = tdiffsec(task.timestamp, now)
1206 # Re-schedule task with the same timestamp
1207 self._scheduler.schedule(task)
1211 # Wait timeout or until a new task awakes the condition
1212 self._cond.wait(timeout)
1214 self._cond.release()
1217 # Process tasks in parallel
1218 self._runner.put(self._execute, task)
1221 err = traceback.format_exc()
1222 self.logger.error("Error while processing tasks in the EC: %s" % err)
1224 # Set the EC to FAILED state
1225 self._state = ECState.FAILED
1227 # Set the FailureManager failure level to EC failure
1228 self._fm.set_ec_failure()
1230 self.logger.debug("Exiting the task processing loop ... ")
1233 self._runner.destroy()
1235 def _execute(self, task):
1236 """ Executes a single task.
1238 :param task: Object containing the callback to execute
1244 task.result = task.callback()
1245 task.status = TaskStatus.DONE
1248 err = traceback.format_exc()
1250 task.status = TaskStatus.ERROR
1252 self.logger.error("Error occurred while executing task: %s" % err)
1255 """ Awakes the processing thread if it is blocked waiting
1256 for new tasks to arrive
1259 self._cond.acquire()
1261 self._cond.release()
1263 def _build_from_netgraph(self, add_node_callback, add_edge_callback,
1265 """ Automates experiment description using a NetGraph instance.
1267 self._netgraph = NetGraph(**kwargs)
1269 if add_node_callback:
1270 ### Add resources to the EC
1271 for nid in self.netgraph.nodes():
1272 add_node_callback(self, nid)
1274 if add_edge_callback:
1275 #### Add connections between resources
1276 for nid1, nid2 in self.netgraph.edges():
1277 add_edge_callback(self, nid1, nid2)