2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
43 class FailureLevel(object):
44 """ Possible failure states for the experiment """
49 class FailureManager(object):
50 """ The FailureManager is responsible for handling errors
51 and deciding whether an experiment should be aborted or not
56 self._failure_level = FailureLevel.OK
60 self._ec = weakref.ref(ec)
64 """ Returns the ExperimentController associated to this FailureManager
72 def eval_failure(self, guid):
73 """ Implements failure policy and sets the abort state of the
74 experiment based on the failure state and criticality of
77 :param guid: Guid of the RM upon which the failure of the experiment
82 if self._failure_level == FailureLevel.OK:
83 rm = self.ec.get_resource(guid)
85 critical = rm.get("critical")
87 if state == ResourceState.FAILED and critical:
88 self._failure_level = FailureLevel.RM_FAILURE
90 self.ec.logger.debug("RM critical failure occurred on guid %d." \
91 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
93 def set_ec_failure(self):
94 self._failure_level = FailureLevel.EC_FAILURE
96 class ECState(object):
97 """ Possible states of the ExperimentController
105 # historical note: this class used to be in util/guid.py but is used only here
106 # FIXME: This class is not thread-safe. Should it be made thread-safe?
class GuidGenerator(object):
    """ Hands out integer guids and remembers the highest guid seen so far.

    NOTE(review): the constructor is not visible in this excerpt; the
    counter is assumed to start at 0 -- confirm against the full file.
    """

    def __init__(self):
        self._last_guid = 0

    # historical note: this used to be called `next`
    # which confused 2to3 - and me - while it has
    # nothing to do at all with the iteration protocol
    def generate(self, guid = None):
        """ Returns a guid and records it as used.

        :param guid: Guid requested by the caller. When None, the guid
            following the highest one seen so far is allocated

        :return: The requested guid, or the newly allocated one
        """
        candidate = self._last_guid + 1 if guid is None else guid

        # The high-water mark only ever moves forward, so an explicitly
        # requested low guid never causes later guids to be re-issued
        self._last_guid = max(self._last_guid, candidate)

        return candidate
122 class ExperimentController(object):
126 An experiment, or scenario, is defined by a concrete set of resources,
127 and the behavior, configuration and interconnection of those resources.
128 The Experiment Description (ED) is a detailed representation of a
129 single experiment. It contains all the necessary information to
130 allow repeating the experiment. NEPI allows to describe
131 experiments by registering components (resources), configuring them
132 and interconnecting them.
134 A same experiment (scenario) can be executed many times, generating
135 different results. We call an experiment execution (instance) a 'run'.
137 The ExperimentController (EC), is the entity responsible for
138 managing an experiment run. The same scenario can be
139 recreated (and re-run) by instantiating an EC and recreating
140 the same experiment description.
142 An experiment is represented as a graph of interconnected
143 resources. A resource is a generic concept in the sense that any
144 component taking part of an experiment, whether physical or
145 virtual, is considered a resource. A resource could be a host,
146 a virtual machine, an application, a simulator, an IP address.
148 A ResourceManager (RM), is the entity responsible for managing a
149 single resource. ResourceManagers are specific to a resource
150 type (i.e. An RM to control a Linux application will not be
151 the same as the RM used to control a ns-3 simulation).
152 To support a new type of resource, a new RM must be implemented.
153 NEPI already provides a variety of RMs to control basic resources,
154 and new ones can be derived from the existing ones.
156 Through the EC interface the user can create ResourceManagers (RMs),
157 configure them and interconnect them, to describe an experiment.
158 Describing an experiment through the EC does not run the experiment.
159 Only when the 'deploy()' method is invoked on the EC, the EC will take
160 actions to transform the 'described' experiment into a 'running' experiment.
162 While the experiment is running, it is possible to continue to
163 create/configure/connect RMs, and to deploy them to involve new
164 resources in the experiment (this is known as 'interactive' deployment).
166 An experiment in NEPI is identified by a string id,
167 which is either given by the user, or automatically generated by NEPI.
168 The purpose of this identifier is to separate files and results that
169 belong to different experiment scenarios.
170 However, since a same 'experiment' can be run many times, the experiment
171 id is not enough to identify an experiment instance (run).
172 For this reason, the ExperimentController has two identifiers, the
173 exp_id, which can be re-used in different ExperimentController,
174 and the run_id, which is unique to one ExperimentController instance, and
175 is automatically generated by NEPI.
@classmethod
def load(cls, filepath, format = SFormats.XML):
    """ Loads an experiment description previously saved to a file

    :param filepath: Path of the file holding the serialized experiment

    :param format: Serialization format of the file. Defaults to
        SFormats.XML

    :return: The object built by ECSerializer.load from 'filepath'

    """
    serializer = ECSerializer()

    # Forward the caller's format; it used to be accepted and then
    # silently ignored, so a non-default format could never be loaded
    ec = serializer.load(filepath, format = format)
    return ec
185 def __init__(self, exp_id = None, local_dir = None, persist = False,
186 fm = None, add_node_callback = None, add_edge_callback = None,
188 """ ExperimentController entity to model and execute a network
191 :param exp_id: Human readable name to identify the experiment
194 :param local_dir: Path to local directory where to store experiment
198 :param persist: Save an XML description of the experiment after
199 completion at local_dir
202 :param fm: FailureManager object. If None is given, the default
203 FailureManager class will be used
204 :type fm: FailureManager
206 :param add_node_callback: Callback to invoke for node instantiation
207 when automatic topology creation mode is used
208 :type add_node_callback: function
210 :param add_edge_callback: Callback to invoke for edge instantiation
211 when automatic topology creation mode is used
212 :type add_edge_callback: function
215 super(ExperimentController, self).__init__()
218 self._logger = logging.getLogger("ExperimentController")
220 # Run identifier. It identifies a concrete execution instance (run)
222 # Since a same experiment (same configuration) can be executed many
223 # times, this run_id permits to separate result files generated on
224 # different experiment executions
225 self._run_id = tsformat()
227 # Experiment identifier. Usually assigned by the user
228 # Identifies the experiment scenario (i.e. configuration,
229 # resources used, etc)
230 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
232 # Local path where to store experiment related files (results, etc)
234 local_dir = tempfile.gettempdir() # /tmp
236 self._local_dir = local_dir
237 self._exp_dir = os.path.join(local_dir, self.exp_id)
238 self._run_dir = os.path.join(self.exp_dir, self.run_id)
240 # If True persist the experiment controller in XML format, after completion
241 self._persist = persist
243 # generator of globally unique ids
244 self._guid_generator = GuidGenerator()
247 self._resources = dict()
249 # Scheduler. It is a queue that holds tasks scheduled for
250 # execution, and yields the next task to be executed
251 # ordered by execution and arrival time
252 self._scheduler = HeapScheduler()
257 # RM groups (for deployment)
258 self._groups = dict()
260 # generator of globally unique id for groups
261 self._group_id_generator = GuidGenerator()
263 # Flag to stop processing thread
266 # Entity in charge of managing system failures
268 self._fm = FailureManager()
269 self._fm.set_ec(self)
272 self._state = ECState.RUNNING
274 # Automatically construct experiment description
275 self._netgraph = None
276 if add_node_callback or add_edge_callback or kwargs.get("topology"):
277 self._build_from_netgraph(add_node_callback, add_edge_callback,
280 # The runner is a pool of threads used to parallelize
285 # Event processing thread
286 self._cond = threading.Condition()
287 self._thread = threading.Thread(target = self._process)
288 self._thread.setDaemon(True)
293 """ Returns the logger instance of the Experiment Controller
300 """ Returns the failure manager
307 def failure_level(self):
308 """ Returns the level of FAILURE of the experiment
312 return self._fm._failure_level
316 """ Returns the state of the Experiment Controller
323 """ Returns the experiment id assigned by the user
330 """ Returns the experiment instance (run) identifier (automatically
338 """ Returns the number of processing threads used
341 return self._nthreads
345 """ Root local directory for experiment files
348 return self._local_dir
352 """ Local directory to store results and other files related to the
360 """ Local directory to store results and other files related to the
368 """ If True, persists the ExperimentController to XML format upon
369 experiment completion
376 """ Return NetGraph instance if experiment description was automatically
380 return self._netgraph
384 """ Returns True if the experiment has failed and should be interrupted,
388 return self._fm.abort
def inform_failure(self, guid):
    """ Hands a RM failure over to the failure manager for evaluation

    :param guid: Resource id

    :return: Whatever the failure manager's eval_failure returns

    """
    failure_manager = self._fm
    return failure_manager.eval_failure(guid)
400 def wait_finished(self, guids):
401 """ Blocking method that waits until all RMs in the 'guids' list
402 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
403 RELEASED ), or until a failure in the experiment occurs
406 :param guids: List of guids
414 return self.wait(guids, state = ResourceState.STOPPED,
417 def wait_started(self, guids):
418 """ Blocking method that waits until all RMs in the 'guids' list
419 have reached a state >= STARTED, or until a failure in the
420 experiment occurs (i.e. abort == True)
422 :param guids: List of guids
430 return self.wait(guids, state = ResourceState.STARTED,
433 def wait_released(self, guids):
434 """ Blocking method that waits until all RMs in the 'guids' list
435 have reached a state == RELEASED, or until the EC fails
437 :param guids: List of guids
443 return self._state == ECState.FAILED
445 return self.wait(guids, state = ResourceState.RELEASED,
448 def wait_deployed(self, guids):
449 """ Blocking method that waits until all RMs in the 'guids' list
450 have reached a state >= READY, or until a failure in the
451 experiment occurs (i.e. abort == True)
453 :param guids: List of guids
461 return self.wait(guids, state = ResourceState.READY,
464 def wait(self, guids, state, quit):
465 """ Blocking method that waits until all RMs in the 'guids' list
466 have reached a state >= 'state', or until the 'quit' callback
469 :param guids: List of guids
473 if isinstance(guids, int):
476 # Make a copy to avoid modifying the original guids list
480 # If there are no more guids to wait for
481 # or the quit function returns True, exit the loop
482 if len(guids) == 0 or quit():
485 # If a guid reached one of the target states, remove it from list
487 rm = self.get_resource(guid)
491 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
492 rm.get_rtype(), guid, rstate, state))
495 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
496 guid, rstate, state))
502 def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
503 plotter = ECPlotter()
504 fpath = plotter.plot(self, dirpath = dirpath, format= format,
def serialize(self, format = SFormats.XML):
    """ Returns a serialized representation of this ExperimentController

    :param format: Serialization format. Defaults to SFormats.XML

    :return: The serialized experiment, as produced by the ECSerializer

    """
    serializer = ECSerializer()

    # The previous code called serializer.load(), which is the entry
    # point used to read a description from a file path, and handed it
    # the EC object itself.
    # NOTE(review): assumes ECSerializer exposes serialize(ec, format)
    # next to load() and save() -- confirm against nepi.util.serializer
    sec = serializer.serialize(self, format = format)
    return sec
513 def save(self, dirpath = None, format = SFormats.XML):
515 dirpath = self.run_dir
522 serializer = ECSerializer()
523 path = serializer.save(self, dirpath, format = format)
def get_task(self, tid):
    """ Looks up a task in the registry of tracked tasks

    :param tid: Id of the task

    :return: The result of the registry's get(tid) lookup

    """
    tracked_tasks = self._tasks
    return tracked_tasks.get(tid)
537 def get_resource(self, guid):
538 """ Returns a registered ResourceManager by its guid
540 :param guid: Id of the resource
543 :rtype: ResourceManager
546 rm = self._resources.get(guid)
549 def get_resources_by_type(self, rtype):
550 """ Returns the ResourceManager objects of type rtype
552 :param rtype: Resource type
555 :rtype: list of ResourceManagers
559 for guid, rm in self._resources.items():
560 if rm.get_rtype() == rtype:
564 def remove_resource(self, guid):
565 del self._resources[guid]
569 """ Returns the guids of all ResourceManagers
571 :return: Set of all RM guids
575 keys = list(self._resources.keys())
579 def filter_resources(self, rtype):
580 """ Returns the guids of all ResourceManagers of type rtype
582 :param rtype: Resource type
585 :rtype: list of guids
589 for guid, rm in self._resources.items():
590 if rm.get_rtype() == rtype:
594 def register_resource(self, rtype, guid = None, **keywords):
595 """ Registers a new ResourceManager of type 'rtype' in the experiment
597 This method will assign a new 'guid' for the RM, if no guid
600 :param rtype: Type of the RM
603 :return: Guid of the RM
606 Specifying additional keywords results in the following actions
608 boolean: if set, causes the created object to be `deploy`ed
609 before returning from register_resource
611 resourceObject: if set, causes the `register_connection` method
612 to be called before returning and after auto-deployment if relevant
613 * other keywords are used to call `set` to set attributes
616 app = ec.register_resource("linux::Application",
617 command = "systemctl start httpd",
621 app = ec.register_resource("linux::Application")
622 ec.set(app, "command", "systemctl start httpd")
624 ec.register_connection(app, node)
627 # Get next available guid
629 guid = self._guid_generator.generate(guid)
632 rm = ResourceFactory.create(rtype, self, guid)
635 self._resources[guid] = rm
640 # is there a need to call deploy
641 special = 'autoDeploy'
642 specials.append(special)
643 auto_deploy = special in keywords and keywords[special]
645 # is there a need to call register_connection, and if so to what
646 special = 'connectedTo'
647 specials.append(special)
648 connected_to = special in keywords and keywords[special]
650 ### now we can do all the calls to 'set'
651 for name, value in keywords.items():
652 # specials are handled locally and not propagated to 'set'
653 if name not in specials:
654 self.set(guid, name, value)
656 ### deal with specials
660 self.register_connection(guid, connected_to)
def get_attributes(self, guid):
    """ Fetches every attribute of the RM registered under 'guid'

    :param guid: Guid of the RM

    :return: List of attributes, as returned by the RM

    """
    return self.get_resource(guid).get_attributes()
def get_attribute(self, guid, name):
    """ Fetches the attribute called 'name' from the RM registered
    under 'guid'

    :param guid: Guid of the RM

    :param name: Name of the attribute

    :return: The attribute with name 'name', as returned by the RM

    """
    return self.get_resource(guid).get_attribute(name)
def register_connection(self, guid1, guid2):
    """ Connects the RM with guid 'guid1' and the RM with guid 'guid2'

    The order in which the two guids are provided is not important:
    each RM is told about the other one, so the connection
    relationship is symmetric.

    :param guid1: Guid of the first RM to connect

    :param guid2: Guid of the second RM to connect

    """
    # Resolve both ends before touching either of them
    first, second = self.get_resource(guid1), self.get_resource(guid2)

    first.register_connection(guid2)
    second.register_connection(guid1)
def register_condition(self, guids1, action, guids2, state,
        time = None):
    """ Registers an action START, STOP or DEPLOY for all RMs in
    guids1, to occur at time 'time' after all RMs in guids2 have
    reached state 'state'.

    :param guids1: Guid, or list of guids, of the RMs subjected to
        the action

    :param action: Action to perform (either START, STOP or DEPLOY)
    :type action: ResourceAction

    :param guids2: Guid, or list of guids, of the RMs to be waited for

    :param state: State to wait for on the RMs of guids2
    :type state: ResourceState

    :param time: Time to wait after guids2 has reached 'state'.
        NOTE(review): the default (None) is reconstructed, the end of
        the signature is not visible in this excerpt -- confirm

    """
    # A lone guid is accepted as shorthand for a one-element list
    subjects = [guids1] if isinstance(guids1, int) else guids1
    awaited = [guids2] if isinstance(guids2, int) else guids2

    for subject in subjects:
        self.get_resource(subject).register_condition(action, awaited,
                state, time)
def enable_trace(self, guid, name):
    """ Asks the RM registered under 'guid' to collect the trace
    called 'name' during the experiment run

    :param guid: Guid of the RM

    :param name: Name of the trace

    """
    self.get_resource(guid).enable_trace(name)
def trace_enabled(self, guid, name):
    """ Tells whether the trace called 'name' is enabled on the RM
    registered under 'guid'

    :param guid: Guid of the RM

    :param name: Name of the trace

    :return: The answer given by the RM's trace_enabled

    """
    return self.get_resource(guid).trace_enabled(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """ Queries the RM registered under 'guid' about a collected
    trace: its content, a chunk of its stream, its path or its size

    :param guid: Guid of the RM

    :param name: Name of the trace

    :param attr: Can be one of:
        - TraceAttr.ALL (complete trace content),
        - TraceAttr.STREAM (block in bytes to read from the trace),
        - TraceAttr.PATH (full path to the trace file),
        - TraceAttr.SIZE (size of trace file).

    :param block: Number of bytes to retrieve from the trace, when
        reading it as a stream

    :param offset: Number of 'blocks' to skip, when attr is
        TraceAttr.STREAM

    :return: Whatever the RM's trace method returns for that request

    """
    resource = self.get_resource(guid)
    requested = resource.trace(name, attr, block, offset)
    return requested
def get_traces(self, guid):
    """ Lists the names of the traces of the RM registered under 'guid'

    :param guid: Guid of the RM

    :return: List of trace names, as returned by the RM

    """
    return self.get_resource(guid).get_traces()
807 def discover(self, guid):
808 """ Discovers an available resource matching the criteria defined
809 by the RM with guid 'guid', and associates that resource to the RM
811 Not all RM types require (or are capable of) performing resource
812 discovery. For the RM types which are not capable of doing so,
813 invoking this method does not have any consequences.
815 :param guid: Guid of the RM
819 rm = self.get_resource(guid)
def provision(self, guid):
    """ Provisions the resource associated to the RM with guid 'guid'.

    Provisioning means making a resource 'accessible' to the user.
    Some RM types neither need nor support provisioning; for those,
    calling this method has no consequences.

    :param guid: Guid of the RM

    :return: Whatever the RM's provision method returns

    """
    resource = self.get_resource(guid)
    outcome = resource.provision()
    return outcome
837 def get(self, guid, name):
838 """ Returns the value of the attribute with name 'name' on the
841 :param guid: Guid of the RM
844 :param name: Name of the attribute
847 :return: The value of the attribute with name 'name'
850 rm = self.get_resource(guid)
853 def set(self, guid, name, value):
854 """ Modifies the value of the attribute with name 'name' on the
857 :param guid: Guid of the RM
860 :param name: Name of the attribute
863 :param value: Value of the attribute
866 rm = self.get_resource(guid)
def get_global(self, rtype, name):
    """ Reads the global attribute called 'name' shared by the RMs of
    type 'rtype'.

    :param rtype: Resource type

    :param name: Name of the attribute

    :return: The value of the attribute with name 'name'

    """
    # Global attributes live on the resource class, not on an instance
    return ResourceFactory.get_resource_type(rtype).get_global(name)
def set_global(self, rtype, name, value):
    """ Changes the global attribute called 'name' shared by the RMs
    of type 'rtype'.

    :param rtype: Resource type

    :param name: Name of the attribute

    :param value: Value of the attribute

    :return: Whatever the resource class's set_global returns

    """
    # Global attributes live on the resource class, not on an instance
    return ResourceFactory.get_resource_type(rtype).set_global(name, value)
901 def state(self, guid, hr = False):
902 """ Returns the state of a resource
904 :param guid: Resource guid
907 :param hr: Human readable. Forces return of a
908 status string instead of a number
912 rm = self.get_resource(guid)
916 return ResourceState2str.get(state)
920 def stop(self, guid):
921 """ Stops the RM with guid 'guid'
923 Stopping a RM means that the resource it controls will
924 no longer take part of the experiment.
926 :param guid: Guid of the RM
930 rm = self.get_resource(guid)
933 def start(self, guid):
934 """ Starts the RM with guid 'guid'
936 Starting a RM means that the resource it controls will
937 begin taking part of the experiment.
939 :param guid: Guid of the RM
943 rm = self.get_resource(guid)
946 def get_start_time(self, guid):
947 """ Returns the start time of the RM as a timestamp """
948 rm = self.get_resource(guid)
951 def get_stop_time(self, guid):
952 """ Returns the stop time of the RM as a timestamp """
953 rm = self.get_resource(guid)
def get_discover_time(self, guid):
    """ Gives the discover time recorded by the RM with guid 'guid',
    as a timestamp """
    return self.get_resource(guid).discover_time
def get_provision_time(self, guid):
    """ Gives the provision time recorded by the RM with guid 'guid',
    as a timestamp """
    return self.get_resource(guid).provision_time
966 def get_ready_time(self, guid):
967 """ Returns the deployment time of the RM as a timestamp """
968 rm = self.get_resource(guid)
def get_release_time(self, guid):
    """ Gives the release time recorded by the RM with guid 'guid',
    as a timestamp """
    return self.get_resource(guid).release_time
def get_failed_time(self, guid):
    """ Gives the time at which the failure occurred for the RM with
    guid 'guid', as a timestamp """
    return self.get_resource(guid).failed_time
def set_with_conditions(self, name, value, guids1, guids2, state,
        time = None):
    """ Modifies the value of attribute with name 'name' on all RMs
    on the guids1 list when time 'time' has elapsed since all
    elements in guids2 list have reached state 'state'.

    :param name: Name of attribute to set in RM

    :param value: Value of attribute to set in RM

    :param guids1: Guid, or list of guids, of the RMs whose attribute
        is to be set

    :param guids2: Guid, or list of guids, of the RMs to be waited for

    :param state: State to wait for on RMs (STARTED, STOPPED, etc)
    :type state: ResourceState

    :param time: Time to wait after guids2 has reached 'state'.
        NOTE(review): the default (None) is reconstructed, the end of
        the signature is not visible in this excerpt -- confirm

    """
    # A lone guid is accepted as shorthand for a one-element list
    if isinstance(guids1, int):
        guids1 = [guids1]
    if isinstance(guids2, int):
        guids2 = [guids2]

    for guid1 in guids1:
        # Look up the RM being iterated on; the previous code used the
        # undefined name 'guid' here and raised NameError on every call
        rm = self.get_resource(guid1)
        rm.set_with_conditions(name, value, guids2, state, time)
1018 def deploy(self, guids = None, wait_all_ready = True, group = None):
1019 """ Deploys all ResourceManagers in the guids list.
1021 If the argument 'guids' is not given, all RMs with state NEW
1024 :param guids: List of guids of RMs to deploy
1027 :param wait_all_ready: Wait until all RMs are ready in
1028 order to start the RMs
1031 :param group: Id of deployment group in which to deploy RMs
1035 self.logger.debug(" ------- DEPLOY START ------ ")
1038 # If no guids list was passed, all 'NEW' RMs will be deployed
1040 for guid, rm in self._resources.items():
1041 if rm.state == ResourceState.NEW:
1044 if isinstance(guids, int):
1047 # Create deployment group
1048 # New guids can be added to a same deployment group later on
1053 group = self._group_id_generator.generate()
1055 if group not in self._groups:
1056 self._groups[group] = []
1058 self._groups[group].extend(guids)
1060 def wait_all_and_start(group):
1061 # Function that checks if all resources are READY
1062 # before scheduling a start_with_conditions for each RM
1065 # Get all guids in group
1066 guids = self._groups[group]
1069 if self.state(guid) < ResourceState.READY:
1074 callback = functools.partial(wait_all_and_start, group)
1075 self.schedule("1s", callback)
1077 # If all resources are ready, we schedule the start
1079 rm = self.get_resource(guid)
1080 self.schedule("0s", rm.start_with_conditions)
1082 if rm.conditions.get(ResourceAction.STOP):
1083 # Only if the RM has STOP conditions we
1084 # schedule a stop. Otherwise the RM will stop immediately
1085 self.schedule("0s", rm.stop_with_conditions)
1087 if wait_all_ready and new_group:
1088 # Schedule a function to check that all resources are
1089 # READY, and only then schedule the start.
1090 # This aims at reducing the number of tasks looping in the
1092 # Instead of having many start tasks, we will have only one for
1094 callback = functools.partial(wait_all_and_start, group)
1095 self.schedule("0s", callback)
1098 rm = self.get_resource(guid)
1099 rm.deployment_group = group
1100 self.schedule("0s", rm.deploy_with_conditions)
1102 if not wait_all_ready:
1103 self.schedule("0s", rm.start_with_conditions)
1105 if rm.conditions.get(ResourceAction.STOP):
1106 # Only if the RM has STOP conditions we
1107 # schedule a stop. Otherwise the RM will stop immediately
1108 self.schedule("0s", rm.stop_with_conditions)
1110 def release(self, guids = None):
1111 """ Releases all ResourceManagers in the guids list.
1113 If the argument 'guids' is not given, all RMs registered
1114 in the experiment are released.
1116 :param guids: List of RM guids
1120 if self._state == ECState.RELEASED:
1123 if isinstance(guids, int):
1127 guids = self.resources
1130 rm = self.get_resource(guid)
1131 self.schedule("0s", rm.release)
1133 self.wait_released(guids)
1139 if self.get(guid, "hardRelease"):
1140 self.remove_resource(guid)\
1142 # Mark the EC state as RELEASED
1143 self._state = ECState.RELEASED
1146 """ Releases all resources and stops the ExperimentController
1149 # If there was a major failure we can't exit gracefully
1150 if self._state == ECState.FAILED:
1151 raise RuntimeError("EC failure. Can not exit gracefully")
1153 # Remove all pending tasks from the scheduler queue
1154 for tid in list(self._scheduler.pending):
1155 self._scheduler.remove(tid)
1157 # Remove pending tasks from the workers queue
1158 self._runner.empty()
1162 # Mark the EC state as TERMINATED
1163 self._state = ECState.TERMINATED
1165 # Stop processing thread
1168 # Notify condition to wake up the processing thread
1171 if self._thread.is_alive():
1174 def schedule(self, date, callback, track = False):
1175 """ Schedules a callback to be executed at time 'date'.
1177 :param date: string containing execution time for the task.
1178 It can be expressed as an absolute time, using
1179 timestamp format, or as a relative time matching
1180 ^\d+.\d+(h|m|s|ms|us)$
1182 :param callback: code to be executed for the task. Must be a
1183 Python function, and receives args and kwargs
1186 :param track: if set to True, the task will be retrievable with
1187 the get_task() method
1189 :return : The Id of the task
1193 timestamp = stabsformat(date)
1194 task = Task(timestamp, callback)
1195 task = self._scheduler.schedule(task)
1198 self._tasks[task.id] = task
1200 # Notify condition to wake up the processing thread
1206 """ Process scheduled tasks.
1210 Tasks are scheduled by invoking the schedule method with a target
1211 callback and an execution time.
1212 The schedule method creates a new Task object with that callback
1213 and execution time, and pushes it into the '_scheduler' queue.
1214 The execution time and the order of arrival of tasks are used
1215 to order the tasks in the queue.
1217 The _process method is executed in an independent thread held by
1218 the ExperimentController for as long as the experiment is running.
1219 This method takes tasks from the '_scheduler' queue in a loop
1220 and processes them in parallel using multithreading.
1221 The environmental variable NEPI_NTHREADS can be used to control
1222 the number of threads used to process tasks. The default value is
1225 To execute tasks in parallel, a ParallelRunner (PR) object is used.
1226 This object keeps a pool of threads (workers), and a queue of tasks
1227 scheduled for 'immediate' execution.
1229 On each iteration, the '_process' loop will take the next task that
1230 is scheduled for 'future' execution from the '_scheduler' queue,
1231 and if the execution time of that task is <= the current time,
1232 it will push that task into the PR for 'immediate execution'.
1233 As soon as a worker is free, the PR will assign the next task to
1236 Upon receiving a task to execute, each PR worker (thread) will
1237 invoke the _execute method of the EC, passing the task as
1239 The _execute method will then invoke task.callback inside a
1240 try/except block. If an exception is raised by the tasks.callback,
1241 it will be trapped by the try block, logged to standard error
1242 (usually the console), and the task will be marked as failed.
1246 self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1247 self._runner = ParallelRun(maxthreads = self.nthreads)
1248 self._runner.start()
1250 while not self._stop:
1252 self._cond.acquire()
1254 task = next(self._scheduler)
1257 # No task to execute. Wait for a new task to be scheduled.
1260 # The task timestamp is in the future. Wait for timeout
1261 # or until another task is scheduled.
1263 if now < task.timestamp:
1264 # Calculate timeout in seconds
1265 timeout = tdiffsec(task.timestamp, now)
1267 # Re-schedule task with the same timestamp
1268 self._scheduler.schedule(task)
1272 # Wait timeout or until a new task awakes the condition
1273 self._cond.wait(timeout)
1275 self._cond.release()
1278 # Process tasks in parallel
1279 self._runner.put(self._execute, task)
1282 err = traceback.format_exc()
1283 self.logger.error("Error while processing tasks in the EC: %s" % err)
1285 # Set the EC to FAILED state
1286 self._state = ECState.FAILED
1288 # Set the FailureManager failure level to EC failure
1289 self._fm.set_ec_failure()
1291 self.logger.debug("Exiting the task processing loop ... ")
1294 self._runner.destroy()
1296 def _execute(self, task):
1297 """ Executes a single task.
1299 :param task: Object containing the callback to execute
1305 task.result = task.callback()
1306 task.status = TaskStatus.DONE
1309 err = traceback.format_exc()
1311 task.status = TaskStatus.ERROR
1313 self.logger.error("Error occurred while executing task: %s" % err)
1316 """ Awakes the processing thread if it is blocked waiting
1317 for new tasks to arrive
1320 self._cond.acquire()
1322 self._cond.release()
def _build_from_netgraph(self, add_node_callback, add_edge_callback,
        **kwargs):
    """ Automates experiment description using a NetGraph instance.

    A NetGraph is built from 'kwargs' and stored on the EC; the
    callbacks, when given, are then invoked once per node and once
    per edge of that graph.

    :param add_node_callback: Called as callback(ec, node_id) for
        every node of the graph. Skipped when falsy

    :param add_edge_callback: Called as callback(ec, node_id1,
        node_id2) for every edge of the graph. Skipped when falsy

    NOTE(review): the '**kwargs' tail of the signature is
    reconstructed from the NetGraph(**kwargs) call -- confirm
    """
    self._netgraph = NetGraph(**kwargs)

    # Nodes go first, so the resources exist before being connected
    if add_node_callback:
        for node_id in self.netgraph.nodes():
            add_node_callback(self, node_id)

    if add_edge_callback:
        for edge in self.netgraph.edges():
            source_id, target_id = edge
            add_edge_callback(self, source_id, target_id)