2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.util import guid
20 from nepi.util.parallel import ParallelRun
21 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
22 from nepi.execution.resource import ResourceFactory, ResourceAction, \
23 ResourceState, ResourceState2str
24 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
25 from nepi.execution.trace import TraceAttr
26 from nepi.util.serializer import ECSerializer, SFormats
27 from nepi.util.plotter import ECPlotter, PFormats
28 from nepi.util.netgraph import NetGraph, TopologyType
30 # TODO: use multiprocessing instead of threading
31 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
42 class FailureLevel(object):
43 """ Possible failure states for the experiment """
48 class FailureManager(object):
49 """ The FailureManager is responsible for handling errors
50 and deciding whether an experiment should be aborted or not
55 self._failure_level = FailureLevel.OK
59 self._ec = weakref.ref(ec)
63 """ Returns the ExperimentController associated to this FailureManager
71 def eval_failure(self, guid):
72 """ Implements failure policy and sets the abort state of the
73 experiment based on the failure state and criticality of
76 :param guid: Guid of the RM upon which the failure of the experiment
81 if self._failure_level == FailureLevel.OK:
82 rm = self.ec.get_resource(guid)
84 critical = rm.get("critical")
86 if state == ResourceState.FAILED and critical:
87 self._failure_level = FailureLevel.RM_FAILURE
89 self.ec.logger.debug("RM critical failure occurred on guid %d." \
90 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
def set_ec_failure(self):
    """ Raises the experiment failure level to EC_FAILURE

    Used when the ExperimentController itself breaks (as opposed to a
    single RM failing). The current failure level is overwritten
    unconditionally.
    """
    new_level = FailureLevel.EC_FAILURE
    self._failure_level = new_level
95 class ECState(object):
96 """ Possible states of the ExperimentController
104 class ExperimentController(object):
108 An experiment, or scenario, is defined by a concrete set of resources,
109 and the behavior, configuration and interconnection of those resources.
110 The Experiment Description (ED) is a detailed representation of a
111 single experiment. It contains all the necessary information to
112 allow repeating the experiment. NEPI allows to describe
113 experiments by registering components (resources), configuring them
114 and interconnecting them.
116 A same experiment (scenario) can be executed many times, generating
117 different results. We call an experiment execution (instance) a 'run'.
119 The ExperimentController (EC) is the entity responsible for
120 managing an experiment run. The same scenario can be
121 recreated (and re-run) by instantiating an EC and recreating
122 the same experiment description.
124 An experiment is represented as a graph of interconnected
125 resources. A resource is a generic concept in the sense that any
126 component taking part of an experiment, whether physical or
127 virtual, is considered a resource. A resource could be a host,
128 a virtual machine, an application, a simulator, an IP address.
130 A ResourceManager (RM), is the entity responsible for managing a
131 single resource. ResourceManagers are specific to a resource
132 type (e.g. an RM to control a Linux application will not be
133 the same as the RM used to control a ns-3 simulation).
134 To support a new type of resource, a new RM must be implemented.
135 NEPI already provides a variety of RMs to control basic resources,
136 and new ones can be extended from the existing ones.
138 Through the EC interface the user can create ResourceManagers (RMs),
139 configure them and interconnect them, to describe an experiment.
140 Describing an experiment through the EC does not run the experiment.
141 Only when the 'deploy()' method is invoked on the EC, the EC will take
142 actions to transform the 'described' experiment into a 'running' experiment.
144 While the experiment is running, it is possible to continue to
145 create/configure/connect RMs, and to deploy them to involve new
146 resources in the experiment (this is known as 'interactive' deployment).
148 An experiment in NEPI is identified by a string id,
149 which is either given by the user, or automatically generated by NEPI.
150 The purpose of this identifier is to separate files and results that
151 belong to different experiment scenarios.
152 However, since a same 'experiment' can be run many times, the experiment
153 id is not enough to identify an experiment instance (run).
154 For this reason, the ExperimentController has two identifiers, the
155 exp_id, which can be re-used in different ExperimentController,
156 and the run_id, which is unique to one ExperimentController instance, and
157 is automatically generated by NEPI.
162 def load(cls, filepath, format = SFormats.XML):
163 serializer = ECSerializer()
164 ec = serializer.load(filepath)
167 def __init__(self, exp_id = None, local_dir = None, persist = False,
168 fm = None, add_node_callback = None, add_edge_callback = None,
170 """ ExperimentController entity to model and execute a network
173 :param exp_id: Human readable name to identify the experiment
176 :param local_dir: Path to local directory where to store experiment
180 :param persist: Save an XML description of the experiment after
181 completion at local_dir
184 :param fm: FailureManager object. If None is given, the default
185 FailureManager class will be used
186 :type fm: FailureManager
188 :param add_node_callback: Callback to invoke for node instantiation
189 when automatic topology creation mode is used
190 :type add_node_callback: function
192 :param add_edge_callback: Callback to invoke for edge instantiation
193 when automatic topology creation mode is used
194 :type add_edge_callback: function
197 super(ExperimentController, self).__init__()
200 self._logger = logging.getLogger("ExperimentController")
202 # Run identifier. It identifies a concrete execution instance (run)
204 # Since a same experiment (same configuration) can be executed many
205 # times, this run_id permits to separate result files generated on
206 # different experiment executions
207 self._run_id = tsformat()
209 # Experiment identifier. Usually assigned by the user
210 # Identifies the experiment scenario (i.e. configuration,
211 # resources used, etc)
212 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
214 # Local path where to store experiment related files (results, etc)
216 local_dir = tempfile.gettempdir() # /tmp
218 self._local_dir = local_dir
219 self._exp_dir = os.path.join(local_dir, self.exp_id)
220 self._run_dir = os.path.join(self.exp_dir, self.run_id)
222 # If True persist the experiment controller in XML format, after completion
223 self._persist = persist
225 # generator of globally unique ids
226 self._guid_generator = guid.GuidGenerator()
229 self._resources = dict()
231 # Scheduler. It is a queue that holds tasks scheduled for
232 # execution, and yields the next task to be executed
233 # ordered by execution and arrival time
234 self._scheduler = HeapScheduler()
239 # RM groups (for deployment)
240 self._groups = dict()
242 # generator of globally unique id for groups
243 self._group_id_generator = guid.GuidGenerator()
245 # Flag to stop processing thread
248 # Entity in charge of managing system failures
250 self._fm = FailureManager()
251 self._fm.set_ec(self)
254 self._state = ECState.RUNNING
256 # Automatically construct experiment description
257 self._netgraph = None
258 if add_node_callback or add_edge_callback or kwargs.get("topology"):
259 self._build_from_netgraph(add_node_callback, add_edge_callback,
262 # The runner is a pool of threads used to parallelize
267 # Event processing thread
268 self._cond = threading.Condition()
269 self._thread = threading.Thread(target = self._process)
270 self._thread.setDaemon(True)
275 """ Returns the logger instance of the Experiment Controller
282 """ Returns the failure manager
289 def failure_level(self):
290 """ Returns the level of FAILURE of the experiment
294 return self._fm._failure_level
298 """ Returns the state of the Experiment Controller
305 """ Returns the experiment id assigned by the user
312 """ Returns the experiment instance (run) identifier (automatically
320 """ Returns the number of processing nthreads used
323 return self._nthreads
327 """ Root local directory for experiment files
330 return self._local_dir
334 """ Local directory to store results and other files related to the
342 """ Local directory to store results and other files related to the
350 """ If True, persists the ExperimentController to XML format upon
351 experiment completion
358 """ Return NetGraph instance if experiment description was automatically
362 return self._netgraph
366 """ Returns True if the experiment has failed and should be interrupted,
370 return self._fm.abort
372 def inform_failure(self, guid):
373 """ Reports a failure in a RM to the EC for evaluation
375 :param guid: Resource id
380 return self._fm.eval_failure(guid)
382 def wait_finished(self, guids):
383 """ Blocking method that waits until all RMs in the 'guids' list
384 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
385 RELEASED ), or until a failure in the experiment occurs
388 :param guids: List of guids
396 return self.wait(guids, state = ResourceState.STOPPED,
399 def wait_started(self, guids):
400 """ Blocking method that waits until all RMs in the 'guids' list
401 have reached a state >= STARTED, or until a failure in the
402 experiment occurs (i.e. abort == True)
404 :param guids: List of guids
412 return self.wait(guids, state = ResourceState.STARTED,
415 def wait_released(self, guids):
416 """ Blocking method that waits until all RMs in the 'guids' list
417 have reached a state == RELEASED, or until the EC fails
419 :param guids: List of guids
425 return self._state == ECState.FAILED
427 return self.wait(guids, state = ResourceState.RELEASED,
430 def wait_deployed(self, guids):
431 """ Blocking method that waits until all RMs in the 'guids' list
432 have reached a state >= READY, or until a failure in the
433 experiment occurs (i.e. abort == True)
435 :param guids: List of guids
443 return self.wait(guids, state = ResourceState.READY,
446 def wait(self, guids, state, quit):
447 """ Blocking method that waits until all RMs in the 'guids' list
448 have reached a state >= 'state', or until the 'quit' callback
451 :param guids: List of guids
455 if isinstance(guids, int):
458 # Make a copy to avoid modifying the original guids list
462 # If there are no more guids to wait for
463 # or the quit function returns True, exit the loop
464 if len(guids) == 0 or quit():
467 # If a guid reached one of the target states, remove it from list
469 rm = self.get_resource(guid)
473 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
474 rm.get_rtype(), guid, rstate, state))
477 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
478 guid, rstate, state))
484 def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
485 plotter = ECPlotter()
486 fpath = plotter.plot(self, dirpath = dirpath, format= format,
490 def serialize(self, format = SFormats.XML):
491 serializer = ECSerializer()
492 sec = serializer.load(self, format = format)
495 def save(self, dirpath = None, format = SFormats.XML):
497 dirpath = self.run_dir
504 serializer = ECSerializer()
505 path = serializer.save(self, dirpath, format = format)
508 def get_task(self, tid):
509 """ Returns a task by its id
511 :param tid: Id of the task
517 return self._tasks.get(tid)
519 def get_resource(self, guid):
520 """ Returns a registered ResourceManager by its guid
522 :param guid: Id of the resource
525 :rtype: ResourceManager
528 rm = self._resources.get(guid)
531 def get_resources_by_type(self, rtype):
532 """ Returns the ResourceManager objects of type rtype
534 :param rtype: Resource type
537 :rtype: list of ResourceManagers
541 for guid, rm in self._resources.iteritems():
542 if rm.get_rtype() == rtype:
def remove_resource(self, guid):
    """ Unregisters the RM with guid 'guid' from the experiment

    Only the EC's reference to the RM is dropped; this method does not
    release the RM itself.

    :param guid: Guid of the RM to remove
    :type guid: int

    :raises KeyError: if no RM is registered under 'guid'
    """
    self._resources.pop(guid)
551 """ Returns the guids of all ResourceManagers
553 :return: Set of all RM guids
557 keys = self._resources.keys()
561 def filter_resources(self, rtype):
562 """ Returns the guids of all ResourceManagers of type rtype
564 :param rtype: Resource type
567 :rtype: list of guids
571 for guid, rm in self._resources.iteritems():
572 if rm.get_rtype() == rtype:
576 def register_resource(self, rtype, guid = None):
577 """ Registers a new ResourceManager of type 'rtype' in the experiment
579 This method will assign a new 'guid' for the RM, if no guid
582 :param rtype: Type of the RM
585 :return: Guid of the RM
589 # Get next available guid
590 guid = self._guid_generator.next(guid)
593 rm = ResourceFactory.create(rtype, self, guid)
596 self._resources[guid] = rm
600 def get_attributes(self, guid):
601 """ Returns all the attributes of the RM with guid 'guid'
603 :param guid: Guid of the RM
606 :return: List of attributes
610 rm = self.get_resource(guid)
611 return rm.get_attributes()
613 def get_attribute(self, guid, name):
614 """ Returns the attribute 'name' of the RM with guid 'guid'
616 :param guid: Guid of the RM
619 :param name: Name of the attribute
622 :return: The attribute with name 'name'
626 rm = self.get_resource(guid)
627 return rm.get_attribute(name)
629 def register_connection(self, guid1, guid2):
630 """ Registers a connection between a RM with guid 'guid1'
631 and another RM with guid 'guid2'.
633 The order in which the two guids are provided is not
634 important, since the connection relationship is symmetric.
636 :param guid1: First guid to connect
637 :type guid1: int
639 :param guid2: Second guid to connect
640 :type guid2: int
643 rm1 = self.get_resource(guid1)
644 rm2 = self.get_resource(guid2)
646 rm1.register_connection(guid2)
647 rm2.register_connection(guid1)
649 def register_condition(self, guids1, action, guids2, state,
651 """ Registers an action START, STOP or DEPLOY for all RMs in list
652 guids1 to occur at time 'time' after all elements in list guids2
653 have reached state 'state'.
655 :param guids1: List of guids of RMs subjected to action
658 :param action: Action to perform (either START, STOP or DEPLOY)
659 :type action: ResourceAction
661 :param guids2: List of guids of RMs to be waited for
664 :param state: State to wait for on RMs of list guids2 (STARTED,
666 :type state: ResourceState
668 :param time: Time to wait after guids2 has reached status
672 if isinstance(guids1, int):
674 if isinstance(guids2, int):
678 rm = self.get_resource(guid1)
679 rm.register_condition(action, guids2, state, time)
681 def enable_trace(self, guid, name):
682 """ Enables a trace to be collected during the experiment run
684 :param name: Name of the trace
688 rm = self.get_resource(guid)
689 rm.enable_trace(name)
691 def trace_enabled(self, guid, name):
692 """ Returns True if the trace of name 'name' is enabled
694 :param name: Name of the trace
698 rm = self.get_resource(guid)
699 return rm.trace_enabled(name)
701 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
702 """ Returns information on a collected trace, the trace stream or
703 blocks (chunks) of the trace stream
705 :param name: Name of the trace
708 :param attr: Can be one of:
709 - TraceAttr.ALL (complete trace content),
710 - TraceAttr.STREAM (block in bytes to read starting
712 - TraceAttr.PATH (full path to the trace file),
713 - TraceAttr.SIZE (size of trace file).
716 :param block: Number of bytes to retrieve from trace, when attr is
720 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
726 rm = self.get_resource(guid)
727 return rm.trace(name, attr, block, offset)
729 def get_traces(self, guid):
730 """ Returns the list of the trace names of the RM with guid 'guid'
732 :param guid: Guid of the RM
735 :return: List of trace names
739 rm = self.get_resource(guid)
740 return rm.get_traces()
743 def discover(self, guid):
744 """ Discovers an available resource matching the criteria defined
745 by the RM with guid 'guid', and associates that resource to the RM
747 Not all RM types require (or are capable of) performing resource
748 discovery. For the RM types which are not capable of doing so,
749 invoking this method does not have any consequences.
751 :param guid: Guid of the RM
755 rm = self.get_resource(guid)
758 def provision(self, guid):
759 """ Provisions the resource associated to the RM with guid 'guid'.
761 Provisioning means making a resource 'accessible' to the user.
762 Not all RM types require (or are capable of) performing resource
763 provisioning. For the RM types which are not capable of doing so,
764 invoking this method does not have any consequences.
766 :param guid: Guid of the RM
770 rm = self.get_resource(guid)
771 return rm.provision()
773 def get(self, guid, name):
774 """ Returns the value of the attribute with name 'name' on the
777 :param guid: Guid of the RM
780 :param name: Name of the attribute
783 :return: The value of the attribute with name 'name'
786 rm = self.get_resource(guid)
789 def set(self, guid, name, value):
790 """ Modifies the value of the attribute with name 'name' on the
793 :param guid: Guid of the RM
796 :param name: Name of the attribute
799 :param value: Value of the attribute
802 rm = self.get_resource(guid)
805 def get_global(self, rtype, name):
806 """ Returns the value of the global attribute with name 'name' on the
807 RMs of rtype 'rtype'.
809 :param rtype: Type of the RM
812 :param name: Name of the attribute
815 :return: The value of the attribute with name 'name'
818 rclass = ResourceFactory.get_resource_type(rtype)
819 return rclass.get_global(name)
821 def set_global(self, rtype, name, value):
822 """ Modifies the value of the global attribute with name 'name' on the
823 RMs with rtype 'rtype'.
825 :param rtype: Type of the RM
828 :param name: Name of the attribute
831 :param value: Value of the attribute
834 rclass = ResourceFactory.get_resource_type(rtype)
835 return rclass.set_global(name, value)
837 def state(self, guid, hr = False):
838 """ Returns the state of a resource
840 :param guid: Resource guid
843 :param hr: Human readable. Forces return of a
844 status string instead of a number
848 rm = self.get_resource(guid)
852 return ResourceState2str.get(state)
856 def stop(self, guid):
857 """ Stops the RM with guid 'guid'
859 Stopping a RM means that the resource it controls will
860 no longer take part of the experiment.
862 :param guid: Guid of the RM
866 rm = self.get_resource(guid)
869 def start(self, guid):
870 """ Starts the RM with guid 'guid'
872 Starting a RM means that the resource it controls will
873 begin taking part of the experiment.
875 :param guid: Guid of the RM
879 rm = self.get_resource(guid)
882 def get_start_time(self, guid):
883 """ Returns the start time of the RM as a timestamp """
884 rm = self.get_resource(guid)
887 def get_stop_time(self, guid):
888 """ Returns the stop time of the RM as a timestamp """
889 rm = self.get_resource(guid)
def get_discover_time(self, guid):
    """ Returns the moment the RM with guid 'guid' was discovered

    :param guid: Guid of the RM
    :type guid: int

    :return: The RM's ``discover_time`` value, as a timestamp
    """
    resource = self.get_resource(guid)
    discovered_at = resource.discover_time
    return discovered_at
def get_provision_time(self, guid):
    """ Returns the moment the RM with guid 'guid' was provisioned

    :param guid: Guid of the RM
    :type guid: int

    :return: The RM's ``provision_time`` value, as a timestamp
    """
    return self.get_resource(guid).provision_time
902 def get_ready_time(self, guid):
903 """ Returns the deployment time of the RM as a timestamp """
904 rm = self.get_resource(guid)
def get_release_time(self, guid):
    """ Returns the moment the RM with guid 'guid' was released

    :param guid: Guid of the RM
    :type guid: int

    :return: The RM's ``release_time`` value, as a timestamp
    """
    released_rm = self.get_resource(guid)
    release_ts = released_rm.release_time
    return release_ts
def get_failed_time(self, guid):
    """ Returns the moment the RM with guid 'guid' failed

    :param guid: Guid of the RM
    :type guid: int

    :return: The RM's ``failed_time`` value, as a timestamp
    """
    return self.get_resource(guid).failed_time
917 def set_with_conditions(self, name, value, guids1, guids2, state,
919 """ Modifies the value of attribute with name 'name' on all RMs
920 on the guids1 list when time 'time' has elapsed since all
921 elements in guids2 list have reached state 'state'.
923 :param name: Name of attribute to set in RM
926 :param value: Value of attribute to set in RM
929 :param guids1: List of guids of RMs subjected to action
932 :param action: Action to register (either START or STOP)
933 :type action: ResourceAction
935 :param guids2: List of guids of RMs to be waited for
938 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
939 :type state: ResourceState
941 :param time: Time to wait after guids2 has reached status
945 if isinstance(guids1, int):
947 if isinstance(guids2, int):
951 rm = self.get_resource(guid)
952 rm.set_with_conditions(name, value, guids2, state, time)
954 def deploy(self, guids = None, wait_all_ready = True, group = None):
955 """ Deploys all ResourceManagers in the guids list.
957 If the argument 'guids' is not given, all RMs with state NEW
960 :param guids: List of guids of RMs to deploy
963 :param wait_all_ready: Wait until all RMs are ready in
964 order to start the RMs
967 :param group: Id of deployment group in which to deploy RMs
971 self.logger.debug(" ------- DEPLOY START ------ ")
974 # If no guids list was passed, all 'NEW' RMs will be deployed
976 for guid, rm in self._resources.iteritems():
977 if rm.state == ResourceState.NEW:
980 if isinstance(guids, int):
983 # Create deployment group
984 # New guids can be added to a same deployment group later on
988 group = self._group_id_generator.next()
990 if group not in self._groups:
991 self._groups[group] = []
993 self._groups[group].extend(guids)
995 def wait_all_and_start(group):
996 # Function that checks if all resources are READY
997 # before scheduling a start_with_conditions for each RM
1000 # Get all guids in group
1001 guids = self._groups[group]
1004 if self.state(guid) < ResourceState.READY:
1009 callback = functools.partial(wait_all_and_start, group)
1010 self.schedule("1s", callback)
1012 # If all resources are ready, we schedule the start
1014 rm = self.get_resource(guid)
1015 self.schedule("0s", rm.start_with_conditions)
1017 if rm.conditions.get(ResourceAction.STOP):
1018 # Only if the RM has STOP conditions we
1019 # schedule a stop. Otherwise the RM will stop immediately
1020 self.schedule("0s", rm.stop_with_conditions)
1022 if wait_all_ready and new_group:
1023 # Schedule a function to check that all resources are
1024 # READY, and only then schedule the start.
1025 # This aims at reducing the number of tasks looping in the
1027 # Instead of having many start tasks, we will have only one for
1029 callback = functools.partial(wait_all_and_start, group)
1030 self.schedule("0s", callback)
1033 rm = self.get_resource(guid)
1034 rm.deployment_group = group
1035 self.schedule("0s", rm.deploy_with_conditions)
1037 if not wait_all_ready:
1038 self.schedule("0s", rm.start_with_conditions)
1040 if rm.conditions.get(ResourceAction.STOP):
1041 # Only if the RM has STOP conditions we
1042 # schedule a stop. Otherwise the RM will stop immediately
1043 self.schedule("0s", rm.stop_with_conditions)
1045 def release(self, guids = None):
1046 """ Releases all ResourceManagers in the guids list.
1048 If the argument 'guids' is not given, all RMs registered
1049 in the experiment are released.
1051 :param guids: List of RM guids
1055 if self._state == ECState.RELEASED:
1058 if isinstance(guids, int):
1062 guids = self.resources
1065 rm = self.get_resource(guid)
1066 self.schedule("0s", rm.release)
1068 self.wait_released(guids)
1074 if self.get(guid, "hardRelease"):
1075 self.remove_resource(guid)\
1077 # Mark the EC state as RELEASED
1078 self._state = ECState.RELEASED
1081 """ Releases all resources and stops the ExperimentController
1084 # If there was a major failure we can't exit gracefully
1085 if self._state == ECState.FAILED:
1086 raise RuntimeError("EC failure. Can not exit gracefully")
1088 # Remove all pending tasks from the scheduler queue
1089 for tid in list(self._scheduler.pending):
1090 self._scheduler.remove(tid)
1092 # Remove pending tasks from the workers queue
1093 self._runner.empty()
1097 # Mark the EC state as TERMINATED
1098 self._state = ECState.TERMINATED
1100 # Stop processing thread
1103 # Notify condition to wake up the processing thread
1106 if self._thread.is_alive():
1109 def schedule(self, date, callback, track = False):
1110 """ Schedules a callback to be executed at time 'date'.
1112 :param date: string containing execution time for the task.
1113 It can be expressed as an absolute time, using
1114 timestamp format, or as a relative time matching
1115 ^\d+.\d+(h|m|s|ms|us)$
1117 :param callback: code to be executed for the task. Must be a
1118 Python function, and receives args and kwargs
1121 :param track: if set to True, the task will be retrievable with
1122 the get_task() method
1124 :return : The Id of the task
1128 timestamp = stabsformat(date)
1129 task = Task(timestamp, callback)
1130 task = self._scheduler.schedule(task)
1133 self._tasks[task.id] = task
1135 # Notify condition to wake up the processing thread
1141 """ Process scheduled tasks.
1145 Tasks are scheduled by invoking the schedule method with a target
1146 callback and an execution time.
1147 The schedule method creates a new Task object with that callback
1148 and execution time, and pushes it into the '_scheduler' queue.
1149 The execution time and the order of arrival of tasks are used
1150 to order the tasks in the queue.
1152 The _process method is executed in an independent thread held by
1153 the ExperimentController for as long as the experiment is running.
1154 This method takes tasks from the '_scheduler' queue in a loop
1155 and processes them in parallel using multithreading.
1156 The environmental variable NEPI_NTHREADS can be used to control
1157 the number of threads used to process tasks. The default value is
1160 To execute tasks in parallel, a ParallelRun (PR) object is used.
1161 This object keeps a pool of threads (workers), and a queue of tasks
1162 scheduled for 'immediate' execution.
1164 On each iteration, the '_process' loop will take the next task that
1165 is scheduled for 'future' execution from the '_scheduler' queue,
1166 and if the current time is >= the execution time of that task,
1167 it will push that task into the PR for 'immediate execution'.
1168 As soon as a worker is free, the PR will assign the next task to
1171 Upon receiving a task to execute, each PR worker (thread) will
1172 invoke the _execute method of the EC, passing the task as
1174 The _execute method will then invoke task.callback inside a
1175 try/except block. If an exception is raised by task.callback,
1176 it will be trapped by the try block, logged to standard error
1177 (usually the console), and the task will be marked as failed.
1181 self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1182 self._runner = ParallelRun(maxthreads = self.nthreads)
1183 self._runner.start()
1185 while not self._stop:
1187 self._cond.acquire()
1189 task = self._scheduler.next()
1192 # No task to execute. Wait for a new task to be scheduled.
1195 # The task timestamp is in the future. Wait for timeout
1196 # or until another task is scheduled.
1198 if now < task.timestamp:
1199 # Calculate timeout in seconds
1200 timeout = tdiffsec(task.timestamp, now)
1202 # Re-schedule task with the same timestamp
1203 self._scheduler.schedule(task)
1207 # Wait timeout or until a new task awakes the condition
1208 self._cond.wait(timeout)
1210 self._cond.release()
1213 # Process tasks in parallel
1214 self._runner.put(self._execute, task)
1217 err = traceback.format_exc()
1218 self.logger.error("Error while processing tasks in the EC: %s" % err)
1220 # Set the EC to FAILED state
1221 self._state = ECState.FAILED
1223 # Set the FailureManager failure level to EC failure
1224 self._fm.set_ec_failure()
1226 self.logger.debug("Exiting the task processing loop ... ")
1229 self._runner.destroy()
1231 def _execute(self, task):
1232 """ Executes a single task.
1234 :param task: Object containing the callback to execute
1240 task.result = task.callback()
1241 task.status = TaskStatus.DONE
1244 err = traceback.format_exc()
1246 task.status = TaskStatus.ERROR
1248 self.logger.error("Error occurred while executing task: %s" % err)
1251 """ Awakes the processing thread if it is blocked waiting
1252 for new tasks to arrive
1255 self._cond.acquire()
1257 self._cond.release()
1259 def _build_from_netgraph(self, add_node_callback, add_edge_callback,
1261 """ Automates experiment description using a NetGraph instance.
1263 self._netgraph = NetGraph(**kwargs)
1265 if add_node_callback:
1266 ### Add resources to the EC
1267 for nid in self.netgraph.nodes():
1268 add_node_callback(self, nid)
1270 if add_edge_callback:
1271 #### Add connections between resources
1272 for nid1, nid2 in self.netgraph.edges():
1273 add_edge_callback(self, nid1, nid2)