44b95780723c16b78f68ed2fc86a5f4d527a1df1
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import sys
35 import time
36 import threading
37 import weakref
38
39 class FailureLevel(object):
40     """ Describes the system failure state """
41     OK = 1
42     RM_FAILURE = 2
43     EC_FAILURE = 3
44
45 class FailureManager(object):
46     """ The FailureManager is responsible for handling errors
47     and deciding whether an experiment should be aborted or not
48
49     """
50
51     def __init__(self, ec):
52         self._ec = weakref.ref(ec)
53         self._failure_level = FailureLevel.OK
54
55     @property
56     def ec(self):
57         """ Returns the ExperimentController associated to this FailureManager 
58         
59         """
60         
61         return self._ec()
62
63     @property
64     def abort(self):
65         if self._failure_level == FailureLevel.OK:
66             for guid in self.ec.resources:
67                 state = self.ec.state(guid)
68                 critical = self.ec.get(guid, "critical")
69                 if state == ResourceState.FAILED and critical:
70                     self._failure_level = FailureLevel.RM_FAILURE
71                     self.ec.logger.debug("RM critical failure occurred on guid %d." \
72                             " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
73                     break
74
75         return self._failure_level != FailureLevel.OK
76
77     def set_ec_failure(self):
78         self._failure_level = FailureLevel.EC_FAILURE
79
80
81 class ECState(object):
82     """ Possible states for an ExperimentController
83    
84     """
85     RUNNING = 1
86     FAILED = 2
87     TERMINATED = 3
88
89 class ExperimentController(object):
90     """
91     .. class:: Class Args :
92       
93         :param exp_id: Human readable identifier for the experiment scenario. 
94         :type exp_id: str
95
96     .. note::
97
98     An experiment, or scenario, is defined by a concrete set of resources,
99     behavior, configuration and interconnection of those resources. 
100     The Experiment Description (ED) is a detailed representation of a
101     single experiment. It contains all the necessary information to 
102     allow repeating the experiment. NEPI allows to describe
103     experiments by registering components (resources), configuring them
104     and interconnecting them.
105     
106     A same experiment (scenario) can be executed many times, generating 
107     different results. We call an experiment execution (instance) a 'run'.
108
109     The ExperimentController (EC), is the entity responsible of
110     managing an experiment run. The same scenario can be 
111     recreated (and re-run) by instantiating an EC and recreating 
112     the same experiment description. 
113
114     In NEPI, an experiment is represented as a graph of interconnected
115     resources. A resource is a generic concept in the sense that any
116     component taking part of an experiment, whether physical of
117     virtual, is considered a resource. A resources could be a host, 
118     a virtual machine, an application, a simulator, a IP address.
119
120     A ResourceManager (RM), is the entity responsible for managing a 
121     single resource. ResourceManagers are specific to a resource
122     type (i.e. An RM to control a Linux application will not be
123     the same as the RM used to control a ns-3 simulation).
124     To support a new type of resource in NEPI, a new RM must be 
125     implemented. NEPI already provides a variety of
126     RMs to control basic resources, and new can be extended from
127     the existing ones.
128
129     Through the EC interface the user can create ResourceManagers (RMs),
130     configure them and interconnect them, to describe an experiment.
131     Describing an experiment through the EC does not run the experiment.
132     Only when the 'deploy()' method is invoked on the EC, the EC will take 
133     actions to transform the 'described' experiment into a 'running' experiment.
134
135     While the experiment is running, it is possible to continue to
136     create/configure/connect RMs, and to deploy them to involve new
137     resources in the experiment (this is known as 'interactive' deployment).
138     
139     An experiments in NEPI is identified by a string id, 
140     which is either given by the user, or automatically generated by NEPI.  
141     The purpose of this identifier is to separate files and results that 
142     belong to different experiment scenarios. 
143     However, since a same 'experiment' can be run many times, the experiment
144     id is not enough to identify an experiment instance (run).
145     For this reason, the ExperimentController has two identifier, the 
146     exp_id, which can be re-used in different ExperimentController,
147     and the run_id, which is unique to one ExperimentController instance, and
148     is automatically generated by NEPI.
149         
150     """
151
152     def __init__(self, exp_id = None): 
153         super(ExperimentController, self).__init__()
154
155         # Logging
156         self._logger = logging.getLogger("ExperimentController")
157
158         # Run identifier. It identifies a concrete execution instance (run) 
159         # of an experiment.
160         # Since a same experiment (same configuration) can be executed many 
161         # times, this run_id permits to separate result files generated on 
162         # different experiment executions
163         self._run_id = tsformat()
164
165         # Experiment identifier. Usually assigned by the user
166         # Identifies the experiment scenario (i.e. configuration, 
167         # resources used, etc)
168         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
169
170         # generator of globally unique ids
171         self._guid_generator = guid.GuidGenerator()
172         
173         # Resource managers
174         self._resources = dict()
175
176         # Scheduler. It a queue that holds tasks scheduled for
177         # execution, and yields the next task to be executed 
178         # ordered by execution and arrival time
179         self._scheduler = HeapScheduler()
180
181         # Tasks
182         self._tasks = dict()
183
184         # RM groups (for deployment) 
185         self._groups = dict()
186
187         # generator of globally unique id for groups
188         self._group_id_generator = guid.GuidGenerator()
189
190         # Flag to stop processing thread
191         self._stop = False
192     
193         # Entity in charge of managing system failures
194         self._fm = FailureManager(self)
195
196         # EC state
197         self._state = ECState.RUNNING
198
199         # Blacklist file for PL nodes
200         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
201         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
202         if not os.path.exists(plblacklist_file):
203             if os.path.isdir(nepi_home):
204                 open(plblacklist_file, 'w').close()
205             else:
206                 os.makedirs(nepi_home)
207                 open(plblacklist_file, 'w').close()
208                     
209         # The runner is a pool of threads used to parallelize 
210         # execution of tasks
211         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
212         self._runner = ParallelRun(maxthreads = nthreads)
213
214         # Event processing thread
215         self._cond = threading.Condition()
216         self._thread = threading.Thread(target = self._process)
217         self._thread.setDaemon(True)
218         self._thread.start()
219
220     @property
221     def logger(self):
222         """ Returns the logger instance of the Experiment Controller
223
224         """
225         return self._logger
226
227     @property
228     def ecstate(self):
229         """ Returns the state of the Experiment Controller
230
231         """
232         return self._state
233
234     @property
235     def exp_id(self):
236         """ Returns the experiment id assigned by the user
237
238         """
239         return self._exp_id
240
241     @property
242     def run_id(self):
243         """ Returns the experiment instance (run) identifier (automatically 
244         generated)
245
246         """
247         return self._run_id
248
249     @property
250     def abort(self):
251         """ Returns True if the experiment has failed and should be interrupted,
252         False otherwise.
253
254         """
255         return self._fm.abort
256
257     def wait_finished(self, guids):
258         """ Blocking method that waits until all RMs in the 'guids' list 
259         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
260         RELEASED ), or until a failure in the experiment occurs 
261         (i.e. abort == True) 
262         
263             :param guids: List of guids
264             :type guids: list
265
266         """
267
268         def quit():
269             return self.abort
270
271         return self.wait(guids, state = ResourceState.STOPPED, 
272                 quit = quit)
273
274     def wait_started(self, guids):
275         """ Blocking method that waits until all RMs in the 'guids' list 
276         have reached a state >= STARTED, or until a failure in the 
277         experiment occurs (i.e. abort == True) 
278         
279             :param guids: List of guids
280             :type guids: list
281
282         """
283
284         def quit():
285             return self.abort
286
287         return self.wait(guids, state = ResourceState.STARTED, 
288                 quit = quit)
289
290     def wait_released(self, guids):
291         """ Blocking method that waits until all RMs in the 'guids' list 
292         have reached a state == RELEASED, or until the EC fails 
293         
294             :param guids: List of guids
295             :type guids: list
296
297         """
298
299         def quit():
300             return self._state == ECState.FAILED
301
302         return self.wait(guids, state = ResourceState.RELEASED, 
303                 quit = quit)
304
305     def wait_deployed(self, guids):
306         """ Blocking method that waits until all RMs in the 'guids' list 
307         have reached a state >= READY, or until a failure in the 
308         experiment occurs (i.e. abort == True) 
309         
310             :param guids: List of guids
311             :type guids: list
312
313         """
314
315         def quit():
316             return self.abort
317
318         return self.wait(guids, state = ResourceState.READY, 
319                 quit = quit)
320
321     def wait(self, guids, state, quit):
322         """ Blocking method that waits until all RMs in the 'guids' list 
323         have reached a state >= 'state', or until the 'quit' callback
324         yields True
325            
326             :param guids: List of guids
327             :type guids: list
328         
329         """
330         
331         if isinstance(guids, int):
332             guids = [guids]
333
334         # Make a copy to avoid modifying the original guids list
335         guids = list(guids)
336
337         while True:
338             # If there are no more guids to wait for
339             # or the quit function returns True, exit the loop
340             if len(guids) == 0 or quit():
341                 break
342
343             # If a guid reached one of the target states, remove it from list
344             guid = guids[0]
345             rstate = self.state(guid)
346             
347             hrrstate = ResourceState2str.get(rstate)
348             hrstate = ResourceState2str.get(state)
349
350             if rstate >= state:
351                 guids.remove(guid)
352                 rm = self.get_resource(guid)
353                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
354                     rm.get_rtype(), guid, hrrstate, hrstate))
355             else:
356                 # Debug...
357                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
358                     guid, hrrstate, hrstate))
359                 time.sleep(0.5)
360   
361     def get_task(self, tid):
362         """ Returns a task by its id
363
364             :param tid: Id of the task
365             :type tid: int
366             
367             :rtype: Task
368             
369         """
370         return self._tasks.get(tid)
371
372     def get_resource(self, guid):
373         """ Returns a registered ResourceManager by its guid
374
375             :param guid: Id of the task
376             :type guid: int
377             
378             :rtype: ResourceManager
379             
380         """
381         return self._resources.get(guid)
382
383     @property
384     def resources(self):
385         """ Returns the set() of guids of all the ResourceManager
386
387             :return: Set of all RM guids
388             :rtype: set
389
390         """
391         return self._resources.keys()
392
393     def register_resource(self, rtype, guid = None):
394         """ Registers a new ResourceManager of type 'rtype' in the experiment
395         
396         This method will assign a new 'guid' for the RM, if no guid
397         is specified.
398
399             :param rtype: Type of the RM
400             :type rtype: str
401
402             :return: Guid of the RM
403             :rtype: int
404             
405         """
406         # Get next available guid
407         guid = self._guid_generator.next(guid)
408         
409         # Instantiate RM
410         rm = ResourceFactory.create(rtype, self, guid)
411
412         # Store RM
413         self._resources[guid] = rm
414
415         return guid
416
417     def get_attributes(self, guid):
418         """ Returns all the attributes of the RM with guid 'guid'
419
420             :param guid: Guid of the RM
421             :type guid: int
422
423             :return: List of attributes
424             :rtype: list
425
426         """
427         rm = self.get_resource(guid)
428         return rm.get_attributes()
429
430     def get_attribute(self, guid, name):
431         """ Returns the attribute 'name' of the RM with guid 'guid'
432
433             :param guid: Guid of the RM
434             :type guid: int
435
436             :param name: Name of the attribute
437             :type name: str
438
439             :return: The attribute with name 'name'
440             :rtype: Attribute
441
442         """
443         rm = self.get_resource(guid)
444         return rm.get_attribute(name)
445
446     def register_connection(self, guid1, guid2):
447         """ Registers a connection between a RM with guid 'guid1'
448         and another RM with guid 'guid2'. 
449     
450         The order of the in which the two guids are provided is not
451         important, since the connection relationship is symmetric.
452
453             :param guid1: First guid to connect
454             :type guid1: ResourceManager
455
456             :param guid2: Second guid to connect
457             :type guid: ResourceManager
458
459         """
460         rm1 = self.get_resource(guid1)
461         rm2 = self.get_resource(guid2)
462
463         rm1.register_connection(guid2)
464         rm2.register_connection(guid1)
465
466     def register_condition(self, guids1, action, guids2, state,
467             time = None):
468         """ Registers an action START, STOP or DEPLOY for all RM on list
469         guids1 to occur at time 'time' after all elements in list guids2 
470         have reached state 'state'.
471
472             :param guids1: List of guids of RMs subjected to action
473             :type guids1: list
474
475             :param action: Action to perform (either START, STOP or DEPLOY)
476             :type action: ResourceAction
477
478             :param guids2: List of guids of RMs to we waited for
479             :type guids2: list
480
481             :param state: State to wait for on RMs of list guids2 (STARTED,
482                 STOPPED, etc)
483             :type state: ResourceState
484
485             :param time: Time to wait after guids2 has reached status 
486             :type time: string
487
488         """
489         if isinstance(guids1, int):
490             guids1 = [guids1]
491         if isinstance(guids2, int):
492             guids2 = [guids2]
493
494         for guid1 in guids1:
495             rm = self.get_resource(guid1)
496             rm.register_condition(action, guids2, state, time)
497
498     def enable_trace(self, guid, name):
499         """ Enables a trace to be collected during the experiment run
500
501             :param name: Name of the trace
502             :type name: str
503
504         """
505         rm = self.get_resource(guid)
506         rm.enable_trace(name)
507
508     def trace_enabled(self, guid, name):
509         """ Returns True if the trace of name 'name' is enabled
510
511             :param name: Name of the trace
512             :type name: str
513
514         """
515         rm = self.get_resource(guid)
516         return rm.trace_enabled(name)
517
518     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
519         """ Returns information on a collected trace, the trace stream or 
520         blocks (chunks) of the trace stream
521
522             :param name: Name of the trace
523             :type name: str
524
525             :param attr: Can be one of:
526                          - TraceAttr.ALL (complete trace content), 
527                          - TraceAttr.STREAM (block in bytes to read starting 
528                                 at offset),
529                          - TraceAttr.PATH (full path to the trace file),
530                          - TraceAttr.SIZE (size of trace file). 
531             :type attr: str
532
533             :param block: Number of bytes to retrieve from trace, when attr is 
534                 TraceAttr.STREAM 
535             :type name: int
536
537             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
538             :type name: int
539
540             :rtype: str
541
542         """
543         rm = self.get_resource(guid)
544         return rm.trace(name, attr, block, offset)
545
546     def get_traces(self, guid):
547         """ Returns the list of the trace names of the RM with guid 'guid'
548
549             :param guid: Guid of the RM
550             :type guid: int
551
552             :return: List of trace names
553             :rtype: list
554
555         """
556         rm = self.get_resource(guid)
557         return rm.get_traces()
558
559
560     def discover(self, guid):
561         """ Discovers an available resource matching the criteria defined
562         by the RM with guid 'guid', and associates that resource to the RM
563
564         Not all RM types require (or are capable of) performing resource 
565         discovery. For the RM types which are not capable of doing so, 
566         invoking this method does not have any consequences. 
567
568             :param guid: Guid of the RM
569             :type guid: int
570
571         """
572         rm = self.get_resource(guid)
573         return rm.discover()
574
575     def provision(self, guid):
576         """ Provisions the resource associated to the RM with guid 'guid'.
577
578         Provisioning means making a resource 'accessible' to the user. 
579         Not all RM types require (or are capable of) performing resource 
580         provisioning. For the RM types which are not capable of doing so, 
581         invoking this method does not have any consequences. 
582
583             :param guid: Guid of the RM
584             :type guid: int
585
586         """
587         rm = self.get_resource(guid)
588         return rm.provision()
589
590     def get(self, guid, name):
591         """ Returns the value of the attribute with name 'name' on the
592         RM with guid 'guid'
593
594             :param guid: Guid of the RM
595             :type guid: int
596
597             :param name: Name of the attribute 
598             :type name: str
599
600             :return: The value of the attribute with name 'name'
601
602         """
603         rm = self.get_resource(guid)
604         return rm.get(name)
605
606     def set(self, guid, name, value):
607         """ Modifies the value of the attribute with name 'name' on the 
608         RM with guid 'guid'.
609
610             :param guid: Guid of the RM
611             :type guid: int
612
613             :param name: Name of the attribute
614             :type name: str
615
616             :param value: Value of the attribute
617
618         """
619         rm = self.get_resource(guid)
620         rm.set(name, value)
621
622     def get_global(self, rtype, name):
623         """ Returns the value of the global attribute with name 'name' on the
624         RMs of rtype 'rtype'.
625
626             :param guid: Guid of the RM
627             :type guid: int
628
629             :param name: Name of the attribute 
630             :type name: str
631
632             :return: The value of the attribute with name 'name'
633
634         """
635         rclass = ResourceFactory.get_resource_type(rtype)
636         return rclass.get_global(name)
637
638     def set_global(self, rtype, name, value):
639         """ Modifies the value of the global attribute with name 'name' on the 
640         RMs of with rtype 'rtype'.
641
642             :param guid: Guid of the RM
643             :type guid: int
644
645             :param name: Name of the attribute
646             :type name: str
647
648             :param value: Value of the attribute
649
650         """
651         rclass = ResourceFactory.get_resource_type(rtype)
652         return rclass.set_global(name, value)
653
654     def state(self, guid, hr = False):
655         """ Returns the state of a resource
656
657             :param guid: Resource guid
658             :type guid: integer
659
660             :param hr: Human readable. Forces return of a 
661                 status string instead of a number 
662             :type hr: boolean
663
664         """
665         rm = self.get_resource(guid)
666         state = rm.state
667
668         if hr:
669             return ResourceState2str.get(state)
670
671         return state
672
673     def stop(self, guid):
674         """ Stops the RM with guid 'guid'
675
676         Stopping a RM means that the resource it controls will
677         no longer take part of the experiment.
678
679             :param guid: Guid of the RM
680             :type guid: int
681
682         """
683         rm = self.get_resource(guid)
684         return rm.stop()
685
686     def start(self, guid):
687         """ Starts the RM with guid 'guid'
688
689         Starting a RM means that the resource it controls will
690         begin taking part of the experiment.
691
692             :param guid: Guid of the RM
693             :type guid: int
694
695         """
696         rm = self.get_resource(guid)
697         return rm.start()
698
699     def set_with_conditions(self, name, value, guids1, guids2, state,
700             time = None):
701         """ Modifies the value of attribute with name 'name' on all RMs 
702         on the guids1 list when time 'time' has elapsed since all 
703         elements in guids2 list have reached state 'state'.
704
705             :param name: Name of attribute to set in RM
706             :type name: string
707
708             :param value: Value of attribute to set in RM
709             :type name: string
710
711             :param guids1: List of guids of RMs subjected to action
712             :type guids1: list
713
714             :param action: Action to register (either START or STOP)
715             :type action: ResourceAction
716
717             :param guids2: List of guids of RMs to we waited for
718             :type guids2: list
719
720             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
721             :type state: ResourceState
722
723             :param time: Time to wait after guids2 has reached status 
724             :type time: string
725
726         """
727         if isinstance(guids1, int):
728             guids1 = [guids1]
729         if isinstance(guids2, int):
730             guids2 = [guids2]
731
732         for guid1 in guids1:
733             rm = self.get_resource(guid)
734             rm.set_with_conditions(name, value, guids2, state, time)
735
736     def deploy(self, guids = None, wait_all_ready = True, group = None):
737         """ Deploys all ResourceManagers in the guids list. 
738         
739         If the argument 'guids' is not given, all RMs with state NEW
740         are deployed.
741
742             :param guids: List of guids of RMs to deploy
743             :type guids: list
744
745             :param wait_all_ready: Wait until all RMs are ready in
746                 order to start the RMs
747             :type guid: int
748
749             :param group: Id of deployment group in which to deploy RMs
750             :type group: int
751
752         """
753         self.logger.debug(" ------- DEPLOY START ------ ")
754
755         if not guids:
756             # If no guids list was passed, all 'NEW' RMs will be deployed
757             guids = []
758             for guid in self.resources:
759                 if self.state(guid) == ResourceState.NEW:
760                     guids.append(guid)
761                 
762         if isinstance(guids, int):
763             guids = [guids]
764
765         # Create deployment group
766         # New guids can be added to a same deployment group later on
767         new_group = False
768         if not group:
769             new_group = True
770             group = self._group_id_generator.next()
771
772         if group not in self._groups:
773             self._groups[group] = []
774
775         self._groups[group].extend(guids)
776
777         def wait_all_and_start(group):
778             # Function that checks if all resources are READY
779             # before scheduling a start_with_conditions for each RM
780             reschedule = False
781             
782             # Get all guids in group
783             guids = self._groups[group]
784
785             for guid in guids:
786                 if self.state(guid) < ResourceState.READY:
787                     reschedule = True
788                     break
789
790             if reschedule:
791                 callback = functools.partial(wait_all_and_start, group)
792                 self.schedule("1s", callback)
793             else:
794                 # If all resources are ready, we schedule the start
795                 for guid in guids:
796                     rm = self.get_resource(guid)
797                     self.schedule("0s", rm.start_with_conditions)
798
799                     if rm.conditions.get(ResourceAction.STOP):
800                         # Only if the RM has STOP conditions we
801                         # schedule a stop. Otherwise the RM will stop immediately
802                         self.schedule("0s", rm.stop_with_conditions)
803
804         if wait_all_ready and new_group:
805             # Schedule a function to check that all resources are
806             # READY, and only then schedule the start.
807             # This aims at reducing the number of tasks looping in the 
808             # scheduler. 
809             # Instead of having many start tasks, we will have only one for 
810             # the whole group.
811             callback = functools.partial(wait_all_and_start, group)
812             self.schedule("0s", callback)
813
814         for guid in guids:
815             rm = self.get_resource(guid)
816             rm.deployment_group = group
817             self.schedule("0s", rm.deploy_with_conditions)
818
819             if not wait_all_ready:
820                 self.schedule("0s", rm.start_with_conditions)
821
822                 if rm.conditions.get(ResourceAction.STOP):
823                     # Only if the RM has STOP conditions we
824                     # schedule a stop. Otherwise the RM will stop immediately
825                     self.schedule("0s", rm.stop_with_conditions)
826
827     def release(self, guids = None):
828         """ Releases all ResourceManagers in the guids list.
829
830         If the argument 'guids' is not given, all RMs registered
831         in the experiment are released.
832
833             :param guids: List of RM guids
834             :type guids: list
835
836         """
837         if not guids:
838             guids = self.resources
839
840         # Remove all pending tasks from the scheduler queue
841         for tid in list(self._scheduler.pending):
842             self._scheduler.remove(tid)
843
844         self._runner.empty()
845
846         for guid in guids:
847             rm = self.get_resource(guid)
848             self.schedule("0s", rm.release)
849
850         self.wait_released(guids)
851         
852     def shutdown(self):
853         """ Releases all resources and stops the ExperimentController
854
855         """
856         # If there was a major failure we can't exit gracefully
857         if self._state == ECState.FAILED:
858             raise RuntimeError("EC failure. Can not exit gracefully")
859
860         self.release()
861
862         # Mark the EC state as TERMINATED
863         self._state = ECState.TERMINATED
864
865         # Stop processing thread
866         self._stop = True
867
868         # Notify condition to wake up the processing thread
869         self._notify()
870         
871         if self._thread.is_alive():
872            self._thread.join()
873
874     def schedule(self, date, callback, track = False):
875         """ Schedules a callback to be executed at time 'date'.
876
877             :param date: string containing execution time for the task.
878                     It can be expressed as an absolute time, using
879                     timestamp format, or as a relative time matching
880                     ^\d+.\d+(h|m|s|ms|us)$
881
882             :param callback: code to be executed for the task. Must be a
883                         Python function, and receives args and kwargs
884                         as arguments.
885
886             :param track: if set to True, the task will be retrievable with
887                     the get_task() method
888
889             :return : The Id of the task
890             :rtype: int
891             
892         """
893         timestamp = stabsformat(date)
894         task = Task(timestamp, callback)
895         task = self._scheduler.schedule(task)
896
897         if track:
898             self._tasks[task.id] = task
899
900         # Notify condition to wake up the processing thread
901         self._notify()
902
903         return task.id
904      
905     def _process(self):
906         """ Process scheduled tasks.
907
908         .. note::
909         
910         Tasks are scheduled by invoking the schedule method with a target 
911         callback and an execution time. 
912         The schedule method creates a new Task object with that callback 
913         and execution time, and pushes it into the '_scheduler' queue. 
914         The execution time and the order of arrival of tasks are used 
915         to order the tasks in the queue.
916
917         The _process method is executed in an independent thread held by 
918         the ExperimentController for as long as the experiment is running.
919         This method takes tasks from the '_scheduler' queue in a loop 
920         and processes them in parallel using multithreading. 
921         The environmental variable NEPI_NTHREADS can be used to control
922         the number of threads used to process tasks. The default value is 
923         50.
924
925         To execute tasks in parallel, a ParallelRunner (PR) object is used.
926         This object keeps a pool of threads (workers), and a queue of tasks
927         scheduled for 'immediate' execution. 
928         
929         On each iteration, the '_process' loop will take the next task that 
930         is scheduled for 'future' execution from the '_scheduler' queue, 
931         and if the execution time of that task is >= to the current time, 
932         it will push that task into the PR for 'immediate execution'. 
933         As soon as a worker is free, the PR will assign the next task to
934         that worker.
935
936         Upon receiving a task to execute, each PR worker (thread) will 
937         invoke the  _execute method of the EC, passing the task as 
938         argument.         
939         The _execute method will then invoke task.callback inside a 
940         try/except block. If an exception is raised by the tasks.callback, 
941         it will be trapped by the try block, logged to standard error 
942         (usually the console), and the task will be marked as failed.
943
944         """
945
946         self._runner.start()
947
948         while not self._stop:
949             try:
950                 self._cond.acquire()
951
952                 task = self._scheduler.next()
953                 
954                 if not task:
955                     # No task to execute. Wait for a new task to be scheduled.
956                     self._cond.wait()
957                 else:
958                     # The task timestamp is in the future. Wait for timeout 
959                     # or until another task is scheduled.
960                     now = tnow()
961                     if now < task.timestamp:
962                         # Calculate timeout in seconds
963                         timeout = tdiffsec(task.timestamp, now)
964
965                         # Re-schedule task with the same timestamp
966                         self._scheduler.schedule(task)
967                         
968                         task = None
969
970                         # Wait timeout or until a new task awakes the condition
971                         self._cond.wait(timeout)
972                
973                 self._cond.release()
974
975                 if task:
976                     # Process tasks in parallel
977                     self._runner.put(self._execute, task)
978             except: 
979                 import traceback
980                 err = traceback.format_exc()
981                 self.logger.error("Error while processing tasks in the EC: %s" % err)
982
983                 # Set the EC to FAILED state 
984                 self._state = ECState.FAILED
985             
986                 # Set the FailureManager failure level to EC failure
987                 self._fm.set_ec_failure()
988
989         self.logger.debug("Exiting the task processing loop ... ")
990         
991         self._runner.sync()
992         self._runner.destroy()
993
994     def _execute(self, task):
995         """ Executes a single task. 
996
997             :param task: Object containing the callback to execute
998             :type task: Task
999
1000         """
1001         try:
1002             # Invoke callback
1003             task.result = task.callback()
1004             task.status = TaskStatus.DONE
1005         except:
1006             import traceback
1007             err = traceback.format_exc()
1008             task.result = err
1009             task.status = TaskStatus.ERROR
1010             
1011             self.logger.error("Error occurred while executing task: %s" % err)
1012
1013     def _notify(self):
1014         """ Awakes the processing thread if it is blocked waiting 
1015         for new tasks to arrive
1016         
1017         """
1018         self._cond.acquire()
1019         self._cond.notify()
1020         self._cond.release()
1021