Changing reschedule_delay internals
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
29 from nepi.util.netgraph import NetGraph, TopologyType 
30
31 # TODO: use multiprocessing instead of threading
32 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
33
34 import functools
35 import logging
36 import os
37 import sys
38 import tempfile
39 import time
40 import threading
41 import weakref
42
43 class FailureLevel(object):
44     """ Possible failure states for the experiment """
45     OK = 1
46     RM_FAILURE = 2
47     EC_FAILURE = 3
48
49 class FailureManager(object):
50     """ The FailureManager is responsible for handling errors
51     and deciding whether an experiment should be aborted or not
52     """
53
54     def __init__(self):
55         self._ec = None
56         self._failure_level = FailureLevel.OK
57         self._abort = False
58
59     def set_ec(self, ec):
60         self._ec = weakref.ref(ec)
61
62     @property
63     def ec(self):
64         """ Returns the ExperimentController associated to this FailureManager 
65         """
66         return self._ec()
67
68     @property
69     def abort(self):
70         return self._abort
71
72     def eval_failure(self, guid):
73         """ Implements failure policy and sets the abort state of the
74         experiment based on the failure state and criticality of
75         the RM
76
77         :param guid: Guid of the RM upon which the failure of the experiment
78             is evaluated
79         :type guid: int
80
81         """
82         if self._failure_level == FailureLevel.OK:
83             rm = self.ec.get_resource(guid)
84             state = rm.state
85             critical = rm.get("critical")
86
87             if state == ResourceState.FAILED and critical:
88                 self._failure_level = FailureLevel.RM_FAILURE
89                 self._abort = True
90                 self.ec.logger.debug("RM critical failure occurred on guid %d." \
91                     " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
92
93     def set_ec_failure(self):
94         self._failure_level = FailureLevel.EC_FAILURE
95
96 class ECState(object):
97     """ Possible states of the ExperimentController
98    
99     """
100     RUNNING = 1
101     FAILED = 2
102     RELEASED = 3
103     TERMINATED = 4
104
105 class ExperimentController(object):
106     """
107     .. note::
108
109     An experiment, or scenario, is defined by a concrete set of resources,
110     and the behavior, configuration and interconnection of those resources. 
111     The Experiment Description (ED) is a detailed representation of a
112     single experiment. It contains all the necessary information to 
113     allow repeating the experiment. NEPI allows to describe
114     experiments by registering components (resources), configuring them
115     and interconnecting them.
116     
117     A same experiment (scenario) can be executed many times, generating 
118     different results. We call an experiment execution (instance) a 'run'.
119
120     The ExperimentController (EC), is the entity responsible of
121     managing an experiment run. The same scenario can be 
122     recreated (and re-run) by instantiating an EC and recreating 
123     the same experiment description. 
124
125     An experiment is represented as a graph of interconnected
126     resources. A resource is a generic concept in the sense that any
127     component taking part of an experiment, whether physical of
128     virtual, is considered a resource. A resources could be a host, 
129     a virtual machine, an application, a simulator, a IP address.
130
131     A ResourceManager (RM), is the entity responsible for managing a 
132     single resource. ResourceManagers are specific to a resource
133     type (i.e. An RM to control a Linux application will not be
134     the same as the RM used to control a ns-3 simulation).
135     To support a new type of resource, a new RM must be implemented. 
136     NEPI already provides a variety of RMs to control basic resources, 
137     and new can be extended from the existing ones.
138
139     Through the EC interface the user can create ResourceManagers (RMs),
140     configure them and interconnect them, to describe an experiment.
141     Describing an experiment through the EC does not run the experiment.
142     Only when the 'deploy()' method is invoked on the EC, the EC will take 
143     actions to transform the 'described' experiment into a 'running' experiment.
144
145     While the experiment is running, it is possible to continue to
146     create/configure/connect RMs, and to deploy them to involve new
147     resources in the experiment (this is known as 'interactive' deployment).
148     
149     An experiments in NEPI is identified by a string id, 
150     which is either given by the user, or automatically generated by NEPI.  
151     The purpose of this identifier is to separate files and results that 
152     belong to different experiment scenarios. 
153     However, since a same 'experiment' can be run many times, the experiment
154     id is not enough to identify an experiment instance (run).
155     For this reason, the ExperimentController has two identifier, the 
156     exp_id, which can be re-used in different ExperimentController,
157     and the run_id, which is unique to one ExperimentController instance, and
158     is automatically generated by NEPI.
159    
160     """
161
162     @classmethod
163     def load(cls, filepath, format = SFormats.XML):
164         serializer = ECSerializer()
165         ec = serializer.load(filepath)
166         return ec
167
168     def __init__(self, exp_id = None, local_dir = None, persist = False,
169             fm = None, add_node_callback = None, add_edge_callback = None, 
170             **kwargs):
171         """ ExperimentController entity to model an execute a network 
172         experiment.
173         
174         :param exp_id: Human readable name to identify the experiment
175         :type exp_id: str
176
177         :param local_dir: Path to local directory where to store experiment
178             related files
179         :type local_dir: str
180
181         :param persist: Save an XML description of the experiment after 
182         completion at local_dir
183         :type persist: bool
184
185         :param fm: FailureManager object. If None is given, the default 
186             FailureManager class will be used
187         :type fm: FailureManager
188
189         :param add_node_callback: Callback to invoke for node instantiation
190         when automatic topology creation mode is used 
191         :type add_node_callback: function
192
193         :param add_edge_callback: Callback to invoke for edge instantiation 
194         when automatic topology creation mode is used 
195         :type add_edge_callback: function
196
197         """
198         super(ExperimentController, self).__init__()
199
200         # Logging
201         self._logger = logging.getLogger("ExperimentController")
202
203         # Run identifier. It identifies a concrete execution instance (run) 
204         # of an experiment.
205         # Since a same experiment (same configuration) can be executed many 
206         # times, this run_id permits to separate result files generated on 
207         # different experiment executions
208         self._run_id = tsformat()
209
210         # Experiment identifier. Usually assigned by the user
211         # Identifies the experiment scenario (i.e. configuration, 
212         # resources used, etc)
213         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
214
215         # Local path where to store experiment related files (results, etc)
216         if not local_dir:
217             local_dir = tempfile.gettempdir() # /tmp
218
219         self._local_dir = local_dir
220         self._exp_dir = os.path.join(local_dir, self.exp_id)
221         self._run_dir = os.path.join(self.exp_dir, self.run_id)
222
223         # If True persist the experiment controller in XML format, after completion
224         self._persist = persist
225
226         # generator of globally unique ids
227         self._guid_generator = guid.GuidGenerator()
228         
229         # Resource managers
230         self._resources = dict()
231
232         # Scheduler. It a queue that holds tasks scheduled for
233         # execution, and yields the next task to be executed 
234         # ordered by execution and arrival time
235         self._scheduler = HeapScheduler()
236
237         # Tasks
238         self._tasks = dict()
239
240         # RM groups (for deployment) 
241         self._groups = dict()
242
243         # generator of globally unique id for groups
244         self._group_id_generator = guid.GuidGenerator()
245
246         # Flag to stop processing thread
247         self._stop = False
248     
249         # Entity in charge of managing system failures
250         if not fm:
251             self._fm = FailureManager()
252         self._fm.set_ec(self)
253
254         # EC state
255         self._state = ECState.RUNNING
256
257         # Automatically construct experiment description 
258         self._netgraph = None
259         if add_node_callback or add_edge_callback or kwargs.get("topology"):
260             self._build_from_netgraph(add_node_callback, add_edge_callback, 
261                     **kwargs)
262
263         # The runner is a pool of threads used to parallelize 
264         # execution of tasks
265         self._nthreads = 20
266         self._runner = None
267
268         # Event processing thread
269         self._cond = threading.Condition()
270         self._thread = threading.Thread(target = self._process)
271         self._thread.setDaemon(True)
272         self._thread.start()
273         
274     @property
275     def logger(self):
276         """ Returns the logger instance of the Experiment Controller
277
278         """
279         return self._logger
280
281     @property
282     def fm(self):
283         """ Returns the failure manager
284
285         """
286
287         return self.fm
288
289     @property
290     def failure_level(self):
291         """ Returns the level of FAILURE of th experiment
292
293         """
294
295         return self._fm._failure_level
296
297     @property
298     def ecstate(self):
299         """ Returns the state of the Experiment Controller
300
301         """
302         return self._state
303
304     @property
305     def exp_id(self):
306         """ Returns the experiment id assigned by the user
307
308         """
309         return self._exp_id
310
311     @property
312     def run_id(self):
313         """ Returns the experiment instance (run) identifier (automatically 
314         generated)
315
316         """
317         return self._run_id
318
319     @property
320     def nthreads(self):
321         """ Returns the number of processing nthreads used
322
323         """
324         return self._nthreads
325
326     @property
327     def local_dir(self):
328         """ Root local directory for experiment files
329
330         """
331         return self._local_dir
332
333     @property
334     def exp_dir(self):
335         """ Local directory to store results and other files related to the 
336         experiment.
337
338         """
339         return self._exp_dir
340
341     @property
342     def run_dir(self):
343         """ Local directory to store results and other files related to the 
344         experiment run.
345
346         """
347         return self._run_dir
348
349     @property
350     def persist(self):
351         """ If True, persists the ExperimentController to XML format upon 
352         experiment completion
353
354         """
355         return self._persist
356
357     @property
358     def netgraph(self):
359         """ Return NetGraph instance if experiment description was automatically 
360         generated
361
362         """
363         return self._netgraph
364
365     @property
366     def abort(self):
367         """ Returns True if the experiment has failed and should be interrupted,
368         False otherwise.
369
370         """
371         return self._fm.abort
372
373     def inform_failure(self, guid):
374         """ Reports a failure in a RM to the EC for evaluation
375
376             :param guid: Resource id
377             :type guid: int
378
379         """
380
381         return self._fm.eval_failure(guid)
382
383     def wait_finished(self, guids):
384         """ Blocking method that waits until all RMs in the 'guids' list 
385         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
386         RELEASED ), or until a failure in the experiment occurs 
387         (i.e. abort == True) 
388         
389             :param guids: List of guids
390             :type guids: list
391
392         """
393
394         def quit():
395             return self.abort
396
397         return self.wait(guids, state = ResourceState.STOPPED, 
398                 quit = quit)
399
400     def wait_started(self, guids):
401         """ Blocking method that waits until all RMs in the 'guids' list 
402         have reached a state >= STARTED, or until a failure in the 
403         experiment occurs (i.e. abort == True) 
404         
405             :param guids: List of guids
406             :type guids: list
407
408         """
409
410         def quit():
411             return self.abort
412
413         return self.wait(guids, state = ResourceState.STARTED, 
414                 quit = quit)
415
416     def wait_released(self, guids):
417         """ Blocking method that waits until all RMs in the 'guids' list 
418         have reached a state == RELEASED, or until the EC fails 
419         
420             :param guids: List of guids
421             :type guids: list
422
423         """
424
425         def quit():
426             return self._state == ECState.FAILED
427
428         return self.wait(guids, state = ResourceState.RELEASED, 
429                 quit = quit)
430
431     def wait_deployed(self, guids):
432         """ Blocking method that waits until all RMs in the 'guids' list 
433         have reached a state >= READY, or until a failure in the 
434         experiment occurs (i.e. abort == True) 
435         
436             :param guids: List of guids
437             :type guids: list
438
439         """
440
441         def quit():
442             return self.abort
443
444         return self.wait(guids, state = ResourceState.READY, 
445                 quit = quit)
446
447     def wait(self, guids, state, quit):
448         """ Blocking method that waits until all RMs in the 'guids' list 
449         have reached a state >= 'state', or until the 'quit' callback
450         yields True
451            
452             :param guids: List of guids
453             :type guids: list
454         
455         """
456         if isinstance(guids, int):
457             guids = [guids]
458
459         # Make a copy to avoid modifying the original guids list
460         guids = list(guids)
461
462         while True:
463             # If there are no more guids to wait for
464             # or the quit function returns True, exit the loop
465             if len(guids) == 0 or quit():
466                 break
467
468             # If a guid reached one of the target states, remove it from list
469             guid = guids.pop()
470             rm = self.get_resource(guid)
471             rstate = rm.state
472             
473             if rstate >= state:
474                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
475                     rm.get_rtype(), guid, rstate, state))
476             else:
477                 # Debug...
478                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
479                     guid, rstate, state))
480
481                 guids.append(guid)
482
483                 time.sleep(0.5)
484
485     def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
486         plotter = ECPlotter()
487         fpath = plotter.plot(self, dirpath = dirpath, format= format, 
488                 show = show)
489         return fpath
490
491     def serialize(self, format = SFormats.XML):
492         serializer = ECSerializer()
493         sec = serializer.load(self, format = format)
494         return sec
495
496     def save(self, dirpath = None, format = SFormats.XML):
497         if dirpath == None:
498             dirpath = self.run_dir
499
500         try:
501             os.makedirs(dirpath)
502         except OSError:
503             pass
504
505         serializer = ECSerializer()
506         path = serializer.save(self, dirpath, format = format)
507         return path
508
509     def get_task(self, tid):
510         """ Returns a task by its id
511
512             :param tid: Id of the task
513             :type tid: int
514             
515             :rtype: Task
516             
517         """
518         return self._tasks.get(tid)
519
520     def get_resource(self, guid):
521         """ Returns a registered ResourceManager by its guid
522
523             :param guid: Id of the resource
524             :type guid: int
525             
526             :rtype: ResourceManager
527             
528         """
529         rm = self._resources.get(guid)
530         return rm
531
532     def get_resources_by_type(self, rtype):
533         """ Returns the ResourceManager objects of type rtype
534
535             :param rtype: Resource type
536             :type rtype: string
537             
538             :rtype: list of ResourceManagers
539             
540         """
541         rms = []
542         for guid, rm in self._resources.iteritems():
543             if rm.get_rtype() == rtype: 
544                 rms.append(rm)
545         return rms
546
547     def remove_resource(self, guid):
548         del self._resources[guid]
549
550     @property
551     def resources(self):
552         """ Returns the guids of all ResourceManagers 
553
554             :return: Set of all RM guids
555             :rtype: list
556
557         """
558         keys = self._resources.keys()
559
560         return keys
561
562     def filter_resources(self, rtype):
563         """ Returns the guids of all ResourceManagers of type rtype
564
565             :param rtype: Resource type
566             :type rtype: string
567             
568             :rtype: list of guids
569             
570         """
571         rms = []
572         for guid, rm in self._resources.iteritems():
573             if rm.get_rtype() == rtype: 
574                 rms.append(rm.guid)
575         return rms
576
577     def register_resource(self, rtype, guid = None):
578         """ Registers a new ResourceManager of type 'rtype' in the experiment
579         
580         This method will assign a new 'guid' for the RM, if no guid
581         is specified.
582
583             :param rtype: Type of the RM
584             :type rtype: str
585
586             :return: Guid of the RM
587             :rtype: int
588             
589         """
590         # Get next available guid
591         guid = self._guid_generator.next(guid)
592         
593         # Instantiate RM
594         rm = ResourceFactory.create(rtype, self, guid)
595
596         # Store RM
597         self._resources[guid] = rm
598
599         return guid
600
601     def get_attributes(self, guid):
602         """ Returns all the attributes of the RM with guid 'guid'
603
604             :param guid: Guid of the RM
605             :type guid: int
606
607             :return: List of attributes
608             :rtype: list
609
610         """
611         rm = self.get_resource(guid)
612         return rm.get_attributes()
613
614     def get_attribute(self, guid, name):
615         """ Returns the attribute 'name' of the RM with guid 'guid'
616
617             :param guid: Guid of the RM
618             :type guid: int
619
620             :param name: Name of the attribute
621             :type name: str
622
623             :return: The attribute with name 'name'
624             :rtype: Attribute
625
626         """
627         rm = self.get_resource(guid)
628         return rm.get_attribute(name)
629
630     def register_connection(self, guid1, guid2):
631         """ Registers a connection between a RM with guid 'guid1'
632         and another RM with guid 'guid2'. 
633     
634         The order of the in which the two guids are provided is not
635         important, since the connection relationship is symmetric.
636
637             :param guid1: First guid to connect
638             :type guid1: ResourceManager
639
640             :param guid2: Second guid to connect
641             :type guid: ResourceManager
642
643         """
644         rm1 = self.get_resource(guid1)
645         rm2 = self.get_resource(guid2)
646
647         rm1.register_connection(guid2)
648         rm2.register_connection(guid1)
649
650     def register_condition(self, guids1, action, guids2, state,
651             time = None):
652         """ Registers an action START, STOP or DEPLOY for all RM on list
653         guids1 to occur at time 'time' after all elements in list guids2 
654         have reached state 'state'.
655
656             :param guids1: List of guids of RMs subjected to action
657             :type guids1: list
658
659             :param action: Action to perform (either START, STOP or DEPLOY)
660             :type action: ResourceAction
661
662             :param guids2: List of guids of RMs to we waited for
663             :type guids2: list
664
665             :param state: State to wait for on RMs of list guids2 (STARTED,
666                 STOPPED, etc)
667             :type state: ResourceState
668
669             :param time: Time to wait after guids2 has reached status 
670             :type time: string
671
672         """
673         if isinstance(guids1, int):
674             guids1 = [guids1]
675         if isinstance(guids2, int):
676             guids2 = [guids2]
677
678         for guid1 in guids1:
679             rm = self.get_resource(guid1)
680             rm.register_condition(action, guids2, state, time)
681
682     def enable_trace(self, guid, name):
683         """ Enables a trace to be collected during the experiment run
684
685             :param name: Name of the trace
686             :type name: str
687
688         """
689         rm = self.get_resource(guid)
690         rm.enable_trace(name)
691
692     def trace_enabled(self, guid, name):
693         """ Returns True if the trace of name 'name' is enabled
694
695             :param name: Name of the trace
696             :type name: str
697
698         """
699         rm = self.get_resource(guid)
700         return rm.trace_enabled(name)
701
702     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
703         """ Returns information on a collected trace, the trace stream or 
704         blocks (chunks) of the trace stream
705
706             :param name: Name of the trace
707             :type name: str
708
709             :param attr: Can be one of:
710                          - TraceAttr.ALL (complete trace content), 
711                          - TraceAttr.STREAM (block in bytes to read starting 
712                                 at offset),
713                          - TraceAttr.PATH (full path to the trace file),
714                          - TraceAttr.SIZE (size of trace file). 
715             :type attr: str
716
717             :param block: Number of bytes to retrieve from trace, when attr is 
718                 TraceAttr.STREAM 
719             :type name: int
720
721             :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
722             :type name: int
723
724             :rtype: str
725
726         """
727         rm = self.get_resource(guid)
728         return rm.trace(name, attr, block, offset)
729
730     def get_traces(self, guid):
731         """ Returns the list of the trace names of the RM with guid 'guid'
732
733             :param guid: Guid of the RM
734             :type guid: int
735
736             :return: List of trace names
737             :rtype: list
738
739         """
740         rm = self.get_resource(guid)
741         return rm.get_traces()
742
743
744     def discover(self, guid):
745         """ Discovers an available resource matching the criteria defined
746         by the RM with guid 'guid', and associates that resource to the RM
747
748         Not all RM types require (or are capable of) performing resource 
749         discovery. For the RM types which are not capable of doing so, 
750         invoking this method does not have any consequences. 
751
752             :param guid: Guid of the RM
753             :type guid: int
754
755         """
756         rm = self.get_resource(guid)
757         return rm.discover()
758
759     def provision(self, guid):
760         """ Provisions the resource associated to the RM with guid 'guid'.
761
762         Provisioning means making a resource 'accessible' to the user. 
763         Not all RM types require (or are capable of) performing resource 
764         provisioning. For the RM types which are not capable of doing so, 
765         invoking this method does not have any consequences. 
766
767             :param guid: Guid of the RM
768             :type guid: int
769
770         """
771         rm = self.get_resource(guid)
772         return rm.provision()
773
774     def get(self, guid, name):
775         """ Returns the value of the attribute with name 'name' on the
776         RM with guid 'guid'
777
778             :param guid: Guid of the RM
779             :type guid: int
780
781             :param name: Name of the attribute 
782             :type name: str
783
784             :return: The value of the attribute with name 'name'
785
786         """
787         rm = self.get_resource(guid)
788         return rm.get(name)
789
790     def set(self, guid, name, value):
791         """ Modifies the value of the attribute with name 'name' on the 
792         RM with guid 'guid'.
793
794             :param guid: Guid of the RM
795             :type guid: int
796
797             :param name: Name of the attribute
798             :type name: str
799
800             :param value: Value of the attribute
801
802         """
803         rm = self.get_resource(guid)
804         rm.set(name, value)
805
806     def get_global(self, rtype, name):
807         """ Returns the value of the global attribute with name 'name' on the
808         RMs of rtype 'rtype'.
809
810             :param guid: Guid of the RM
811             :type guid: int
812
813             :param name: Name of the attribute 
814             :type name: str
815
816             :return: The value of the attribute with name 'name'
817
818         """
819         rclass = ResourceFactory.get_resource_type(rtype)
820         return rclass.get_global(name)
821
822     def set_global(self, rtype, name, value):
823         """ Modifies the value of the global attribute with name 'name' on the 
824         RMs of with rtype 'rtype'.
825
826             :param guid: Guid of the RM
827             :type guid: int
828
829             :param name: Name of the attribute
830             :type name: str
831
832             :param value: Value of the attribute
833
834         """
835         rclass = ResourceFactory.get_resource_type(rtype)
836         return rclass.set_global(name, value)
837
838     def state(self, guid, hr = False):
839         """ Returns the state of a resource
840
841             :param guid: Resource guid
842             :type guid: integer
843
844             :param hr: Human readable. Forces return of a 
845                 status string instead of a number 
846             :type hr: boolean
847
848         """
849         rm = self.get_resource(guid)
850         state = rm.state
851
852         if hr:
853             return ResourceState2str.get(state)
854
855         return state
856
857     def stop(self, guid):
858         """ Stops the RM with guid 'guid'
859
860         Stopping a RM means that the resource it controls will
861         no longer take part of the experiment.
862
863             :param guid: Guid of the RM
864             :type guid: int
865
866         """
867         rm = self.get_resource(guid)
868         return rm.stop()
869
870     def start(self, guid):
871         """ Starts the RM with guid 'guid'
872
873         Starting a RM means that the resource it controls will
874         begin taking part of the experiment.
875
876             :param guid: Guid of the RM
877             :type guid: int
878
879         """
880         rm = self.get_resource(guid)
881         return rm.start()
882
883     def get_start_time(self, guid):
884         """ Returns the start time of the RM as a timestamp """
885         rm = self.get_resource(guid)
886         return rm.start_time
887
888     def get_stop_time(self, guid):
889         """ Returns the stop time of the RM as a timestamp """
890         rm = self.get_resource(guid)
891         return rm.stop_time
892
893     def get_discover_time(self, guid):
894         """ Returns the discover time of the RM as a timestamp """
895         rm = self.get_resource(guid)
896         return rm.discover_time
897
898     def get_provision_time(self, guid):
899         """ Returns the provision time of the RM as a timestamp """
900         rm = self.get_resource(guid)
901         return rm.provision_time
902
903     def get_ready_time(self, guid):
904         """ Returns the deployment time of the RM as a timestamp """
905         rm = self.get_resource(guid)
906         return rm.ready_time
907
908     def get_release_time(self, guid):
909         """ Returns the release time of the RM as a timestamp """
910         rm = self.get_resource(guid)
911         return rm.release_time
912
913     def get_failed_time(self, guid):
914         """ Returns the time failure occured for the RM as a timestamp """
915         rm = self.get_resource(guid)
916         return rm.failed_time
917
918     def set_with_conditions(self, name, value, guids1, guids2, state,
919             time = None):
920         """ Modifies the value of attribute with name 'name' on all RMs 
921         on the guids1 list when time 'time' has elapsed since all 
922         elements in guids2 list have reached state 'state'.
923
924             :param name: Name of attribute to set in RM
925             :type name: string
926
927             :param value: Value of attribute to set in RM
928             :type name: string
929
930             :param guids1: List of guids of RMs subjected to action
931             :type guids1: list
932
933             :param action: Action to register (either START or STOP)
934             :type action: ResourceAction
935
936             :param guids2: List of guids of RMs to we waited for
937             :type guids2: list
938
939             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
940             :type state: ResourceState
941
942             :param time: Time to wait after guids2 has reached status 
943             :type time: string
944
945         """
946         if isinstance(guids1, int):
947             guids1 = [guids1]
948         if isinstance(guids2, int):
949             guids2 = [guids2]
950
951         for guid1 in guids1:
952             rm = self.get_resource(guid)
953             rm.set_with_conditions(name, value, guids2, state, time)
954
955     def deploy(self, guids = None, wait_all_ready = True, group = None):
956         """ Deploys all ResourceManagers in the guids list. 
957         
958         If the argument 'guids' is not given, all RMs with state NEW
959         are deployed.
960
961             :param guids: List of guids of RMs to deploy
962             :type guids: list
963
964             :param wait_all_ready: Wait until all RMs are ready in
965                 order to start the RMs
966             :type guid: int
967
968             :param group: Id of deployment group in which to deploy RMs
969             :type group: int
970
971         """
972         self.logger.debug(" ------- DEPLOY START ------ ")
973
974         if not guids:
975             # If no guids list was passed, all 'NEW' RMs will be deployed
976             guids = []
977             for guid, rm in self._resources.iteritems():
978                 if rm.state == ResourceState.NEW:
979                     guids.append(guid)
980                 
981         if isinstance(guids, int):
982             guids = [guids]
983
984         # Create deployment group
985         # New guids can be added to a same deployment group later on
986         new_group = False
987         if not group:
988             new_group = True
989             group = self._group_id_generator.next()
990
991         if group not in self._groups:
992             self._groups[group] = []
993
994         self._groups[group].extend(guids)
995
996         def wait_all_and_start(group):
997             # Function that checks if all resources are READY
998             # before scheduling a start_with_conditions for each RM
999             reschedule = False
1000             
1001             # Get all guids in group
1002             guids = self._groups[group]
1003
1004             for guid in guids:
1005                 if self.state(guid) < ResourceState.READY:
1006                     reschedule = True
1007                     break
1008
1009             if reschedule:
1010                 callback = functools.partial(wait_all_and_start, group)
1011                 self.schedule("1s", callback)
1012             else:
1013                 # If all resources are ready, we schedule the start
1014                 for guid in guids:
1015                     rm = self.get_resource(guid)
1016                     self.schedule("0s", rm.start_with_conditions)
1017
1018                     if rm.conditions.get(ResourceAction.STOP):
1019                         # Only if the RM has STOP conditions we
1020                         # schedule a stop. Otherwise the RM will stop immediately
1021                         self.schedule("0s", rm.stop_with_conditions)
1022
1023         if wait_all_ready and new_group:
1024             # Schedule a function to check that all resources are
1025             # READY, and only then schedule the start.
1026             # This aims at reducing the number of tasks looping in the 
1027             # scheduler. 
1028             # Instead of having many start tasks, we will have only one for 
1029             # the whole group.
1030             callback = functools.partial(wait_all_and_start, group)
1031             self.schedule("0s", callback)
1032
1033         for guid in guids:
1034             rm = self.get_resource(guid)
1035             rm.deployment_group = group
1036             self.schedule("0s", rm.deploy_with_conditions)
1037
1038             if not wait_all_ready:
1039                 self.schedule("0s", rm.start_with_conditions)
1040
1041                 if rm.conditions.get(ResourceAction.STOP):
1042                     # Only if the RM has STOP conditions we
1043                     # schedule a stop. Otherwise the RM will stop immediately
1044                     self.schedule("0s", rm.stop_with_conditions)
1045
1046     def release(self, guids = None):
1047         """ Releases all ResourceManagers in the guids list.
1048
1049         If the argument 'guids' is not given, all RMs registered
1050         in the experiment are released.
1051
1052             :param guids: List of RM guids
1053             :type guids: list
1054
1055         """
1056         if self._state == ECState.RELEASED:
1057             return 
1058
1059         if isinstance(guids, int):
1060             guids = [guids]
1061
1062         if not guids:
1063             guids = self.resources
1064
1065         for guid in guids:
1066             rm = self.get_resource(guid)
1067             self.schedule("0s", rm.release)
1068
1069         self.wait_released(guids)
1070
1071         if self.persist:
1072             self.save()
1073
1074         for guid in guids:
1075             if self.get(guid, "hardRelease"):
1076                 self.remove_resource(guid)\
1077
1078         # Mark the EC state as RELEASED
1079         self._state = ECState.RELEASED
1080         
1081     def shutdown(self):
1082         """ Releases all resources and stops the ExperimentController
1083
1084         """
1085         # If there was a major failure we can't exit gracefully
1086         if self._state == ECState.FAILED:
1087             raise RuntimeError("EC failure. Can not exit gracefully")
1088
1089         # Remove all pending tasks from the scheduler queue
1090         for tid in list(self._scheduler.pending):
1091             self._scheduler.remove(tid)
1092
1093         # Remove pending tasks from the workers queue
1094         self._runner.empty()
1095
1096         self.release()
1097
1098         # Mark the EC state as TERMINATED
1099         self._state = ECState.TERMINATED
1100
1101         # Stop processing thread
1102         self._stop = True
1103
1104         # Notify condition to wake up the processing thread
1105         self._notify()
1106         
1107         if self._thread.is_alive():
1108            self._thread.join()
1109
1110     def schedule(self, date, callback, track = False):
1111         """ Schedules a callback to be executed at time 'date'.
1112
1113             :param date: string containing execution time for the task.
1114                     It can be expressed as an absolute time, using
1115                     timestamp format, or as a relative time matching
1116                     ^\d+.\d+(h|m|s|ms|us)$
1117
1118             :param callback: code to be executed for the task. Must be a
1119                         Python function, and receives args and kwargs
1120                         as arguments.
1121
1122             :param track: if set to True, the task will be retrievable with
1123                     the get_task() method
1124
1125             :return : The Id of the task
1126             :rtype: int
1127             
1128         """
1129         timestamp = stabsformat(date)
1130         task = Task(timestamp, callback)
1131         task = self._scheduler.schedule(task)
1132
1133         if track:
1134             self._tasks[task.id] = task
1135
1136         # Notify condition to wake up the processing thread
1137         self._notify()
1138
1139         return task.id
1140      
1141     def _process(self):
1142         """ Process scheduled tasks.
1143
1144         .. note::
1145         
1146         Tasks are scheduled by invoking the schedule method with a target 
1147         callback and an execution time. 
1148         The schedule method creates a new Task object with that callback 
1149         and execution time, and pushes it into the '_scheduler' queue. 
1150         The execution time and the order of arrival of tasks are used 
1151         to order the tasks in the queue.
1152
1153         The _process method is executed in an independent thread held by 
1154         the ExperimentController for as long as the experiment is running.
1155         This method takes tasks from the '_scheduler' queue in a loop 
1156         and processes them in parallel using multithreading. 
1157         The environmental variable NEPI_NTHREADS can be used to control
1158         the number of threads used to process tasks. The default value is 
1159         50.
1160
1161         To execute tasks in parallel, a ParallelRunner (PR) object is used.
1162         This object keeps a pool of threads (workers), and a queue of tasks
1163         scheduled for 'immediate' execution. 
1164         
1165         On each iteration, the '_process' loop will take the next task that 
1166         is scheduled for 'future' execution from the '_scheduler' queue, 
1167         and if the execution time of that task is >= to the current time, 
1168         it will push that task into the PR for 'immediate execution'. 
1169         As soon as a worker is free, the PR will assign the next task to
1170         that worker.
1171
1172         Upon receiving a task to execute, each PR worker (thread) will 
1173         invoke the  _execute method of the EC, passing the task as 
1174         argument.         
1175         The _execute method will then invoke task.callback inside a 
1176         try/except block. If an exception is raised by the tasks.callback, 
1177         it will be trapped by the try block, logged to standard error 
1178         (usually the console), and the task will be marked as failed.
1179
1180         """
1181
1182         self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1183         self._runner = ParallelRun(maxthreads = self.nthreads)
1184         self._runner.start()
1185
1186         while not self._stop:
1187             try:
1188                 self._cond.acquire()
1189
1190                 task = self._scheduler.next()
1191                 
1192                 if not task:
1193                     # No task to execute. Wait for a new task to be scheduled.
1194                     self._cond.wait()
1195                 else:
1196                     # The task timestamp is in the future. Wait for timeout 
1197                     # or until another task is scheduled.
1198                     now = tnow()
1199                     if now < task.timestamp:
1200                         # Calculate timeout in seconds
1201                         timeout = tdiffsec(task.timestamp, now)
1202
1203                         # Re-schedule task with the same timestamp
1204                         self._scheduler.schedule(task)
1205                         
1206                         task = None
1207
1208                         # Wait timeout or until a new task awakes the condition
1209                         self._cond.wait(timeout)
1210                
1211                 self._cond.release()
1212
1213                 if task:
1214                     # Process tasks in parallel
1215                     self._runner.put(self._execute, task)
1216             except: 
1217                 import traceback
1218                 err = traceback.format_exc()
1219                 self.logger.error("Error while processing tasks in the EC: %s" % err)
1220
1221                 # Set the EC to FAILED state 
1222                 self._state = ECState.FAILED
1223             
1224                 # Set the FailureManager failure level to EC failure
1225                 self._fm.set_ec_failure()
1226
1227         self.logger.debug("Exiting the task processing loop ... ")
1228         
1229         self._runner.sync()
1230         self._runner.destroy()
1231
1232     def _execute(self, task):
1233         """ Executes a single task. 
1234
1235             :param task: Object containing the callback to execute
1236             :type task: Task
1237
1238         """
1239         try:
1240             # Invoke callback
1241             task.result = task.callback()
1242             task.status = TaskStatus.DONE
1243         except:
1244             import traceback
1245             err = traceback.format_exc()
1246             task.result = err
1247             task.status = TaskStatus.ERROR
1248             
1249             self.logger.error("Error occurred while executing task: %s" % err)
1250
1251     def _notify(self):
1252         """ Awakes the processing thread if it is blocked waiting 
1253         for new tasks to arrive
1254         
1255         """
1256         self._cond.acquire()
1257         self._cond.notify()
1258         self._cond.release()
1259
1260     def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
1261             **kwargs):
1262         """ Automates experiment description using a NetGraph instance.
1263         """
1264         self._netgraph = NetGraph(**kwargs)
1265
1266         if add_node_callback:
1267             ### Add resources to the EC
1268             for nid in self.netgraph.nodes():
1269                 add_node_callback(self, nid)
1270
1271         if add_edge_callback:
1272             #### Add connections between resources
1273             for nid1, nid2 in self.netgraph.edges():
1274                 add_edge_callback(self, nid1, nid2)
1275