2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import tempfile
import threading
import time
import traceback
import weakref

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
from nepi.util.serializer import ECSerializer, SFormats
from nepi.util.plotter import ECPlotter, PFormats
from nepi.util.netgraph import NetGraph, TopologyType
# TODO: Use multiprocessing instead of threading.
# TODO: Allow reconnecting to a running experiment instance (reconnect mode vs. deploy mode).
class FailureLevel(object):
    """ Describes the system failure state.

    OK: no failure has been registered.
    RM_FAILURE: a critical ResourceManager reached the FAILED state.
    EC_FAILURE: the ExperimentController itself failed.
    """
    OK = 1
    RM_FAILURE = 2
    EC_FAILURE = 3
class FailureManager(object):
    """ The FailureManager is responsible for handling errors
    and deciding whether an experiment should be aborted or not.

    It keeps a failure level (one of the FailureLevel values) that starts
    at OK and is escalated either by a critical RM failure (eval_failure)
    or by an internal ExperimentController failure (set_ec_failure).
    """

    def __init__(self, ec):
        """
        :param ec: ExperimentController whose failures are managed
        :type ec: ExperimentController
        """
        # Keep only a weak reference to the EC: the EC itself holds a
        # reference to its FailureManager, so a strong one would create
        # a reference cycle.
        self._ec = weakref.ref(ec)
        self._failure_level = FailureLevel.OK

    @property
    def ec(self):
        """ Returns the ExperimentController associated to this
        FailureManager, or None if it no longer exists
        """
        return self._ec()

    @property
    def abort(self):
        """ Returns True if a failure has been registered (failure level
        different from OK), meaning the experiment should be aborted
        """
        return self._failure_level != FailureLevel.OK

    def eval_failure(self, guid):
        """ Evaluates whether the state of the RM with guid 'guid' must
        cause the experiment to fail.

        Only while no failure has been registered yet: if the RM is in
        FAILED state and its 'critical' attribute is set, the failure level
        is raised to RM_FAILURE.

        :param guid: Guid of the RM to evaluate
        :type guid: int
        """
        if self._failure_level != FailureLevel.OK:
            # A failure was already registered; nothing more to evaluate
            return

        rm = self.ec.get_resource(guid)
        state = rm.state
        critical = rm.get("critical")

        if state == ResourceState.FAILED and critical:
            self._failure_level = FailureLevel.RM_FAILURE

            self.ec.logger.debug("RM critical failure occurred on guid %d."
                " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)

    def set_ec_failure(self):
        """ Sets the failure level to EC_FAILURE (internal failure of the
        ExperimentController)
        """
        self._failure_level = FailureLevel.EC_FAILURE
class ECState(object):
    """ Possible states for an ExperimentController

    RUNNING: initial state, the EC is processing tasks.
    FAILED: an unexpected error occurred in the task processing loop.
    TERMINATED: the EC has been shut down.
    """
    RUNNING = 1
    FAILED = 2
    # NOTE(review): not assigned by the ExperimentController code visible
    # in this module; kept so that existing references keep working.
    RELEASED = 3
    TERMINATED = 4
class ExperimentController(object):
    """
    .. class:: Class Args :

        :param exp_id: Human readable identifier for the experiment scenario.
        :type exp_id: str

    .. note::

    An experiment, or scenario, is defined by a concrete set of resources,
    and the behavior, configuration and interconnection of those resources.
    The Experiment Description (ED) is a detailed representation of a
    single experiment. It contains all the necessary information to
    allow repeating the experiment. NEPI allows users to describe
    experiments by registering components (resources), configuring them
    and interconnecting them.

    A same experiment (scenario) can be executed many times, generating
    different results. We call an experiment execution (instance) a 'run'.

    The ExperimentController (EC) is the entity responsible for
    managing an experiment run. The same scenario can be
    recreated (and re-run) by instantiating an EC and recreating
    the same experiment description.

    An experiment is represented as a graph of interconnected
    resources. A resource is a generic concept in the sense that any
    component taking part of an experiment, whether physical or
    virtual, is considered a resource. A resource could be a host,
    a virtual machine, an application, a simulator, an IP address.

    A ResourceManager (RM) is the entity responsible for managing a
    single resource. ResourceManagers are specific to a resource
    type (i.e. an RM to control a Linux application will not be
    the same as the RM used to control a ns-3 simulation).
    To support a new type of resource, a new RM must be implemented.
    NEPI already provides a variety of RMs to control basic resources,
    and new ones can be derived from the existing ones.

    Through the EC interface the user can create ResourceManagers (RMs),
    configure them and interconnect them, to describe an experiment.
    Describing an experiment through the EC does not run the experiment.
    Only when the 'deploy()' method is invoked on the EC will the EC take
    actions to transform the 'described' experiment into a 'running'
    experiment.

    While the experiment is running, it is possible to continue to
    create/configure/connect RMs, and to deploy them to involve new
    resources in the experiment (this is known as 'interactive' deployment).

    An experiment in NEPI is identified by a string id,
    which is either given by the user, or automatically generated by NEPI.
    The purpose of this identifier is to separate files and results that
    belong to different experiment scenarios.
    However, since a same 'experiment' can be run many times, the experiment
    id is not enough to identify an experiment instance (run).
    For this reason, the ExperimentController has two identifiers, the
    exp_id, which can be re-used in different ExperimentControllers,
    and the run_id, which is unique to one ExperimentController instance, and
    is automatically generated by NEPI.

    """

    @classmethod
    def load(cls, filepath, format=SFormats.XML):
        """ Creates an ExperimentController from a serialized experiment.

        :param filepath: Path to the file holding the serialized experiment
        :type filepath: str

        :param format: Serialization format of the file
        :type format: SFormats

        :return: The loaded ExperimentController
        """
        serializer = ECSerializer()
        # Forward the requested format to the serializer
        ec = serializer.load(filepath, format=format)
        return ec

    def __init__(self, exp_id=None, local_dir=None, persist=False,
            add_node_callback=None, add_edge_callback=None, **kwargs):
        """ ExperimentController entity to model and execute a network
        experiment.

        :param exp_id: Human readable name to identify the experiment.
            If not given, a random one is generated.
        :type exp_id: str

        :param local_dir: Path to local directory where to store experiment
            related files. Defaults to the system temporary directory.
        :type local_dir: str

        :param persist: Save an XML description of the experiment after
            completion, in the run directory under local_dir
        :type persist: bool

        :param add_node_callback: Callback to invoke for node instantiation
            when automatic topology creation mode is used. It is called
            as add_node_callback(ec, nid).
        :type add_node_callback: function

        :param add_edge_callback: Callback to invoke for edge instantiation
            when automatic topology creation mode is used. It is called
            as add_edge_callback(ec, nid1, nid2).
        :type add_edge_callback: function

        Additional keyword arguments (e.g. 'topology') are forwarded to
        NetGraph when automatic topology creation mode is used.
        """
        super(ExperimentController, self).__init__()

        # Logging
        self._logger = logging.getLogger("ExperimentController")

        # Run identifier. It identifies a concrete execution instance (run)
        # of an experiment.
        # Since a same experiment (same configuration) can be executed many
        # times, this run_id permits to separate result files generated on
        # different experiment executions
        self._run_id = tsformat()

        # Experiment identifier. Usually assigned by the user.
        # Identifies the experiment scenario (i.e. configuration,
        # resources used, etc). When not given: "exp-" + 16 random hex digits
        # (bytearray iteration yields ints on both Python 2 and 3).
        self._exp_id = exp_id or "exp-%s" % "".join(
                "%02x" % byte for byte in bytearray(os.urandom(8)))

        # Local path where to store experiment related files (results, etc)
        if not local_dir:
            local_dir = tempfile.gettempdir() # /tmp

        self._local_dir = local_dir
        self._exp_dir = os.path.join(local_dir, self.exp_id)
        self._run_dir = os.path.join(self.exp_dir, self.run_id)

        # If True persist the experiment controller in XML format, after completion
        self._persist = persist

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers, indexed by guid
        self._resources = dict()

        # Scheduler. It is a queue that holds tasks scheduled for
        # execution, and yields the next task to be executed
        # ordered by execution and arrival time
        self._scheduler = HeapScheduler()

        # Tracked tasks, indexed by task id (see schedule(track=True))
        self._tasks = dict()

        # RM groups (for deployment)
        self._groups = dict()

        # generator of globally unique id for groups
        self._group_id_generator = guid.GuidGenerator()

        # Flag to stop processing thread
        self._stop = False

        # Entity in charge of managing system failures
        self._fm = FailureManager(self)

        # EC state
        self._state = ECState.RUNNING

        self._netgraph = None

        # The runner is a pool of threads used to parallelize
        # execution of tasks. It is created by the processing thread.
        self._nthreads = 20
        self._runner = None

        # Event processing thread
        self._cond = threading.Condition()
        self._thread = threading.Thread(target=self._process)
        self._thread.daemon = True
        self._thread.start()

        # Automatically construct experiment description.
        # This is done only once the condition and the processing thread
        # exist: the user callbacks receive the EC and may call methods
        # that schedule tasks (schedule() needs self._cond).
        if add_node_callback or add_edge_callback or kwargs.get("topology"):
            self._build_from_netgraph(add_node_callback, add_edge_callback,
                    **kwargs)

    @property
    def logger(self):
        """ Returns the logger instance of the Experiment Controller
        """
        return self._logger

    @property
    def failure_level(self):
        """ Returns the level of FAILURE of the experiment (a FailureLevel
        value)
        """
        return self._fm._failure_level

    @property
    def ecstate(self):
        """ Returns the state of the Experiment Controller (an ECState value)
        """
        return self._state

    @property
    def exp_id(self):
        """ Returns the experiment id assigned by the user (or the one
        generated automatically)
        """
        return self._exp_id

    @property
    def run_id(self):
        """ Returns the experiment instance (run) identifier (automatically
        generated)
        """
        return self._run_id

    @property
    def nthreads(self):
        """ Returns the number of processing threads used
        """
        return self._nthreads

    @property
    def local_dir(self):
        """ Root local directory for experiment files
        """
        return self._local_dir

    @property
    def exp_dir(self):
        """ Local directory to store results and other files related to the
        experiment (local_dir/exp_id)
        """
        return self._exp_dir

    @property
    def run_dir(self):
        """ Local directory to store results and other files related to the
        experiment run (exp_dir/run_id)
        """
        return self._run_dir

    @property
    def persist(self):
        """ If True, persists the ExperimentController to XML format upon
        experiment completion
        """
        return self._persist

    @property
    def netgraph(self):
        """ Return NetGraph instance if experiment description was
        automatically generated, None otherwise
        """
        return self._netgraph

    @property
    def abort(self):
        """ Returns True if the experiment has failed and should be
        interrupted, False otherwise
        """
        return self._fm.abort

    def inform_failure(self, guid):
        """ Reports a failure in a RM to the EC for evaluation

        :param guid: Resource id
        :type guid: int
        """
        return self._fm.eval_failure(guid)

    def wait_finished(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= STOPPED (i.e. STOPPED, FAILED or
        RELEASED), or until a failure in the experiment occurs
        (i.e. abort == True)

        :param guids: Guid or list of guids
        :type guids: int or list
        """
        def quit():
            return self.abort

        return self.wait(guids, state=ResourceState.STOPPED, quit=quit)

    def wait_started(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= STARTED, or until a failure in the
        experiment occurs (i.e. abort == True)

        :param guids: Guid or list of guids
        :type guids: int or list
        """
        def quit():
            return self.abort

        return self.wait(guids, state=ResourceState.STARTED, quit=quit)

    def wait_released(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state == RELEASED, or until the EC fails

        :param guids: Guid or list of guids
        :type guids: int or list
        """
        def quit():
            return self._state == ECState.FAILED

        return self.wait(guids, state=ResourceState.RELEASED, quit=quit)

    def wait_deployed(self, guids):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= READY, or until a failure in the
        experiment occurs (i.e. abort == True)

        :param guids: Guid or list of guids
        :type guids: int or list
        """
        def quit():
            return self.abort

        return self.wait(guids, state=ResourceState.READY, quit=quit)

    def wait(self, guids, state, quit):
        """ Blocking method that waits until all RMs in the 'guids' list
        have reached a state >= 'state', or until the 'quit' callback
        returns True

        :param guids: Guid or list of guids
        :type guids: int or list

        :param state: State all RMs must reach (or exceed)
        :type state: ResourceState

        :param quit: Function without arguments; waiting stops as soon as
            it returns True
        :type quit: function
        """
        if isinstance(guids, int):
            guids = [guids]

        # Make a copy to avoid modifying the original guids list
        guids = list(guids)

        while True:
            # If there are no more guids to wait for
            # or the quit function returns True, exit the loop
            if not guids or quit():
                break

            # If a guid reached one of the target states, remove it from list
            guid = guids.pop()
            rm = self.get_resource(guid)
            rstate = rm.state

            if rstate >= state:
                self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
                    rm.get_rtype(), guid, rstate, state))
            else:
                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
                    guid, rstate, state))

                # Not there yet: put it back and poll again later
                guids.append(guid)

                time.sleep(0.5)

    def plot(self, dirpath=None, format=PFormats.FIGURE, show=False):
        """ Plots the experiment graph using an ECPlotter

        :param dirpath: Directory where to store the plot
        :param format: Plot format
        :param show: Passed through to the plotter

        :return: Path returned by the plotter
        """
        plotter = ECPlotter()
        fpath = plotter.plot(self, dirpath=dirpath, format=format,
                show=show)
        return fpath

    def serialize(self, format=SFormats.XML):
        """ Serializes the ExperimentController

        :param format: Serialization format
        :type format: SFormats

        :return: The serialized experiment
        """
        serializer = ECSerializer()
        # Serialize this EC (this used to call serializer.load by mistake)
        sec = serializer.serialize(self, format=format)
        return sec

    def save(self, dirpath=None, format=SFormats.XML):
        """ Saves the serialized ExperimentController to 'dirpath'

        :param dirpath: Directory where to save. Defaults to run_dir
        :type dirpath: str

        :param format: Serialization format
        :type format: SFormats

        :return: Path returned by the serializer
        :raises OSError: if 'dirpath' can not be created
        """
        if dirpath is None:
            dirpath = self.run_dir

        try:
            os.makedirs(dirpath)
        except OSError:
            # An existing directory is fine; any other error is re-raised
            if not os.path.isdir(dirpath):
                raise

        serializer = ECSerializer()
        path = serializer.save(self, dirpath, format=format)
        return path

    def get_task(self, tid):
        """ Returns a task by its id

        Only tasks scheduled with track=True can be retrieved.

        :param tid: Id of the task
        :type tid: int

        :return: The task, or None if no tracked task has that id
        :rtype: Task
        """
        return self._tasks.get(tid)

    def get_resource(self, guid):
        """ Returns a registered ResourceManager by its guid

        :param guid: Id of the resource
        :type guid: int

        :return: The RM, or None if no RM has that guid
        :rtype: ResourceManager
        """
        return self._resources.get(guid)

    def get_resources_by_type(self, rtype):
        """ Returns the ResourceManager objects of type rtype

        :param rtype: Resource type
        :type rtype: string

        :rtype: list of ResourceManagers
        """
        return [rm for rm in self._resources.values()
                if rm.get_rtype() == rtype]

    def remove_resource(self, guid):
        """ Removes the RM with guid 'guid' from the experiment

        :raises KeyError: if no RM has that guid
        """
        del self._resources[guid]

    @property
    def resources(self):
        """ Returns the guids of all ResourceManagers

        A new list is returned, so callers may remove resources while
        iterating over it.

        :return: List of all RM guids
        :rtype: list
        """
        return list(self._resources.keys())

    def filter_resources(self, rtype):
        """ Returns the guids of all ResourceManagers of type rtype

        :param rtype: Resource type
        :type rtype: string

        :rtype: list of guids
        """
        return [guid for guid, rm in self._resources.items()
                if rm.get_rtype() == rtype]

    def register_resource(self, rtype, guid=None):
        """ Registers a new ResourceManager of type 'rtype' in the experiment

        This method will assign a new 'guid' for the RM, if no guid
        is specified.

        :param rtype: Type of the RM
        :type rtype: str

        :param guid: Guid to assign to the RM (optional)
        :type guid: int

        :return: Guid of the RM
        :rtype: int
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate RM
        rm = ResourceFactory.create(rtype, self, guid)

        # Store RM
        self._resources[guid] = rm

        return guid

    def get_attributes(self, guid):
        """ Returns all the attributes of the RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :return: List of attributes
        :rtype: list
        """
        rm = self.get_resource(guid)
        return rm.get_attributes()

    def get_attribute(self, guid, name):
        """ Returns the attribute 'name' of the RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the attribute
        :type name: str

        :return: The attribute with name 'name'
        :rtype: Attribute
        """
        rm = self.get_resource(guid)
        return rm.get_attribute(name)

    def register_connection(self, guid1, guid2):
        """ Registers a connection between a RM with guid 'guid1'
        and another RM with guid 'guid2'.

        The order in which the two guids are provided is not
        important, since the connection relationship is symmetric.

        :param guid1: First guid to connect
        :type guid1: int

        :param guid2: Second guid to connect
        :type guid2: int
        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)

        rm1.register_connection(guid2)
        rm2.register_connection(guid1)

    def register_condition(self, guids1, action, guids2, state,
            time=None):
        """ Registers an action START, STOP or DEPLOY for all RM on list
        guids1 to occur at time 'time' after all elements in list guids2
        have reached state 'state'.

        :param guids1: Guid or list of guids of RMs subjected to action
        :type guids1: int or list

        :param action: Action to perform (either START, STOP or DEPLOY)
        :type action: ResourceAction

        :param guids2: Guid or list of guids of RMs to wait for
        :type guids2: int or list

        :param state: State to wait for on RMs of list guids2 (STARTED,
            STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after guids2 has reached status
        :type time: string
        """
        if isinstance(guids1, int):
            guids1 = [guids1]
        if isinstance(guids2, int):
            guids2 = [guids2]

        for guid1 in guids1:
            rm = self.get_resource(guid1)
            rm.register_condition(action, guids2, state, time)

    def enable_trace(self, guid, name):
        """ Enables a trace to be collected during the experiment run

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the trace
        :type name: str
        """
        rm = self.get_resource(guid)
        rm.enable_trace(name)

    def trace_enabled(self, guid, name):
        """ Returns True if the trace of name 'name' is enabled

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the trace
        :type name: str
        """
        rm = self.get_resource(guid)
        return rm.trace_enabled(name)

    def trace(self, guid, name, attr=TraceAttr.ALL, block=512, offset=0):
        """ Returns information on a collected trace, the trace stream or
        blocks (chunks) of the trace stream

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the trace
        :type name: str

        :param attr: Can be one of:
            - TraceAttr.ALL (complete trace content),
            - TraceAttr.STREAM (block in bytes to read starting
              at offset),
            - TraceAttr.PATH (full path to the trace file),
            - TraceAttr.SIZE (size of trace file).

        :param block: Number of bytes to retrieve from trace, when attr is
            TraceAttr.STREAM
        :type block: int

        :param offset: Number of 'blocks' to skip, when attr is
            TraceAttr.STREAM
        :type offset: int
        """
        rm = self.get_resource(guid)
        return rm.trace(name, attr, block, offset)

    def get_traces(self, guid):
        """ Returns the list of the trace names of the RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :return: List of trace names
        :rtype: list
        """
        rm = self.get_resource(guid)
        return rm.get_traces()

    def discover(self, guid):
        """ Discovers an available resource matching the criteria defined
        by the RM with guid 'guid', and associates that resource to the RM

        Not all RM types require (or are capable of) performing resource
        discovery. For the RM types which are not capable of doing so,
        invoking this method does not have any consequences.

        :param guid: Guid of the RM
        :type guid: int
        """
        rm = self.get_resource(guid)
        return rm.discover()

    def provision(self, guid):
        """ Provisions the resource associated to the RM with guid 'guid'.

        Provisioning means making a resource 'accessible' to the user.
        Not all RM types require (or are capable of) performing resource
        provisioning. For the RM types which are not capable of doing so,
        invoking this method does not have any consequences.

        :param guid: Guid of the RM
        :type guid: int
        """
        rm = self.get_resource(guid)
        return rm.provision()

    def get(self, guid, name):
        """ Returns the value of the attribute with name 'name' on the
        RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the attribute
        :type name: str

        :return: The value of the attribute with name 'name'
        """
        rm = self.get_resource(guid)
        return rm.get(name)

    def set(self, guid, name, value):
        """ Modifies the value of the attribute with name 'name' on the
        RM with guid 'guid'

        :param guid: Guid of the RM
        :type guid: int

        :param name: Name of the attribute
        :type name: str

        :param value: Value of the attribute
        """
        rm = self.get_resource(guid)
        return rm.set(name, value)

    def get_global(self, rtype, name):
        """ Returns the value of the global attribute with name 'name' on the
        RMs of rtype 'rtype'.

        :param rtype: Type of the RM
        :type rtype: str

        :param name: Name of the attribute
        :type name: str

        :return: The value of the attribute with name 'name'
        """
        rclass = ResourceFactory.get_resource_type(rtype)
        return rclass.get_global(name)

    def set_global(self, rtype, name, value):
        """ Modifies the value of the global attribute with name 'name' on the
        RMs with rtype 'rtype'.

        :param rtype: Type of the RM
        :type rtype: str

        :param name: Name of the attribute
        :type name: str

        :param value: Value of the attribute
        """
        rclass = ResourceFactory.get_resource_type(rtype)
        return rclass.set_global(name, value)

    def state(self, guid, hr=False):
        """ Returns the state of a resource

        :param guid: Resource guid
        :type guid: integer

        :param hr: Human readable. Forces return of a
            status string instead of a number
        :type hr: boolean
        """
        rm = self.get_resource(guid)
        state = rm.state

        if hr:
            return ResourceState2str.get(state)

        return state

    def stop(self, guid):
        """ Stops the RM with guid 'guid'

        Stopping a RM means that the resource it controls will
        no longer take part of the experiment.

        :param guid: Guid of the RM
        :type guid: int
        """
        rm = self.get_resource(guid)
        return rm.stop()

    def start(self, guid):
        """ Starts the RM with guid 'guid'

        Starting a RM means that the resource it controls will
        begin taking part of the experiment.

        :param guid: Guid of the RM
        :type guid: int
        """
        rm = self.get_resource(guid)
        return rm.start()

    def get_start_time(self, guid):
        """ Returns the start time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.start_time

    def get_stop_time(self, guid):
        """ Returns the stop time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.stop_time

    def get_discover_time(self, guid):
        """ Returns the discover time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.discover_time

    def get_provision_time(self, guid):
        """ Returns the provision time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.provision_time

    def get_ready_time(self, guid):
        """ Returns the deployment time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.ready_time

    def get_release_time(self, guid):
        """ Returns the release time of the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.release_time

    def get_failed_time(self, guid):
        """ Returns the time failure occurred for the RM as a timestamp """
        rm = self.get_resource(guid)
        return rm.failed_time

    def set_with_conditions(self, name, value, guids1, guids2, state,
            time=None):
        """ Modifies the value of attribute with name 'name' on all RMs
        on the guids1 list when time 'time' has elapsed since all
        elements in guids2 list have reached state 'state'.

        :param name: Name of attribute to set in RM
        :type name: string

        :param value: Value of attribute to set in RM

        :param guids1: Guid or list of guids of RMs whose attribute is set
        :type guids1: int or list

        :param guids2: Guid or list of guids of RMs to wait for
        :type guids2: int or list

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after guids2 has reached status
        :type time: string
        """
        if isinstance(guids1, int):
            guids1 = [guids1]
        if isinstance(guids2, int):
            guids2 = [guids2]

        for guid in guids1:
            rm = self.get_resource(guid)
            rm.set_with_conditions(name, value, guids2, state, time)

    def deploy(self, guids=None, wait_all_ready=True, group=None):
        """ Deploys all ResourceManagers in the guids list.

        If the argument 'guids' is not given, all RMs with state NEW
        are deployed.

        :param guids: Guid or list of guids of RMs to deploy
        :type guids: int or list

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs
        :type wait_all_ready: bool

        :param group: Id of deployment group in which to deploy RMs.
            If not given, a new group is created.
        :type group: int
        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        # Normalize a single guid first, so that a guid evaluating to
        # False (e.g. 0) is not mistaken for 'no guids given'
        if isinstance(guids, int):
            guids = [guids]

        if not guids:
            # If no guids list was passed, all 'NEW' RMs will be deployed
            guids = [guid for guid, rm in self._resources.items()
                    if rm.state == ResourceState.NEW]

        # Create deployment group
        # New guids can be added to a same deployment group later on
        new_group = group is None
        if new_group:
            group = self._group_id_generator.next()

        self._groups.setdefault(group, []).extend(guids)

        def wait_all_and_start(group):
            # Function that checks if all resources are READY
            # before scheduling a start_with_conditions for each RM

            # Get all guids in group
            group_guids = self._groups[group]

            if any(self.state(guid) < ResourceState.READY
                    for guid in group_guids):
                # Not all RMs are READY yet: check again in one second
                callback = functools.partial(wait_all_and_start, group)
                self.schedule("1s", callback)
                return

            # If all resources are ready, we schedule the start
            for guid in group_guids:
                self._schedule_start(self.get_resource(guid))

        if wait_all_ready and new_group:
            # Schedule a function to check that all resources are
            # READY, and only then schedule the start.
            # This aims at reducing the number of tasks looping in the
            # scheduler.
            # Instead of having many start tasks, we will have only one for
            # the whole group.
            callback = functools.partial(wait_all_and_start, group)
            self.schedule("0s", callback)

        for guid in guids:
            rm = self.get_resource(guid)
            rm.deployment_group = group
            self.schedule("0s", rm.deploy_with_conditions)

            if not wait_all_ready:
                self._schedule_start(rm)

    def _schedule_start(self, rm):
        """ Schedules the conditional start of the RM 'rm', and its
        conditional stop when the RM has STOP conditions registered.
        """
        self.schedule("0s", rm.start_with_conditions)

        if rm.conditions.get(ResourceAction.STOP):
            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            self.schedule("0s", rm.stop_with_conditions)

    def release(self, guids=None):
        """ Releases all ResourceManagers in the guids list.

        If the argument 'guids' is not given, all RMs registered
        in the experiment are released. Blocks until the RMs are
        RELEASED (or the EC fails). If 'persist' is set, the experiment
        is then saved, and RMs with the 'hardRelease' attribute set are
        removed from the experiment.

        :param guids: Guid or list of RM guids
        :type guids: int or list
        """
        if isinstance(guids, int):
            guids = [guids]

        if not guids:
            # If no guids list was passed, all registered RMs are released
            guids = self.resources

        for guid in guids:
            rm = self.get_resource(guid)
            self.schedule("0s", rm.release)

        self.wait_released(guids)

        if self.persist:
            self.save()

        for guid in guids:
            if self.get(guid, "hardRelease"):
                self.remove_resource(guid)

    def shutdown(self):
        """ Releases all resources and stops the ExperimentController

        :raises RuntimeError: if the EC is in FAILED state
        """
        # If there was a major failure we can't exit gracefully
        if self._state == ECState.FAILED:
            raise RuntimeError("EC failure. Can not exit gracefully")

        # Remove all pending tasks from the scheduler queue. The lock is
        # held because the processing thread uses the queue under it.
        with self._cond:
            for tid in list(self._scheduler.pending):
                self._scheduler.remove(tid)

        # Remove pending tasks from the workers queue. The runner is
        # created by the processing thread, so it may not exist yet.
        if self._runner is not None:
            self._runner.empty()

        self.release()

        # Mark the EC state as TERMINATED
        self._state = ECState.TERMINATED

        # Stop processing thread
        self._stop = True

        # Notify condition to wake up the processing thread
        self._notify()

        if self._thread.is_alive():
            self._thread.join()

    def schedule(self, date, callback, track=False):
        r""" Schedules a callback to be executed at time 'date'.

        :param date: string containing execution time for the task.
                It can be expressed as an absolute time, using
                timestamp format, or as a relative time matching
                ^\d+.\d+(h|m|s|ms|us)$
        :type date: str

        :param callback: code to be executed for the task. It is called
                without arguments (use functools.partial to bind them).
        :type callback: function

        :param track: if set to True, the task will be retrievable with
                the get_task() method
        :type track: bool

        :return: The Id of the task
        """
        timestamp = stabsformat(date)
        task = Task(timestamp, callback)

        # The processing thread takes and re-inserts tasks while holding
        # this lock, and tasks may call schedule() from worker threads;
        # hold it too so the queue is never modified concurrently.
        with self._cond:
            task = self._scheduler.schedule(task)

            if track:
                self._tasks[task.id] = task

            # Notify condition to wake up the processing thread
            self._cond.notify()

        return task.id

    def _process(self):
        """ Process scheduled tasks.

        .. note::

        Tasks are scheduled by invoking the schedule method with a target
        callback and an execution time.
        The schedule method creates a new Task object with that callback
        and execution time, and pushes it into the '_scheduler' queue.
        The execution time and the order of arrival of tasks are used
        to order the tasks in the queue.

        The _process method is executed in an independent thread held by
        the ExperimentController for as long as the experiment is running.
        This method takes tasks from the '_scheduler' queue in a loop
        and processes them in parallel using multithreading.
        The environment variable NEPI_NTHREADS can be used to control
        the number of threads used to process tasks. When it is not set,
        the current value of 'nthreads' is used.

        To execute tasks in parallel, a ParallelRun object is used.
        This object keeps a pool of threads (workers), and a queue of tasks
        scheduled for 'immediate' execution.

        On each iteration, the '_process' loop takes the next task from
        the '_scheduler' queue. If the execution time of that task is
        earlier than or equal to the current time, the task is pushed into
        the ParallelRun for 'immediate' execution. Otherwise it is put back
        into the queue and the loop waits until the task is due or until
        a new task is scheduled.
        As soon as a worker is free, the ParallelRun assigns the next task
        to that worker.

        Upon receiving a task to execute, each worker (thread) invokes
        the _execute method of the EC, passing the task as argument.
        The _execute method then invokes task.callback inside a
        try/except block. If an exception is raised by task.callback,
        it is trapped, logged through the EC logger, and the task is
        marked as failed (TaskStatus.ERROR).

        If an unexpected error occurs in this loop, the EC is set to the
        FAILED state and the failure level to EC_FAILURE.
        """
        self._nthreads = int(os.environ.get("NEPI_NTHREADS",
                str(self._nthreads)))
        self._runner = ParallelRun(maxthreads=self.nthreads)
        self._runner.start()

        while not self._stop:
            task = None

            try:
                with self._cond:
                    # Re-check the stop flag while holding the lock:
                    # shutdown() sets it before notifying the condition, so
                    # its notification cannot be lost between this check
                    # and the wait() calls below.
                    if self._stop:
                        break

                    task = self._scheduler.next()

                    if not task:
                        # No task to execute. Wait for a new task to be scheduled.
                        self._cond.wait()
                    else:
                        now = tnow()

                        if now < task.timestamp:
                            # The task timestamp is in the future.
                            # Re-schedule the task with the same timestamp
                            # and wait for the timeout or until another
                            # task is scheduled.
                            timeout = tdiffsec(task.timestamp, now)
                            self._scheduler.schedule(task)
                            task = None
                            self._cond.wait(timeout)

                # The 'with' statement releases the lock even if an
                # exception was raised, so schedule() can never block forever.
                if task:
                    # Process tasks in parallel
                    self._runner.put(self._execute, task)
            except Exception:
                err = traceback.format_exc()
                self.logger.error("Error while processing tasks in the EC: %s" % err)

                # Set the EC to FAILED state
                self._state = ECState.FAILED

                # Set the FailureManager failure level to EC failure
                self._fm.set_ec_failure()

        self.logger.debug("Exiting the task processing loop ... ")

        self._runner.sync()
        self._runner.destroy()

    def _execute(self, task):
        """ Executes a single task.

        The callback result is stored in task.result and the status set to
        DONE; on exception the formatted traceback is stored in task.result,
        the status set to ERROR and the error logged.

        :param task: Object containing the callback to execute
        :type task: Task
        """
        try:
            # Invoke callback
            task.result = task.callback()
            task.status = TaskStatus.DONE
        except Exception:
            err = traceback.format_exc()
            task.result = err
            task.status = TaskStatus.ERROR

            self.logger.error("Error occurred while executing task: %s" % err)

    def _notify(self):
        """ Awakes the processing thread if it is blocked waiting
        for new tasks to arrive
        """
        with self._cond:
            self._cond.notify()

    def _build_from_netgraph(self, add_node_callback, add_edge_callback,
            **kwargs):
        """ Automates experiment description using a NetGraph instance.

        :param add_node_callback: Called as add_node_callback(ec, nid) for
            every node of the graph (optional)
        :param add_edge_callback: Called as add_edge_callback(ec, nid1, nid2)
            for every edge of the graph (optional)

        Keyword arguments are forwarded to the NetGraph constructor.
        """
        self._netgraph = NetGraph(**kwargs)

        if add_node_callback:
            ### Add resources to the EC
            for nid in self.netgraph.nodes():
                add_node_callback(self, nid)

        if add_edge_callback:
            #### Add connections between resources
            for nid1, nid2 in self.netgraph.edges():
                add_edge_callback(self, nid1, nid2)