Documentation of the execution folder
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Id of the experiment
52         :type exp_id: int
53         :param root_dir: Root directory of the experiment
54         :type root_dir: str
55
56     .. note::
57
58        This class is the only one used by the User. Indeed, the user "talks"
59        only with the Experiment Controller and this latter forward to 
60        the different Resources Manager the order provided by the user.
61
62     """
63
64     def __init__(self, exp_id = None, root_dir = "/tmp"): 
65         super(ExperimentController, self).__init__()
66         # root directory to store files
67         self._root_dir = root_dir
68
69         # experiment identifier given by the user
70         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
71
72         # generator of globally unique ids
73         self._guid_generator = guid.GuidGenerator()
74         
75         # Resource managers
76         self._resources = dict()
77
78         # Scheduler
79         self._scheduler = HeapScheduler()
80
81         # Tasks
82         self._tasks = dict()
83
84         # Event processing thread
85         self._cond = threading.Condition()
86         self._thread = threading.Thread(target = self._process)
87         self._thread.setDaemon(True)
88         self._thread.start()
89
90         # EC state
91         self._state = ECState.RUNNING
92
93         # Logging
94         self._logger = logging.getLogger("ExperimentController")
95
96     @property
97     def logger(self):
98         """ Return the logger of the Experiment Controller
99
100         """
101         return self._logger
102
103     @property
104     def ecstate(self):
105         """ Return the state of the Experiment Controller
106
107         """
108         return self._state
109
110     @property
111     def exp_id(self):
112         """ Return the experiment ID
113
114         """
115         exp_id = self._exp_id
116         if not exp_id.startswith("nepi-"):
117             exp_id = "nepi-" + exp_id
118         return exp_id
119
120     @property
121     def finished(self):
122         """ Put the state of the Experiment Controller into a final state :
123             Either TERMINATED or FAILED
124
125         """
126         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
127
128     def wait_finished(self, guids):
129         """ Blocking method that wait until all the RM from the 'guid' list 
130             reach the state FINISHED
131
132         :param guids: List of guids
133         :type guids: list
134         """
135         if isinstance(guids, int):
136             guids = [guids]
137
138         while not all([self.state(guid) in [ResourceState.FINISHED, 
139             ResourceState.STOPPED, 
140             ResourceState.FAILED] \
141                 for guid in guids]) and not self.finished:
142             # We keep the sleep as large as possible to 
143             # decrese the number of RM state requests
144             time.sleep(2)
145     
146     def get_task(self, tid):
147         """ Get a specific task
148
149         :param tid: Id of the task
150         :type tid: int
151         :rtype:  unknow
152         """
153         return self._tasks.get(tid)
154
155     def get_resource(self, guid):
156         """ Get a specific Resource Manager
157
158         :param guid: Id of the task
159         :type guid: int
160         :rtype:  ResourceManager
161         """
162         return self._resources.get(guid)
163
164     @property
165     def resources(self):
166         """ Returns the list of all the Resource Manager Id
167
168         :rtype:  set
169         """
170         return self._resources.keys()
171
172     def register_resource(self, rtype, guid = None):
173         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
174         for the RM of type 'rtype' and add it to the list of Resources.
175
176         :param rtype: Type of the RM
177         :type rtype: str
178         :return : Id of the RM
179         :rtype:  int
180         """
181         # Get next available guid
182         guid = self._guid_generator.next(guid)
183         
184         # Instantiate RM
185         rm = ResourceFactory.create(rtype, self, guid)
186
187         # Store RM
188         self._resources[guid] = rm
189
190         return guid
191
192     def get_attributes(self, guid):
193         """ Return all the attibutes of a specific RM
194
195         :param guid: Guid of the RM
196         :type guid: int
197         :return : List of attributes
198         :rtype: list
199         """
200         rm = self.get_resource(guid)
201         return rm.get_attributes()
202
203     def register_connection(self, guid1, guid2):
204         """ Registers a guid1 with a guid2. 
205             The declaration order is not important
206
207             :param guid1: First guid to connect
208             :type guid1: ResourceManager
209
210             :param guid2: Second guid to connect
211             :type guid: ResourceManager
212
213         """
214         rm1 = self.get_resource(guid1)
215         rm2 = self.get_resource(guid2)
216
217         rm1.connect(guid2)
218         rm2.connect(guid1)
219
220     def register_condition(self, group1, action, group2, state,
221             time = None):
222         """ Registers an action START or STOP for all RM on group1 to occur 
223             time 'time' after all elements in group2 reached state 'state'.
224
225             :param group1: List of guids of RMs subjected to action
226             :type group1: list
227
228             :param action: Action to register (either START or STOP)
229             :type action: ResourceAction
230
231             :param group2: List of guids of RMs to we waited for
232             :type group2: list
233
234             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
235             :type state: ResourceState
236
237             :param time: Time to wait after group2 has reached status 
238             :type time: string
239
240         """
241         if isinstance(group1, int):
242             group1 = [group1]
243         if isinstance(group2, int):
244             group2 = [group2]
245
246         for guid1 in group1:
247             rm = self.get_resource(guid1)
248             rm.register_condition(action, group2, state, time)
249
250     def register_trace(self, guid, name):
251         """ Enable trace
252
253         :param name: Name of the trace
254         :type name: str
255         """
256         rm = self.get_resource(guid)
257         rm.register_trace(name)
258
259     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
260         """ Get information on collected trace
261
262         :param name: Name of the trace
263         :type name: str
264
265         :param attr: Can be one of:
266                          - TraceAttr.ALL (complete trace content), 
267                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
268                          - TraceAttr.PATH (full path to the trace file),
269                          - TraceAttr.SIZE (size of trace file). 
270         :type attr: str
271
272         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
273         :type name: int
274
275         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
276         :type name: int
277
278         :rtype: str
279         """
280         rm = self.get_resource(guid)
281         return rm.trace(name, attr, block, offset)
282
283     def discover(self, guid):
284         """ Discover a specific RM defined by its 'guid'
285
286             :param guid: Guid of the RM
287             :type guid: int
288
289         """
290         rm = self.get_resource(guid)
291         return rm.discover()
292
293     def provision(self, guid):
294         """ Provision a specific RM defined by its 'guid'
295
296             :param guid: Guid of the RM
297             :type guid: int
298
299         """
300         rm = self.get_resource(guid)
301         return rm.provision()
302
303     def get(self, guid, name):
304         """ Get a specific attribute 'name' from the RM 'guid'
305
306             :param guid: Guid of the RM
307             :type guid: int
308
309             :param name: attribute's name
310             :type name: str
311
312         """
313         rm = self.get_resource(guid)
314         return rm.get(name)
315
316     def set(self, guid, name, value):
317         """ Set a specific attribute 'name' from the RM 'guid' 
318             with the value 'value' 
319
320             :param guid: Guid of the RM
321             :type guid: int
322
323             :param name: attribute's name
324             :type name: str
325
326             :param value: attribute's value
327
328         """
329         rm = self.get_resource(guid)
330         return rm.set(name, value)
331
332     def state(self, guid, hr = False):
333         """ Returns the state of a resource
334
335             :param guid: Resource guid
336             :type guid: integer
337
338             :param hr: Human readable. Forces return of a 
339                 status string instead of a number 
340             :type hr: boolean
341
342         """
343         rm = self.get_resource(guid)
344         if hr:
345             return ResourceState2str.get(rm.state)
346
347         return rm.state
348
349     def stop(self, guid):
350         """ Stop a specific RM defined by its 'guid'
351
352             :param guid: Guid of the RM
353             :type guid: int
354
355         """
356         rm = self.get_resource(guid)
357         return rm.stop()
358
359     def start(self, guid):
360         """ Start a specific RM defined by its 'guid'
361
362             :param guid: Guid of the RM
363             :type guid: int
364
365         """
366         rm = self.get_resource(guid)
367         return rm.start()
368
369     def set_with_conditions(self, name, value, group1, group2, state,
370             time = None):
371         """ Set value 'value' on attribute with name 'name' on all RMs of
372             group1 when 'time' has elapsed since all elements in group2 
373             have reached state 'state'.
374
375             :param name: Name of attribute to set in RM
376             :type name: string
377
378             :param value: Value of attribute to set in RM
379             :type name: string
380
381             :param group1: List of guids of RMs subjected to action
382             :type group1: list
383
384             :param action: Action to register (either START or STOP)
385             :type action: ResourceAction
386
387             :param group2: List of guids of RMs to we waited for
388             :type group2: list
389
390             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
391             :type state: ResourceState
392
393             :param time: Time to wait after group2 has reached status 
394             :type time: string
395
396         """
397         if isinstance(group1, int):
398             group1 = [group1]
399         if isinstance(group2, int):
400             group2 = [group2]
401
402         for guid1 in group1:
403             rm = self.get_resource(guid)
404             rm.set_with_conditions(name, value, group2, state, time)
405
406     def stop_with_conditions(self, guid):
407         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
408
409             :param guid: Guid of the RM
410             :type guid: int
411
412         """
413         rm = self.get_resource(guid)
414         return rm.stop_with_conditions()
415
416     def start_with_conditions(self, guid):
417         """ Start a specific RM defined by its 'guid' only if all the conditions are true
418
419             :param guid: Guid of the RM
420             :type guid: int
421
422         """
423         rm = self.get_resource(guid)
424         return rm.start_with_condition()
425
426     def deploy(self, group = None, wait_all_ready = True):
427         """ Deploy all resource manager in group
428
429         :param group: List of guids of RMs to deploy
430         :type group: list
431
432         :param wait_all_ready: Wait until all RMs are ready in
433             order to start the RMs
434         :type guid: int
435
436         """
437         self.logger.debug(" ------- DEPLOY START ------ ")
438
439         if not group:
440             group = self.resources
441
442         # Before starting deployment we disorder the group list with the
443         # purpose of speeding up the whole deployment process.
444         # It is likely that the user inserted in the 'group' list closely
445         # resources one after another (e.g. all applications
446         # connected to the same node can likely appear one after another).
447         # This can originate a slow down in the deployment since the N 
448         # threads the parallel runner uses to processes tasks may all
449         # be taken up by the same family of resources waiting for the 
450         # same conditions (e.g. LinuxApplications running on a same 
451         # node share a single lock, so they will tend to be serialized).
452         # If we disorder the group list, this problem can be mitigated.
453         random.shuffle(group)
454
455         def wait_all_and_start(group):
456             reschedule = False
457             for guid in group:
458                 rm = self.get_resource(guid)
459                 if rm.state < ResourceState.READY:
460                     reschedule = True
461                     break
462
463             if reschedule:
464                 callback = functools.partial(wait_all_and_start, group)
465                 self.schedule("1s", callback)
466             else:
467                 # If all resources are read, we schedule the start
468                 for guid in group:
469                     rm = self.get_resource(guid)
470                     self.schedule("0.01s", rm.start_with_conditions)
471
472         if wait_all_ready:
473             # Schedule the function that will check all resources are
474             # READY, and only then it will schedule the start.
475             # This is aimed to reduce the number of tasks looping in the scheduler.
476             # Intead of having N start tasks, we will have only one
477             callback = functools.partial(wait_all_and_start, group)
478             self.schedule("1s", callback)
479
480         for guid in group:
481             rm = self.get_resource(guid)
482             self.schedule("0.001s", rm.deploy)
483
484             if not wait_all_ready:
485                 self.schedule("1s", rm.start_with_conditions)
486
487             if rm.conditions.get(ResourceAction.STOP):
488                 # Only if the RM has STOP conditions we
489                 # schedule a stop. Otherwise the RM will stop immediately
490                 self.schedule("2s", rm.stop_with_conditions)
491
492
493     def release(self, group = None):
494         """ Release the elements of the list 'group' or 
495         all the resources if any group is specified
496
497             :param group: List of RM
498             :type group: list
499
500         """
501         if not group:
502             group = self.resources
503
504         threads = []
505         for guid in group:
506             rm = self.get_resource(guid)
507             thread = threading.Thread(target=rm.release)
508             threads.append(thread)
509             thread.setDaemon(True)
510             thread.start()
511
512         while list(threads) and not self.finished:
513             thread = threads[0]
514             # Time out after 5 seconds to check EC not terminated
515             thread.join(5)
516             if not thread.is_alive():
517                 threads.remove(thread)
518         
519     def shutdown(self):
520         """ Shutdown the Experiment Controller. 
521         It means : Release all the resources and stop the scheduler
522
523         """
524         self.release()
525
526         self._stop_scheduler()
527         
528         if self._thread.is_alive():
529            self._thread.join()
530
531     def schedule(self, date, callback, track = False):
532         """ Schedule a callback to be executed at time date.
533
534             :param date: string containing execution time for the task.
535                     It can be expressed as an absolute time, using
536                     timestamp format, or as a relative time matching
537                     ^\d+.\d+(h|m|s|ms|us)$
538
539             :param callback: code to be executed for the task. Must be a
540                         Python function, and receives args and kwargs
541                         as arguments.
542
543             :param track: if set to True, the task will be retrivable with
544                     the get_task() method
545
546             :return : The Id of the task
547         """
548         timestamp = strfvalid(date)
549         
550         task = Task(timestamp, callback)
551         task = self._scheduler.schedule(task)
552
553         if track:
554             self._tasks[task.id] = task
555   
556         # Notify condition to wake up the processing thread
557         self._cond.acquire()
558         self._cond.notify()
559         self._cond.release()
560
561         return task.id
562      
563     def _process(self):
564         """ Process at executing the task that are in the scheduler.
565
566         """
567
568         runner = ParallelRun(maxthreads = 50)
569         runner.start()
570
571         try:
572             while not self.finished:
573                 self._cond.acquire()
574                 task = self._scheduler.next()
575                 self._cond.release()
576                 
577                 if not task:
578                     # It there are not tasks in the tasks queue we need to 
579                     # wait until a call to schedule wakes us up
580                     self._cond.acquire()
581                     self._cond.wait()
582                     self._cond.release()
583                 else: 
584                     # If the task timestamp is in the future the thread needs to wait
585                     # until time elapse or until another task is scheduled
586                     now = strfnow()
587                     if now < task.timestamp:
588                         # Calculate time difference in seconds
589                         timeout = strfdiff(task.timestamp, now)
590                         # Re-schedule task with the same timestamp
591                         self._scheduler.schedule(task)
592                         # Sleep until timeout or until a new task awakes the condition
593                         self._cond.acquire()
594                         self._cond.wait(timeout)
595                         self._cond.release()
596                     else:
597                         # Process tasks in parallel
598                         runner.put(self._execute, task)
599         except: 
600             import traceback
601             err = traceback.format_exc()
602             self._logger.error("Error while processing tasks in the EC: %s" % err)
603
604             self._state = ECState.FAILED
605    
606         # Mark EC state as terminated
607         if self.ecstate == ECState.RUNNING:
608             # Synchronize to get errors if occurred
609             runner.sync()
610             self._state = ECState.TERMINATED
611
612     def _execute(self, task):
613         """ Invoke the callback of the task 'task'
614
615             :param task: Id of the task
616             :type task: int
617
618         """
619         # Invoke callback
620         task.status = TaskStatus.DONE
621
622         try:
623             task.result = task.callback()
624         except:
625             import traceback
626             err = traceback.format_exc()
627             task.result = err
628             task.status = TaskStatus.ERROR
629             
630             self._logger.error("Error occurred while executing task: %s" % err)
631
632             self._stop_scheduler()
633
634             # Propage error to the ParallelRunner
635             raise
636
637     def _stop_scheduler(self):
638         """ Stop the scheduler and put the EC into a FAILED State.
639
640         """
641
642         # Mark the EC as failed
643         self._state = ECState.FAILED
644
645         # Wake up the EC in case it was sleeping
646         self._cond.acquire()
647         self._cond.notify()
648         self._cond.release()
649
650