Improved LinuxApplication behavior
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Human readable identifier for the experiment. 
52                         It will be used in the name of the directory 
53                         where experiment related information is stored
54         :type exp_id: int
55
56         :param root_dir: Root directory where experiment specific folder
57                          will be created to store experiment information
58         :type root_dir: str
59
60     .. note::
61         The ExperimentController (EC), is the entity responsible for 
62         managing a single experiment. 
63         Through the EC interface the user can create ResourceManagers (RMs),
64         configure them and interconnect them, in order to describe the experiment.
65         
66         Only when the 'deploy()' method is invoked, the EC will take actions
67         to transform the 'described' experiment into a 'running' experiment.
68
69         While the experiment is running, it is possible to continue to
70         create/configure/connect RMs, and to deploy them to involve new
71         resources in the experiment.
72
73     """
74
75     def __init__(self, exp_id = None, root_dir = "/tmp"): 
76         super(ExperimentController, self).__init__()
77         # root directory to store files
78         self._root_dir = root_dir
79
80         # experiment identifier given by the user
81         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
82
83         # generator of globally unique ids
84         self._guid_generator = guid.GuidGenerator()
85         
86         # Resource managers
87         self._resources = dict()
88
89         # Scheduler
90         self._scheduler = HeapScheduler()
91
92         # Tasks
93         self._tasks = dict()
94
95         # Event processing thread
96         self._cond = threading.Condition()
97         self._thread = threading.Thread(target = self._process)
98         self._thread.setDaemon(True)
99         self._thread.start()
100
101         # EC state
102         self._state = ECState.RUNNING
103
104         # Logging
105         self._logger = logging.getLogger("ExperimentController")
106
107     @property
108     def logger(self):
109         """ Return the logger of the Experiment Controller
110
111         """
112         return self._logger
113
114     @property
115     def ecstate(self):
116         """ Return the state of the Experiment Controller
117
118         """
119         return self._state
120
121     @property
122     def exp_id(self):
123         """ Return the experiment ID
124
125         """
126         exp_id = self._exp_id
127         if not exp_id.startswith("nepi-"):
128             exp_id = "nepi-" + exp_id
129         return exp_id
130
131     @property
132     def finished(self):
133         """ Put the state of the Experiment Controller into a final state :
134             Either TERMINATED or FAILED
135
136         """
137         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
138
139     def wait_finished(self, guids):
140         """ Blocking method that wait until all the RM from the 'guid' list 
141             reached the state FINISHED
142
143         :param guids: List of guids
144         :type guids: list
145         """
146         return self.wait(guids)
147
148     def wait_started(self, guids):
149         """ Blocking method that wait until all the RM from the 'guid' list 
150             reached the state STARTED
151
152         :param guids: List of guids
153         :type guids: list
154         """
155         return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
156
157     def wait(self, guids, states = [ResourceState.FINISHED]):
158         """ Blocking method that waits until all the RM from the 'guid' list 
159             reached state 'state' or until a failure occurs
160             
161         :param guids: List of guids
162         :type guids: list
163         """
164         if isinstance(guids, int):
165             guids = [guids]
166
167         while not all([self.state(guid) in states for guid in guids]) and \
168                 not any([self.state(guid) in [
169                         ResourceState.STOPPED, 
170                         ResourceState.FAILED] for guid in guids]) and \
171                 not self.finished:
172             # We keep the sleep big to decrease the number of RM state queries
173             time.sleep(2)
174    
175     def get_task(self, tid):
176         """ Get a specific task
177
178         :param tid: Id of the task
179         :type tid: int
180         :rtype:  unknow
181         """
182         return self._tasks.get(tid)
183
184     def get_resource(self, guid):
185         """ Get a specific Resource Manager
186
187         :param guid: Id of the task
188         :type guid: int
189         :rtype:  ResourceManager
190         """
191         return self._resources.get(guid)
192
193     @property
194     def resources(self):
195         """ Returns the list of all the Resource Manager Id
196
197         :rtype: set
198
199         """
200         return self._resources.keys()
201
202     def register_resource(self, rtype, guid = None):
203         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
204         for the RM of type 'rtype' and add it to the list of Resources.
205
206         :param rtype: Type of the RM
207         :type rtype: str
208         :return: Id of the RM
209         :rtype: int
210         """
211         # Get next available guid
212         guid = self._guid_generator.next(guid)
213         
214         # Instantiate RM
215         rm = ResourceFactory.create(rtype, self, guid)
216
217         # Store RM
218         self._resources[guid] = rm
219
220         return guid
221
222     def get_attributes(self, guid):
223         """ Return all the attibutes of a specific RM
224
225         :param guid: Guid of the RM
226         :type guid: int
227         :return: List of attributes
228         :rtype: list
229         """
230         rm = self.get_resource(guid)
231         return rm.get_attributes()
232
233     def register_connection(self, guid1, guid2):
234         """ Registers a guid1 with a guid2. 
235             The declaration order is not important
236
237             :param guid1: First guid to connect
238             :type guid1: ResourceManager
239
240             :param guid2: Second guid to connect
241             :type guid: ResourceManager
242         """
243         rm1 = self.get_resource(guid1)
244         rm2 = self.get_resource(guid2)
245
246         rm1.connect(guid2)
247         rm2.connect(guid1)
248
249     def register_condition(self, group1, action, group2, state,
250             time = None):
251         """ Registers an action START or STOP for all RM on group1 to occur 
252             time 'time' after all elements in group2 reached state 'state'.
253
254             :param group1: List of guids of RMs subjected to action
255             :type group1: list
256
257             :param action: Action to register (either START or STOP)
258             :type action: ResourceAction
259
260             :param group2: List of guids of RMs to we waited for
261             :type group2: list
262
263             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
264             :type state: ResourceState
265
266             :param time: Time to wait after group2 has reached status 
267             :type time: string
268
269         """
270         if isinstance(group1, int):
271             group1 = [group1]
272         if isinstance(group2, int):
273             group2 = [group2]
274
275         for guid1 in group1:
276             rm = self.get_resource(guid1)
277             rm.register_condition(action, group2, state, time)
278
279     def register_trace(self, guid, name):
280         """ Enable trace
281
282         :param name: Name of the trace
283         :type name: str
284         """
285         rm = self.get_resource(guid)
286         rm.register_trace(name)
287
288     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
289         """ Get information on collected trace
290
291         :param name: Name of the trace
292         :type name: str
293
294         :param attr: Can be one of:
295                          - TraceAttr.ALL (complete trace content), 
296                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
297                          - TraceAttr.PATH (full path to the trace file),
298                          - TraceAttr.SIZE (size of trace file). 
299         :type attr: str
300
301         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
302         :type name: int
303
304         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
305         :type name: int
306
307         :rtype: str
308         """
309         rm = self.get_resource(guid)
310         return rm.trace(name, attr, block, offset)
311
312     def discover(self, guid):
313         """ Discover a specific RM defined by its 'guid'
314
315             :param guid: Guid of the RM
316             :type guid: int
317
318         """
319         rm = self.get_resource(guid)
320         return rm.discover()
321
322     def provision(self, guid):
323         """ Provision a specific RM defined by its 'guid'
324
325             :param guid: Guid of the RM
326             :type guid: int
327
328         """
329         rm = self.get_resource(guid)
330         return rm.provision()
331
332     def get(self, guid, name):
333         """ Get a specific attribute 'name' from the RM 'guid'
334
335             :param guid: Guid of the RM
336             :type guid: int
337
338             :param name: attribute's name
339             :type name: str
340
341         """
342         rm = self.get_resource(guid)
343         return rm.get(name)
344
345     def set(self, guid, name, value):
346         """ Set a specific attribute 'name' from the RM 'guid' 
347             with the value 'value' 
348
349             :param guid: Guid of the RM
350             :type guid: int
351
352             :param name: attribute's name
353             :type name: str
354
355             :param value: attribute's value
356
357         """
358         rm = self.get_resource(guid)
359         return rm.set(name, value)
360
361     def state(self, guid, hr = False):
362         """ Returns the state of a resource
363
364             :param guid: Resource guid
365             :type guid: integer
366
367             :param hr: Human readable. Forces return of a 
368                 status string instead of a number 
369             :type hr: boolean
370
371         """
372         rm = self.get_resource(guid)
373         if hr:
374             return ResourceState2str.get(rm.state)
375
376         return rm.state
377
378     def stop(self, guid):
379         """ Stop a specific RM defined by its 'guid'
380
381             :param guid: Guid of the RM
382             :type guid: int
383
384         """
385         rm = self.get_resource(guid)
386         return rm.stop()
387
388     def start(self, guid):
389         """ Start a specific RM defined by its 'guid'
390
391             :param guid: Guid of the RM
392             :type guid: int
393
394         """
395         rm = self.get_resource(guid)
396         return rm.start()
397
398     def set_with_conditions(self, name, value, group1, group2, state,
399             time = None):
400         """ Set value 'value' on attribute with name 'name' on all RMs of
401             group1 when 'time' has elapsed since all elements in group2 
402             have reached state 'state'.
403
404             :param name: Name of attribute to set in RM
405             :type name: string
406
407             :param value: Value of attribute to set in RM
408             :type name: string
409
410             :param group1: List of guids of RMs subjected to action
411             :type group1: list
412
413             :param action: Action to register (either START or STOP)
414             :type action: ResourceAction
415
416             :param group2: List of guids of RMs to we waited for
417             :type group2: list
418
419             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
420             :type state: ResourceState
421
422             :param time: Time to wait after group2 has reached status 
423             :type time: string
424
425         """
426         if isinstance(group1, int):
427             group1 = [group1]
428         if isinstance(group2, int):
429             group2 = [group2]
430
431         for guid1 in group1:
432             rm = self.get_resource(guid)
433             rm.set_with_conditions(name, value, group2, state, time)
434
435     def stop_with_conditions(self, guid):
436         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
437
438             :param guid: Guid of the RM
439             :type guid: int
440
441         """
442         rm = self.get_resource(guid)
443         return rm.stop_with_conditions()
444
445     def start_with_conditions(self, guid):
446         """ Start a specific RM defined by its 'guid' only if all the conditions are true
447
448             :param guid: Guid of the RM
449             :type guid: int
450
451         """
452         rm = self.get_resource(guid)
453         return rm.start_with_condition()
454
455     def deploy(self, group = None, wait_all_ready = True):
456         """ Deploy all resource manager in group
457
458         :param group: List of guids of RMs to deploy
459         :type group: list
460
461         :param wait_all_ready: Wait until all RMs are ready in
462             order to start the RMs
463         :type guid: int
464
465         """
466         self.logger.debug(" ------- DEPLOY START ------ ")
467
468         if not group:
469             # By default, if not deployment group is indicated, 
470             # all RMs that are undeployed will be deployed
471             group = []
472             for guid in self.resources:
473                 if self.state(guid) == ResourceState.NEW:
474                     group.append(guid)
475                 
476         if isinstance(group, int):
477             group = [group]
478
479         # Before starting deployment we disorder the group list with the
480         # purpose of speeding up the whole deployment process.
481         # It is likely that the user inserted in the 'group' list closely
482         # resources one after another (e.g. all applications
483         # connected to the same node can likely appear one after another).
484         # This can originate a slow down in the deployment since the N 
485         # threads the parallel runner uses to processes tasks may all
486         # be taken up by the same family of resources waiting for the 
487         # same conditions (e.g. LinuxApplications running on a same 
488         # node share a single lock, so they will tend to be serialized).
489         # If we disorder the group list, this problem can be mitigated.
490         #random.shuffle(group)
491
492         def wait_all_and_start(group):
493             reschedule = False
494             for guid in group:
495                 if self.state(guid) < ResourceState.READY:
496                     reschedule = True
497                     break
498
499             if reschedule:
500                 callback = functools.partial(wait_all_and_start, group)
501                 self.schedule("1s", callback)
502             else:
503                 # If all resources are read, we schedule the start
504                 for guid in group:
505                     rm = self.get_resource(guid)
506                     self.schedule("0s", rm.start_with_conditions)
507
508         if wait_all_ready:
509             # Schedule the function that will check all resources are
510             # READY, and only then it will schedule the start.
511             # This is aimed to reduce the number of tasks looping in the scheduler.
512             # Intead of having N start tasks, we will have only one
513             callback = functools.partial(wait_all_and_start, group)
514             self.schedule("1s", callback)
515
516         for guid in group:
517             rm = self.get_resource(guid)
518             self.schedule("0s", rm.deploy)
519
520             if not wait_all_ready:
521                 self.schedule("1s", rm.start_with_conditions)
522
523             if rm.conditions.get(ResourceAction.STOP):
524                 # Only if the RM has STOP conditions we
525                 # schedule a stop. Otherwise the RM will stop immediately
526                 self.schedule("2s", rm.stop_with_conditions)
527
528
529     def release(self, group = None):
530         """ Release the elements of the list 'group' or 
531         all the resources if any group is specified
532
533             :param group: List of RM
534             :type group: list
535
536         """
537         if not group:
538             group = self.resources
539
540         threads = []
541         for guid in group:
542             rm = self.get_resource(guid)
543             thread = threading.Thread(target=rm.release)
544             threads.append(thread)
545             thread.setDaemon(True)
546             thread.start()
547
548         while list(threads) and not self.finished:
549             thread = threads[0]
550             # Time out after 5 seconds to check EC not terminated
551             thread.join(5)
552             if not thread.is_alive():
553                 threads.remove(thread)
554         
555     def shutdown(self):
556         """ Shutdown the Experiment Controller. 
557         Releases all the resources and stops task processing thread
558
559         """
560         self.release()
561
562         # Mark the EC state as TERMINATED
563         self._state = ECState.TERMINATED
564
565         # Notify condition to wake up the processing thread
566         self._notify()
567         
568         if self._thread.is_alive():
569            self._thread.join()
570
571     def schedule(self, date, callback, track = False):
572         """ Schedule a callback to be executed at time date.
573
574             :param date: string containing execution time for the task.
575                     It can be expressed as an absolute time, using
576                     timestamp format, or as a relative time matching
577                     ^\d+.\d+(h|m|s|ms|us)$
578
579             :param callback: code to be executed for the task. Must be a
580                         Python function, and receives args and kwargs
581                         as arguments.
582
583             :param track: if set to True, the task will be retrivable with
584                     the get_task() method
585
586             :return : The Id of the task
587         """
588         timestamp = strfvalid(date)
589         
590         task = Task(timestamp, callback)
591         task = self._scheduler.schedule(task)
592
593         if track:
594             self._tasks[task.id] = task
595   
596         # Notify condition to wake up the processing thread
597         self._notify()
598
599         return task.id
600      
601     def _process(self):
602         """ Process scheduled tasks.
603
604         The _process method is executed in an independent thread held by the 
605         ExperimentController for as long as the experiment is running.
606         
607         Tasks are scheduled by invoking the schedule method with a target callback. 
608         The schedule method is given a execution time which controls the
609         order in which tasks are processed. 
610
611         Tasks are processed in parallel using multithreading. 
612         The environmental variable NEPI_NTHREADS can be used to control
613         the number of threads used to process tasks. The default value is 50.
614
615         Exception handling:
616
617         To execute tasks in parallel, an ParallelRunner (PR) object, holding
618         a pool of threads (workers), is used.
619         For each available thread in the PR, the next task popped from 
620         the scheduler queue is 'put' in the PR.
621         Upon receiving a task to execute, each PR worker (thread) invokes the 
622         _execute method of the EC, passing the task as argument. 
623         This method, calls task.callback inside a try/except block. If an 
624         exception is raised by the tasks.callback, it will be trapped by the 
625         try block, logged to standard error (usually the console), and the EC 
626         state will be set to ECState.FAILED.
627         The invocation of _notify immediately after, forces the processing
628         loop in the _process method, to wake up if it was blocked waiting for new 
629         tasks to arrived, and to check the EC state.
630         As the EC is in FAILED state, the processing loop exits and the 
631         'finally' block is invoked. In the 'finally' block, the 'sync' method
632         of the PR is invoked, which forces the PR to raise any unchecked errors
633         that might have been raised by the workers.
634
635         """
636         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
637
638         runner = ParallelRun(maxthreads = nthreads)
639         runner.start()
640
641         try:
642             while not self.finished:
643                 self._cond.acquire()
644                 task = self._scheduler.next()
645                 self._cond.release()
646                 
647                 if not task:
648                     # It there are not tasks in the tasks queue we need to 
649                     # wait until a call to schedule wakes us up
650                     self._cond.acquire()
651                     self._cond.wait()
652                     self._cond.release()
653                 else: 
654                     # If the task timestamp is in the future the thread needs to wait
655                     # until time elapse or until another task is scheduled
656                     now = strfnow()
657                     if now < task.timestamp:
658                         # Calculate time difference in seconds
659                         timeout = strfdiff(task.timestamp, now)
660                         # Re-schedule task with the same timestamp
661                         self._scheduler.schedule(task)
662                         # Sleep until timeout or until a new task awakes the condition
663                         self._cond.acquire()
664                         self._cond.wait(timeout)
665                         self._cond.release()
666                     else:
667                         # Process tasks in parallel
668                         runner.put(self._execute, task)
669         except: 
670             import traceback
671             err = traceback.format_exc()
672             self._logger.error("Error while processing tasks in the EC: %s" % err)
673
674             self._state = ECState.FAILED
675         finally:   
676             self._logger.info("Exiting the task processing loop ... ")
677             runner.sync()
678
679     def _execute(self, task):
680         """ Executes a single task. 
681
682             If the invokation of the task callback raises an
683             exception, the processing thread of the ExperimentController
684             will be stopped and the experiment will be aborted.
685
686             :param task: Object containing the callback to execute
687             :type task: Task
688
689         """
690         # Invoke callback
691         task.status = TaskStatus.DONE
692
693         try:
694             task.result = task.callback()
695         except:
696             import traceback
697             err = traceback.format_exc()
698             task.result = err
699             task.status = TaskStatus.ERROR
700             
701             self._logger.error("Error occurred while executing task: %s" % err)
702
703             # Set the EC to FAILED state (this will force to exit the task
704             # processing thread)
705             self._state = ECState.FAILED
706
707             # Notify condition to wake up the processing thread
708             self._notify()
709
710             # Propage error to the ParallelRunner
711             raise
712
713     def _notify(self):
714         """ Awakes the processing thread in case it is blocked waiting
715         for a new task to be scheduled.
716         """
717         self._cond.acquire()
718         self._cond.notify()
719         self._cond.release()
720