LinuxApplication: Changed directory structure to store experiment files in the Linux...
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
38 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39
40 class ECState(object):
41     """ State of the Experiment Controller
42    
43     """
44     RUNNING = 1
45     FAILED = 2
46     TERMINATED = 3
47
48 class ExperimentController(object):
49     """
50     .. class:: Class Args :
51       
52         :param exp_id: Human readable identifier for the experiment scenario. 
53                        It will be used in the name of the directory 
54                         where experiment related information is stored
55         :type exp_id: str
56
57     .. note::
58
59         An experiment, or scenario, is defined by a concrete use, behavior,
60         configuration and interconnection of resources that describe a single
61         experiment case (We call this the experiment description). 
62         A same experiment (scenario) can be run many times.
63
64         The ExperimentController (EC), is the entity responsible for 
65         managing an experiment instance (run). The same scenario can be 
66         recreated (and re-run) by instantiating an EC and recreating 
67         the same experiment description. 
68
69         In NEPI, an experiment is represented as a graph of interconnected
70         resources. A resource is a generic concept in the sense that any
71         component taking part of an experiment, whether physical of
72         virtual, is considered a resource. A resources could be a host, 
73         a virtual machine, an application, a simulator, a IP address.
74
75         A ResourceManager (RM), is the entity responsible for managing a 
76         single resource. ResourceManagers are specific to a resource
77         type (i.e. An RM to control a Linux application will not be
78         the same as the RM used to control a ns-3 simulation).
79         In order for a new type of resource to be supported in NEPI
80         a new RM must be implemented. NEPI already provides different
81         RMs to control basic resources, and new can be extended from
82         the existing ones.
83
84         Through the EC interface the user can create ResourceManagers (RMs),
85         configure them and interconnect them, in order to describe an experiment.
86         Describing an experiment through the EC does not run the experiment.
87         Only when the 'deploy()' method is invoked on the EC, will the EC take 
88         actions to transform the 'described' experiment into a 'running' experiment.
89
90         While the experiment is running, it is possible to continue to
91         create/configure/connect RMs, and to deploy them to involve new
92         resources in the experiment (this is known as 'interactive' deployment).
93         
94         An experiments in NEPI is identified by a string id, 
95         which is either given by the user, or automatically generated by NEPI.  
96         The purpose of this identifier is to separate files and results that 
97         belong to different experiment scenarios. 
98         However, since a same 'experiment' can be run many times, the experiment
99         id is not enough to identify an experiment instance (run).
100         For this reason, the ExperimentController has two identifier, the 
101         exp_id, which can be re-used by different ExperimentController instances,
102         and the run_id, which unique to a ExperimentController instance, and
103         is automatically generated by NEPI.
104         
105     """
106
107     def __init__(self, exp_id = None): 
108         super(ExperimentController, self).__init__()
109         # root directory to store files
110
111         # Run identifier. It identifies a concrete instance (run) of an experiment.
112         # Since a same experiment (same configuration) can be run many times,
113         # this id permits to identify concrete exoeriment run
114         self._run_id = tsformat()
115
116         # Experiment identifier. Usually assigned by the user
117         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
118
119         # generator of globally unique ids
120         self._guid_generator = guid.GuidGenerator()
121         
122         # Resource managers
123         self._resources = dict()
124
125         # Scheduler
126         self._scheduler = HeapScheduler()
127
128         # Tasks
129         self._tasks = dict()
130
131         # Event processing thread
132         self._cond = threading.Condition()
133         self._thread = threading.Thread(target = self._process)
134         self._thread.setDaemon(True)
135         self._thread.start()
136
137         # EC state
138         self._state = ECState.RUNNING
139
140         # Logging
141         self._logger = logging.getLogger("ExperimentController")
142
143     @property
144     def logger(self):
145         """ Return the logger of the Experiment Controller
146
147         """
148         return self._logger
149
150     @property
151     def ecstate(self):
152         """ Return the state of the Experiment Controller
153
154         """
155         return self._state
156
157     @property
158     def exp_id(self):
159         """ Return the experiment id assigned by the user
160
161         """
162         return self._exp_id
163
164     @property
165     def run_id(self):
166         """ Return the experiment instance (run) identifier  
167
168         """
169         return self._run_id
170
171     @property
172     def finished(self):
173         """ Put the state of the Experiment Controller into a final state :
174             Either TERMINATED or FAILED
175
176         """
177         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
178
179     def wait_finished(self, guids):
180         """ Blocking method that wait until all the RM from the 'guid' list 
181             reached the state FINISHED
182
183         :param guids: List of guids
184         :type guids: list
185         """
186         return self.wait(guids)
187
188     def wait_started(self, guids):
189         """ Blocking method that wait until all the RM from the 'guid' list 
190             reached the state STARTED
191
192         :param guids: List of guids
193         :type guids: list
194         """
195         return self.wait(guids, states = [ResourceState.STARTED,
196             ResourceState.STOPPED,
197             ResourceState.FINISHED])
198
199     def wait(self, guids, states = [ResourceState.FINISHED, 
200         ResourceState.STOPPED]):
201         """ Blocking method that waits until all the RM from the 'guid' list 
202             reached state 'state' or until a failure occurs
203             
204         :param guids: List of guids
205         :type guids: list
206         """
207         if isinstance(guids, int):
208             guids = [guids]
209
210         while not all([self.state(guid) in states for guid in guids]) and \
211                 not any([self.state(guid) in [
212                         ResourceState.FAILED] for guid in guids]) and \
213                 not self.finished:
214             # debug logging
215             waited = ""
216             for guid in guids:
217                 waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
218             self.logger.debug(" WAITING FOR %s " % waited )
219             
220             # We keep the sleep big to decrease the number of RM state queries
221             time.sleep(2)
222    
223     def get_task(self, tid):
224         """ Get a specific task
225
226         :param tid: Id of the task
227         :type tid: int
228         :rtype: Task
229         """
230         return self._tasks.get(tid)
231
232     def get_resource(self, guid):
233         """ Get a specific Resource Manager
234
235         :param guid: Id of the task
236         :type guid: int
237         :rtype: ResourceManager
238         """
239         return self._resources.get(guid)
240
241     @property
242     def resources(self):
243         """ Returns the list of all the Resource Manager Id
244
245         :rtype: set
246
247         """
248         return self._resources.keys()
249
250     def register_resource(self, rtype, guid = None):
251         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
252         for the RM of type 'rtype' and add it to the list of Resources.
253
254         :param rtype: Type of the RM
255         :type rtype: str
256         :return: Id of the RM
257         :rtype: int
258         """
259         # Get next available guid
260         guid = self._guid_generator.next(guid)
261         
262         # Instantiate RM
263         rm = ResourceFactory.create(rtype, self, guid)
264
265         # Store RM
266         self._resources[guid] = rm
267
268         return guid
269
270     def get_attributes(self, guid):
271         """ Return all the attibutes of a specific RM
272
273         :param guid: Guid of the RM
274         :type guid: int
275         :return: List of attributes
276         :rtype: list
277         """
278         rm = self.get_resource(guid)
279         return rm.get_attributes()
280
281     def register_connection(self, guid1, guid2):
282         """ Registers a guid1 with a guid2. 
283             The declaration order is not important
284
285             :param guid1: First guid to connect
286             :type guid1: ResourceManager
287
288             :param guid2: Second guid to connect
289             :type guid: ResourceManager
290         """
291         rm1 = self.get_resource(guid1)
292         rm2 = self.get_resource(guid2)
293
294         rm1.register_connection(guid2)
295         rm2.register_connection(guid1)
296
297     def register_condition(self, group1, action, group2, state,
298             time = None):
299         """ Registers an action START or STOP for all RM on group1 to occur 
300             time 'time' after all elements in group2 reached state 'state'.
301
302             :param group1: List of guids of RMs subjected to action
303             :type group1: list
304
305             :param action: Action to register (either START or STOP)
306             :type action: ResourceAction
307
308             :param group2: List of guids of RMs to we waited for
309             :type group2: list
310
311             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
312             :type state: ResourceState
313
314             :param time: Time to wait after group2 has reached status 
315             :type time: string
316
317         """
318         if isinstance(group1, int):
319             group1 = [group1]
320         if isinstance(group2, int):
321             group2 = [group2]
322
323         for guid1 in group1:
324             rm = self.get_resource(guid1)
325             rm.register_condition(action, group2, state, time)
326
327     def register_trace(self, guid, name):
328         """ Enable trace
329
330         :param name: Name of the trace
331         :type name: str
332         """
333         rm = self.get_resource(guid)
334         rm.register_trace(name)
335
336     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
337         """ Get information on collected trace
338
339         :param name: Name of the trace
340         :type name: str
341
342         :param attr: Can be one of:
343                          - TraceAttr.ALL (complete trace content), 
344                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
345                          - TraceAttr.PATH (full path to the trace file),
346                          - TraceAttr.SIZE (size of trace file). 
347         :type attr: str
348
349         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
350         :type name: int
351
352         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
353         :type name: int
354
355         :rtype: str
356         """
357         rm = self.get_resource(guid)
358         return rm.trace(name, attr, block, offset)
359
360     def discover(self, guid):
361         """ Discover a specific RM defined by its 'guid'
362
363             :param guid: Guid of the RM
364             :type guid: int
365
366         """
367         rm = self.get_resource(guid)
368         return rm.discover()
369
370     def provision(self, guid):
371         """ Provision a specific RM defined by its 'guid'
372
373             :param guid: Guid of the RM
374             :type guid: int
375
376         """
377         rm = self.get_resource(guid)
378         return rm.provision()
379
380     def get(self, guid, name):
381         """ Get a specific attribute 'name' from the RM 'guid'
382
383             :param guid: Guid of the RM
384             :type guid: int
385
386             :param name: attribute's name
387             :type name: str
388
389         """
390         rm = self.get_resource(guid)
391         return rm.get(name)
392
393     def set(self, guid, name, value):
394         """ Set a specific attribute 'name' from the RM 'guid' 
395             with the value 'value' 
396
397             :param guid: Guid of the RM
398             :type guid: int
399
400             :param name: attribute's name
401             :type name: str
402
403             :param value: attribute's value
404
405         """
406         rm = self.get_resource(guid)
407         return rm.set(name, value)
408
409     def state(self, guid, hr = False):
410         """ Returns the state of a resource
411
412             :param guid: Resource guid
413             :type guid: integer
414
415             :param hr: Human readable. Forces return of a 
416                 status string instead of a number 
417             :type hr: boolean
418
419         """
420         rm = self.get_resource(guid)
421         state = rm.state
422
423         if hr:
424             return ResourceState2str.get(state)
425
426         return state
427
428     def stop(self, guid):
429         """ Stop a specific RM defined by its 'guid'
430
431             :param guid: Guid of the RM
432             :type guid: int
433
434         """
435         rm = self.get_resource(guid)
436         return rm.stop()
437
438     def start(self, guid):
439         """ Start a specific RM defined by its 'guid'
440
441             :param guid: Guid of the RM
442             :type guid: int
443
444         """
445         rm = self.get_resource(guid)
446         return rm.start()
447
448     def set_with_conditions(self, name, value, group1, group2, state,
449             time = None):
450         """ Set value 'value' on attribute with name 'name' on all RMs of
451             group1 when 'time' has elapsed since all elements in group2 
452             have reached state 'state'.
453
454             :param name: Name of attribute to set in RM
455             :type name: string
456
457             :param value: Value of attribute to set in RM
458             :type name: string
459
460             :param group1: List of guids of RMs subjected to action
461             :type group1: list
462
463             :param action: Action to register (either START or STOP)
464             :type action: ResourceAction
465
466             :param group2: List of guids of RMs to we waited for
467             :type group2: list
468
469             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
470             :type state: ResourceState
471
472             :param time: Time to wait after group2 has reached status 
473             :type time: string
474
475         """
476         if isinstance(group1, int):
477             group1 = [group1]
478         if isinstance(group2, int):
479             group2 = [group2]
480
481         for guid1 in group1:
482             rm = self.get_resource(guid)
483             rm.set_with_conditions(name, value, group2, state, time)
484
485     def stop_with_conditions(self, guid):
486         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
487
488             :param guid: Guid of the RM
489             :type guid: int
490
491         """
492         rm = self.get_resource(guid)
493         return rm.stop_with_conditions()
494
495     def start_with_conditions(self, guid):
496         """ Start a specific RM defined by its 'guid' only if all the conditions are true
497
498             :param guid: Guid of the RM
499             :type guid: int
500
501         """
502         rm = self.get_resource(guid)
503         return rm.start_with_condition()
504
505     def deploy(self, group = None, wait_all_ready = True):
506         """ Deploy all resource manager in group
507
508         :param group: List of guids of RMs to deploy
509         :type group: list
510
511         :param wait_all_ready: Wait until all RMs are ready in
512             order to start the RMs
513         :type guid: int
514
515         """
516         self.logger.debug(" ------- DEPLOY START ------ ")
517
518         if not group:
519             # By default, if not deployment group is indicated, 
520             # all RMs that are undeployed will be deployed
521             group = []
522             for guid in self.resources:
523                 if self.state(guid) == ResourceState.NEW:
524                     group.append(guid)
525                 
526         if isinstance(group, int):
527             group = [group]
528
529         # Before starting deployment we disorder the group list with the
530         # purpose of speeding up the whole deployment process.
531         # It is likely that the user inserted in the 'group' list closely
532         # resources one after another (e.g. all applications
533         # connected to the same node can likely appear one after another).
534         # This can originate a slow down in the deployment since the N 
535         # threads the parallel runner uses to processes tasks may all
536         # be taken up by the same family of resources waiting for the 
537         # same conditions (e.g. LinuxApplications running on a same 
538         # node share a single lock, so they will tend to be serialized).
539         # If we disorder the group list, this problem can be mitigated.
540         random.shuffle(group)
541
542         def wait_all_and_start(group):
543             reschedule = False
544             for guid in group:
545                 if self.state(guid) < ResourceState.READY:
546                     reschedule = True
547                     break
548
549             if reschedule:
550                 callback = functools.partial(wait_all_and_start, group)
551                 self.schedule("1s", callback)
552             else:
553                 # If all resources are read, we schedule the start
554                 for guid in group:
555                     rm = self.get_resource(guid)
556                     self.schedule("0s", rm.start_with_conditions)
557
558         if wait_all_ready:
559             # Schedule the function that will check all resources are
560             # READY, and only then it will schedule the start.
561             # This is aimed to reduce the number of tasks looping in the scheduler.
562             # Intead of having N start tasks, we will have only one
563             callback = functools.partial(wait_all_and_start, group)
564             self.schedule("1s", callback)
565
566         for guid in group:
567             rm = self.get_resource(guid)
568             self.schedule("0s", rm.deploy)
569
570             if not wait_all_ready:
571                 self.schedule("1s", rm.start_with_conditions)
572
573             if rm.conditions.get(ResourceAction.STOP):
574                 # Only if the RM has STOP conditions we
575                 # schedule a stop. Otherwise the RM will stop immediately
576                 self.schedule("2s", rm.stop_with_conditions)
577
578     def release(self, group = None):
579         """ Release the elements of the list 'group' or 
580         all the resources if any group is specified
581
582             :param group: List of RM
583             :type group: list
584
585         """
586         if not group:
587             group = self.resources
588
589         threads = []
590         for guid in group:
591             rm = self.get_resource(guid)
592             thread = threading.Thread(target=rm.release)
593             threads.append(thread)
594             thread.setDaemon(True)
595             thread.start()
596
597         while list(threads) and not self.finished:
598             thread = threads[0]
599             # Time out after 5 seconds to check EC not terminated
600             thread.join(5)
601             if not thread.is_alive():
602                 threads.remove(thread)
603         
604     def shutdown(self):
605         """ Shutdown the Experiment Controller. 
606         Releases all the resources and stops task processing thread
607
608         """
609         self.release()
610
611         # Mark the EC state as TERMINATED
612         self._state = ECState.TERMINATED
613
614         # Notify condition to wake up the processing thread
615         self._notify()
616         
617         if self._thread.is_alive():
618            self._thread.join()
619
620     def schedule(self, date, callback, track = False):
621         """ Schedule a callback to be executed at time date.
622
623             :param date: string containing execution time for the task.
624                     It can be expressed as an absolute time, using
625                     timestamp format, or as a relative time matching
626                     ^\d+.\d+(h|m|s|ms|us)$
627
628             :param callback: code to be executed for the task. Must be a
629                         Python function, and receives args and kwargs
630                         as arguments.
631
632             :param track: if set to True, the task will be retrivable with
633                     the get_task() method
634
635             :return : The Id of the task
636         """
637         timestamp = stabsformat(date)
638         task = Task(timestamp, callback)
639         task = self._scheduler.schedule(task)
640
641         if track:
642             self._tasks[task.id] = task
643
644         # Notify condition to wake up the processing thread
645         self._notify()
646
647         return task.id
648      
649     def _process(self):
650         """ Process scheduled tasks.
651
652         .. note::
653
654         The _process method is executed in an independent thread held by the 
655         ExperimentController for as long as the experiment is running.
656         
657         Tasks are scheduled by invoking the schedule method with a target callback. 
658         The schedule method is given a execution time which controls the
659         order in which tasks are processed. 
660
661         Tasks are processed in parallel using multithreading. 
662         The environmental variable NEPI_NTHREADS can be used to control
663         the number of threads used to process tasks. The default value is 50.
664
665         Exception handling:
666
667         To execute tasks in parallel, an ParallelRunner (PR) object, holding
668         a pool of threads (workers), is used.
669         For each available thread in the PR, the next task popped from 
670         the scheduler queue is 'put' in the PR.
671         Upon receiving a task to execute, each PR worker (thread) invokes the 
672         _execute method of the EC, passing the task as argument. 
673         This method, calls task.callback inside a try/except block. If an 
674         exception is raised by the tasks.callback, it will be trapped by the 
675         try block, logged to standard error (usually the console), and the EC 
676         state will be set to ECState.FAILED.
677         The invocation of _notify immediately after, forces the processing
678         loop in the _process method, to wake up if it was blocked waiting for new 
679         tasks to arrived, and to check the EC state.
680         As the EC is in FAILED state, the processing loop exits and the 
681         'finally' block is invoked. In the 'finally' block, the 'sync' method
682         of the PR is invoked, which forces the PR to raise any unchecked errors
683         that might have been raised by the workers.
684
685         """
686         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
687
688         runner = ParallelRun(maxthreads = nthreads)
689         runner.start()
690
691         try:
692             while not self.finished:
693                 self._cond.acquire()
694
695                 task = self._scheduler.next()
696                 
697                 if not task:
698                     # No task to execute. Wait for a new task to be scheduled.
699                     self._cond.wait()
700                 else:
701                     # The task timestamp is in the future. Wait for timeout 
702                     # or until another task is scheduled.
703                     now = tnow()
704                     if now < task.timestamp:
705                         # Calculate timeout in seconds
706                         timeout = tdiffsec(task.timestamp, now)
707
708                         # Re-schedule task with the same timestamp
709                         self._scheduler.schedule(task)
710                         
711                         task = None
712
713                         # Wait timeout or until a new task awakes the condition
714                         self._cond.wait(timeout)
715                
716                 self._cond.release()
717
718                 if task:
719                     # Process tasks in parallel
720                     runner.put(self._execute, task)
721         except: 
722             import traceback
723             err = traceback.format_exc()
724             self.logger.error("Error while processing tasks in the EC: %s" % err)
725
726             self._state = ECState.FAILED
727         finally:   
728             self.logger.debug("Exiting the task processing loop ... ")
729             runner.sync()
730
731     def _execute(self, task):
732         """ Executes a single task. 
733
734             :param task: Object containing the callback to execute
735             :type task: Task
736
737         .. note::
738
739         If the invokation of the task callback raises an
740         exception, the processing thread of the ExperimentController
741         will be stopped and the experiment will be aborted.
742
743         """
744         # Invoke callback
745         task.status = TaskStatus.DONE
746
747         try:
748             task.result = task.callback()
749         except:
750             import traceback
751             err = traceback.format_exc()
752             task.result = err
753             task.status = TaskStatus.ERROR
754             
755             self.logger.error("Error occurred while executing task: %s" % err)
756
757             # Set the EC to FAILED state (this will force to exit the task
758             # processing thread)
759             self._state = ECState.FAILED
760
761             # Notify condition to wake up the processing thread
762             self._notify()
763
764             # Propage error to the ParallelRunner
765             raise
766
767     def _notify(self):
768         """ Awakes the processing thread in case it is blocked waiting
769         for a new task to be scheduled.
770         """
771         self._cond.acquire()
772         self._cond.notify()
773         self._cond.release()
774