c323e36f5ccd8071c4bc2e10984d9ca4168163a2
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Human readable identifier for the experiment. 
52                         It will be used in the name of the directory 
53                         where experiment related information is stored
54         :type exp_id: int
55
56         :param root_dir: Root directory where experiment specific folder
57                          will be created to store experiment information
58         :type root_dir: str
59
60     .. note::
61         The ExperimentController (EC), is the entity responsible for 
62         managing a single experiment. 
63         Through the EC interface the user can create ResourceManagers (RMs),
64         configure them and interconnect them, in order to describe the experiment.
65         
66         Only when the 'deploy()' method is invoked, the EC will take actions
67         to transform the 'described' experiment into a 'running' experiment.
68
69         While the experiment is running, it is possible to continue to
70         create/configure/connect RMs, and to deploy them to involve new
71         resources in the experiment.
72
73     """
74
75     def __init__(self, exp_id = None, root_dir = "/tmp"): 
76         super(ExperimentController, self).__init__()
77         # root directory to store files
78         self._root_dir = root_dir
79
80         # experiment identifier given by the user
81         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
82
83         # generator of globally unique ids
84         self._guid_generator = guid.GuidGenerator()
85         
86         # Resource managers
87         self._resources = dict()
88
89         # Scheduler
90         self._scheduler = HeapScheduler()
91
92         # Tasks
93         self._tasks = dict()
94
95         # Event processing thread
96         self._cond = threading.Condition()
97         self._thread = threading.Thread(target = self._process)
98         self._thread.setDaemon(True)
99         self._thread.start()
100
101         # EC state
102         self._state = ECState.RUNNING
103
104         # Logging
105         self._logger = logging.getLogger("ExperimentController")
106
107     @property
108     def logger(self):
109         """ Return the logger of the Experiment Controller
110
111         """
112         return self._logger
113
114     @property
115     def ecstate(self):
116         """ Return the state of the Experiment Controller
117
118         """
119         return self._state
120
121     @property
122     def exp_id(self):
123         """ Return the experiment ID
124
125         """
126         exp_id = self._exp_id
127         if not exp_id.startswith("nepi-"):
128             exp_id = "nepi-" + exp_id
129         return exp_id
130
131     @property
132     def finished(self):
133         """ Put the state of the Experiment Controller into a final state :
134             Either TERMINATED or FAILED
135
136         """
137         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
138
139     def wait_finished(self, guids):
140         """ Blocking method that wait until all the RM from the 'guid' list 
141             reached the state FINISHED
142
143         :param guids: List of guids
144         :type guids: list
145         """
146         return self.wait(guids)
147
148     def wait_started(self, guids):
149         """ Blocking method that wait until all the RM from the 'guid' list 
150             reached the state STARTED
151
152         :param guids: List of guids
153         :type guids: list
154         """
155         return self.wait(guids, states = [ResourceState.STARTED,
156             ResourceState.STOPPED,
157             ResourceState.FINISHED])
158
159     def wait(self, guids, states = [ResourceState.FINISHED, 
160         ResourceState.STOPPED]):
161         """ Blocking method that waits until all the RM from the 'guid' list 
162             reached state 'state' or until a failure occurs
163             
164         :param guids: List of guids
165         :type guids: list
166         """
167         if isinstance(guids, int):
168             guids = [guids]
169
170         while not all([self.state(guid) in states for guid in guids]) and \
171                 not any([self.state(guid) in [
172                         ResourceState.FAILED] for guid in guids]) and \
173                 not self.finished:
174             # debug logging
175             waited = ""
176             for guid in guids:
177                 waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
178             self.logger.debug(" WAITING FOR %s " % waited )
179             
180             # We keep the sleep big to decrease the number of RM state queries
181             time.sleep(2)
182    
183     def get_task(self, tid):
184         """ Get a specific task
185
186         :param tid: Id of the task
187         :type tid: int
188         :rtype: Task
189         """
190         return self._tasks.get(tid)
191
192     def get_resource(self, guid):
193         """ Get a specific Resource Manager
194
195         :param guid: Id of the task
196         :type guid: int
197         :rtype: ResourceManager
198         """
199         return self._resources.get(guid)
200
201     @property
202     def resources(self):
203         """ Returns the list of all the Resource Manager Id
204
205         :rtype: set
206
207         """
208         return self._resources.keys()
209
210     def register_resource(self, rtype, guid = None):
211         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
212         for the RM of type 'rtype' and add it to the list of Resources.
213
214         :param rtype: Type of the RM
215         :type rtype: str
216         :return: Id of the RM
217         :rtype: int
218         """
219         # Get next available guid
220         guid = self._guid_generator.next(guid)
221         
222         # Instantiate RM
223         rm = ResourceFactory.create(rtype, self, guid)
224
225         # Store RM
226         self._resources[guid] = rm
227
228         return guid
229
230     def get_attributes(self, guid):
231         """ Return all the attibutes of a specific RM
232
233         :param guid: Guid of the RM
234         :type guid: int
235         :return: List of attributes
236         :rtype: list
237         """
238         rm = self.get_resource(guid)
239         return rm.get_attributes()
240
241     def register_connection(self, guid1, guid2):
242         """ Registers a guid1 with a guid2. 
243             The declaration order is not important
244
245             :param guid1: First guid to connect
246             :type guid1: ResourceManager
247
248             :param guid2: Second guid to connect
249             :type guid: ResourceManager
250         """
251         rm1 = self.get_resource(guid1)
252         rm2 = self.get_resource(guid2)
253
254         rm1.register_connection(guid2)
255         rm2.register_connection(guid1)
256
257     def register_condition(self, group1, action, group2, state,
258             time = None):
259         """ Registers an action START or STOP for all RM on group1 to occur 
260             time 'time' after all elements in group2 reached state 'state'.
261
262             :param group1: List of guids of RMs subjected to action
263             :type group1: list
264
265             :param action: Action to register (either START or STOP)
266             :type action: ResourceAction
267
268             :param group2: List of guids of RMs to we waited for
269             :type group2: list
270
271             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
272             :type state: ResourceState
273
274             :param time: Time to wait after group2 has reached status 
275             :type time: string
276
277         """
278         if isinstance(group1, int):
279             group1 = [group1]
280         if isinstance(group2, int):
281             group2 = [group2]
282
283         for guid1 in group1:
284             rm = self.get_resource(guid1)
285             rm.register_condition(action, group2, state, time)
286
287     def register_trace(self, guid, name):
288         """ Enable trace
289
290         :param name: Name of the trace
291         :type name: str
292         """
293         rm = self.get_resource(guid)
294         rm.register_trace(name)
295
296     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
297         """ Get information on collected trace
298
299         :param name: Name of the trace
300         :type name: str
301
302         :param attr: Can be one of:
303                          - TraceAttr.ALL (complete trace content), 
304                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
305                          - TraceAttr.PATH (full path to the trace file),
306                          - TraceAttr.SIZE (size of trace file). 
307         :type attr: str
308
309         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
310         :type name: int
311
312         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
313         :type name: int
314
315         :rtype: str
316         """
317         rm = self.get_resource(guid)
318         return rm.trace(name, attr, block, offset)
319
320     def discover(self, guid):
321         """ Discover a specific RM defined by its 'guid'
322
323             :param guid: Guid of the RM
324             :type guid: int
325
326         """
327         rm = self.get_resource(guid)
328         return rm.discover()
329
330     def provision(self, guid):
331         """ Provision a specific RM defined by its 'guid'
332
333             :param guid: Guid of the RM
334             :type guid: int
335
336         """
337         rm = self.get_resource(guid)
338         return rm.provision()
339
340     def get(self, guid, name):
341         """ Get a specific attribute 'name' from the RM 'guid'
342
343             :param guid: Guid of the RM
344             :type guid: int
345
346             :param name: attribute's name
347             :type name: str
348
349         """
350         rm = self.get_resource(guid)
351         return rm.get(name)
352
353     def set(self, guid, name, value):
354         """ Set a specific attribute 'name' from the RM 'guid' 
355             with the value 'value' 
356
357             :param guid: Guid of the RM
358             :type guid: int
359
360             :param name: attribute's name
361             :type name: str
362
363             :param value: attribute's value
364
365         """
366         rm = self.get_resource(guid)
367         return rm.set(name, value)
368
369     def state(self, guid, hr = False):
370         """ Returns the state of a resource
371
372             :param guid: Resource guid
373             :type guid: integer
374
375             :param hr: Human readable. Forces return of a 
376                 status string instead of a number 
377             :type hr: boolean
378
379         """
380         rm = self.get_resource(guid)
381         state = rm.state
382
383         if hr:
384             return ResourceState2str.get(state)
385
386         return state
387
388     def stop(self, guid):
389         """ Stop a specific RM defined by its 'guid'
390
391             :param guid: Guid of the RM
392             :type guid: int
393
394         """
395         rm = self.get_resource(guid)
396         return rm.stop()
397
398     def start(self, guid):
399         """ Start a specific RM defined by its 'guid'
400
401             :param guid: Guid of the RM
402             :type guid: int
403
404         """
405         rm = self.get_resource(guid)
406         return rm.start()
407
408     def set_with_conditions(self, name, value, group1, group2, state,
409             time = None):
410         """ Set value 'value' on attribute with name 'name' on all RMs of
411             group1 when 'time' has elapsed since all elements in group2 
412             have reached state 'state'.
413
414             :param name: Name of attribute to set in RM
415             :type name: string
416
417             :param value: Value of attribute to set in RM
418             :type name: string
419
420             :param group1: List of guids of RMs subjected to action
421             :type group1: list
422
423             :param action: Action to register (either START or STOP)
424             :type action: ResourceAction
425
426             :param group2: List of guids of RMs to we waited for
427             :type group2: list
428
429             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
430             :type state: ResourceState
431
432             :param time: Time to wait after group2 has reached status 
433             :type time: string
434
435         """
436         if isinstance(group1, int):
437             group1 = [group1]
438         if isinstance(group2, int):
439             group2 = [group2]
440
441         for guid1 in group1:
442             rm = self.get_resource(guid)
443             rm.set_with_conditions(name, value, group2, state, time)
444
445     def stop_with_conditions(self, guid):
446         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
447
448             :param guid: Guid of the RM
449             :type guid: int
450
451         """
452         rm = self.get_resource(guid)
453         return rm.stop_with_conditions()
454
455     def start_with_conditions(self, guid):
456         """ Start a specific RM defined by its 'guid' only if all the conditions are true
457
458             :param guid: Guid of the RM
459             :type guid: int
460
461         """
462         rm = self.get_resource(guid)
463         return rm.start_with_condition()
464
465     def deploy(self, group = None, wait_all_ready = True):
466         """ Deploy all resource manager in group
467
468         :param group: List of guids of RMs to deploy
469         :type group: list
470
471         :param wait_all_ready: Wait until all RMs are ready in
472             order to start the RMs
473         :type guid: int
474
475         """
476         self.logger.debug(" ------- DEPLOY START ------ ")
477
478         if not group:
479             # By default, if not deployment group is indicated, 
480             # all RMs that are undeployed will be deployed
481             group = []
482             for guid in self.resources:
483                 if self.state(guid) == ResourceState.NEW:
484                     group.append(guid)
485                 
486         if isinstance(group, int):
487             group = [group]
488
489         # Before starting deployment we disorder the group list with the
490         # purpose of speeding up the whole deployment process.
491         # It is likely that the user inserted in the 'group' list closely
492         # resources one after another (e.g. all applications
493         # connected to the same node can likely appear one after another).
494         # This can originate a slow down in the deployment since the N 
495         # threads the parallel runner uses to processes tasks may all
496         # be taken up by the same family of resources waiting for the 
497         # same conditions (e.g. LinuxApplications running on a same 
498         # node share a single lock, so they will tend to be serialized).
499         # If we disorder the group list, this problem can be mitigated.
500         random.shuffle(group)
501
502         def wait_all_and_start(group):
503             reschedule = False
504             for guid in group:
505                 if self.state(guid) < ResourceState.READY:
506                     reschedule = True
507                     break
508
509             if reschedule:
510                 callback = functools.partial(wait_all_and_start, group)
511                 self.schedule("1s", callback)
512             else:
513                 # If all resources are read, we schedule the start
514                 for guid in group:
515                     rm = self.get_resource(guid)
516                     self.schedule("0s", rm.start_with_conditions)
517
518         if wait_all_ready:
519             # Schedule the function that will check all resources are
520             # READY, and only then it will schedule the start.
521             # This is aimed to reduce the number of tasks looping in the scheduler.
522             # Intead of having N start tasks, we will have only one
523             callback = functools.partial(wait_all_and_start, group)
524             self.schedule("1s", callback)
525
526         for guid in group:
527             rm = self.get_resource(guid)
528             self.schedule("0s", rm.deploy)
529
530             if not wait_all_ready:
531                 self.schedule("1s", rm.start_with_conditions)
532
533             if rm.conditions.get(ResourceAction.STOP):
534                 # Only if the RM has STOP conditions we
535                 # schedule a stop. Otherwise the RM will stop immediately
536                 self.schedule("2s", rm.stop_with_conditions)
537
538     def release(self, group = None):
539         """ Release the elements of the list 'group' or 
540         all the resources if any group is specified
541
542             :param group: List of RM
543             :type group: list
544
545         """
546         if not group:
547             group = self.resources
548
549         threads = []
550         for guid in group:
551             rm = self.get_resource(guid)
552             thread = threading.Thread(target=rm.release)
553             threads.append(thread)
554             thread.setDaemon(True)
555             thread.start()
556
557         while list(threads) and not self.finished:
558             thread = threads[0]
559             # Time out after 5 seconds to check EC not terminated
560             thread.join(5)
561             if not thread.is_alive():
562                 threads.remove(thread)
563         
564     def shutdown(self):
565         """ Shutdown the Experiment Controller. 
566         Releases all the resources and stops task processing thread
567
568         """
569         self.release()
570
571         # Mark the EC state as TERMINATED
572         self._state = ECState.TERMINATED
573
574         # Notify condition to wake up the processing thread
575         self._notify()
576         
577         if self._thread.is_alive():
578            self._thread.join()
579
580     def schedule(self, date, callback, track = False):
581         """ Schedule a callback to be executed at time date.
582
583             :param date: string containing execution time for the task.
584                     It can be expressed as an absolute time, using
585                     timestamp format, or as a relative time matching
586                     ^\d+.\d+(h|m|s|ms|us)$
587
588             :param callback: code to be executed for the task. Must be a
589                         Python function, and receives args and kwargs
590                         as arguments.
591
592             :param track: if set to True, the task will be retrivable with
593                     the get_task() method
594
595             :return : The Id of the task
596         """
597         timestamp = stabsformat(date)
598         task = Task(timestamp, callback)
599         task = self._scheduler.schedule(task)
600
601         if track:
602             self._tasks[task.id] = task
603
604         # Notify condition to wake up the processing thread
605         self._notify()
606
607         return task.id
608      
609     def _process(self):
610         """ Process scheduled tasks.
611
612         .. note::
613
614         The _process method is executed in an independent thread held by the 
615         ExperimentController for as long as the experiment is running.
616         
617         Tasks are scheduled by invoking the schedule method with a target callback. 
618         The schedule method is given a execution time which controls the
619         order in which tasks are processed. 
620
621         Tasks are processed in parallel using multithreading. 
622         The environmental variable NEPI_NTHREADS can be used to control
623         the number of threads used to process tasks. The default value is 50.
624
625         Exception handling:
626
627         To execute tasks in parallel, an ParallelRunner (PR) object, holding
628         a pool of threads (workers), is used.
629         For each available thread in the PR, the next task popped from 
630         the scheduler queue is 'put' in the PR.
631         Upon receiving a task to execute, each PR worker (thread) invokes the 
632         _execute method of the EC, passing the task as argument. 
633         This method, calls task.callback inside a try/except block. If an 
634         exception is raised by the tasks.callback, it will be trapped by the 
635         try block, logged to standard error (usually the console), and the EC 
636         state will be set to ECState.FAILED.
637         The invocation of _notify immediately after, forces the processing
638         loop in the _process method, to wake up if it was blocked waiting for new 
639         tasks to arrived, and to check the EC state.
640         As the EC is in FAILED state, the processing loop exits and the 
641         'finally' block is invoked. In the 'finally' block, the 'sync' method
642         of the PR is invoked, which forces the PR to raise any unchecked errors
643         that might have been raised by the workers.
644
645         """
646         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
647
648         runner = ParallelRun(maxthreads = nthreads)
649         runner.start()
650
651         try:
652             while not self.finished:
653                 self._cond.acquire()
654
655                 task = self._scheduler.next()
656                 
657                 if not task:
658                     # No task to execute. Wait for a new task to be scheduled.
659                     self._cond.wait()
660                 else:
661                     # The task timestamp is in the future. Wait for timeout 
662                     # or until another task is scheduled.
663                     now = tnow()
664                     if now < task.timestamp:
665                         # Calculate timeout in seconds
666                         timeout = tdiffsec(task.timestamp, now)
667
668                         # Re-schedule task with the same timestamp
669                         self._scheduler.schedule(task)
670                         
671                         task = None
672
673                         # Wait timeout or until a new task awakes the condition
674                         self._cond.wait(timeout)
675                
676                 self._cond.release()
677
678                 if task:
679                     # Process tasks in parallel
680                     runner.put(self._execute, task)
681         except: 
682             import traceback
683             err = traceback.format_exc()
684             self.logger.error("Error while processing tasks in the EC: %s" % err)
685
686             self._state = ECState.FAILED
687         finally:   
688             self.logger.debug("Exiting the task processing loop ... ")
689             runner.sync()
690
691     def _execute(self, task):
692         """ Executes a single task. 
693
694             :param task: Object containing the callback to execute
695             :type task: Task
696
697         .. note::
698
699         If the invokation of the task callback raises an
700         exception, the processing thread of the ExperimentController
701         will be stopped and the experiment will be aborted.
702
703         """
704         # Invoke callback
705         task.status = TaskStatus.DONE
706
707         try:
708             task.result = task.callback()
709         except:
710             import traceback
711             err = traceback.format_exc()
712             task.result = err
713             task.status = TaskStatus.ERROR
714             
715             self.logger.error("Error occurred while executing task: %s" % err)
716
717             # Set the EC to FAILED state (this will force to exit the task
718             # processing thread)
719             self._state = ECState.FAILED
720
721             # Notify condition to wake up the processing thread
722             self._notify()
723
724             # Propage error to the ParallelRunner
725             raise
726
727     def _notify(self):
728         """ Awakes the processing thread in case it is blocked waiting
729         for a new task to be scheduled.
730         """
731         self._cond.acquire()
732         self._cond.notify()
733         self._cond.release()
734