Bugfixes for LinuxApplication and LinuxNode
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Human readable identifier for the experiment. 
52                         It will be used in the name of the directory 
53                         where experiment related information is stored
54         :type exp_id: int
55
56         :param root_dir: Root directory where experiment specific folder
57                          will be created to store experiment information
58         :type root_dir: str
59
60     .. note::
61         The ExperimentController (EC), is the entity responsible for 
62         managing a single experiment. 
63         Through the EC interface the user can create ResourceManagers (RMs),
64         configure them and interconnect them, in order to describe the experiment.
65         
66         Only when the 'deploy()' method is invoked, the EC will take actions
67         to transform the 'described' experiment into a 'running' experiment.
68
69         While the experiment is running, it is possible to continue to
70         create/configure/connect RMs, and to deploy them to involve new
71         resources in the experiment.
72
73     """
74
75     def __init__(self, exp_id = None, root_dir = "/tmp"): 
76         super(ExperimentController, self).__init__()
77         # root directory to store files
78         self._root_dir = root_dir
79
80         # experiment identifier given by the user
81         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
82
83         # generator of globally unique ids
84         self._guid_generator = guid.GuidGenerator()
85         
86         # Resource managers
87         self._resources = dict()
88
89         # Scheduler
90         self._scheduler = HeapScheduler()
91
92         # Tasks
93         self._tasks = dict()
94
95         # Event processing thread
96         self._cond = threading.Condition()
97         self._thread = threading.Thread(target = self._process)
98         self._thread.setDaemon(True)
99         self._thread.start()
100
101         # EC state
102         self._state = ECState.RUNNING
103
104         # Logging
105         self._logger = logging.getLogger("ExperimentController")
106
107     @property
108     def logger(self):
109         """ Return the logger of the Experiment Controller
110
111         """
112         return self._logger
113
114     @property
115     def ecstate(self):
116         """ Return the state of the Experiment Controller
117
118         """
119         return self._state
120
121     @property
122     def exp_id(self):
123         """ Return the experiment ID
124
125         """
126         exp_id = self._exp_id
127         if not exp_id.startswith("nepi-"):
128             exp_id = "nepi-" + exp_id
129         return exp_id
130
131     @property
132     def finished(self):
133         """ Put the state of the Experiment Controller into a final state :
134             Either TERMINATED or FAILED
135
136         """
137         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
138
139     def wait_finished(self, guids):
140         """ Blocking method that wait until all the RM from the 'guid' list 
141             reached the state FINISHED
142
143         :param guids: List of guids
144         :type guids: list
145         """
146         return self.wait(guids)
147
148     def wait_started(self, guids):
149         """ Blocking method that wait until all the RM from the 'guid' list 
150             reached the state STARTED
151
152         :param guids: List of guids
153         :type guids: list
154         """
155         return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
156
157     def wait(self, guids, states = [ResourceState.FINISHED]):
158         """ Blocking method that waits until all the RM from the 'guid' list 
159             reached state 'state' or until a failure occurs
160             
161         :param guids: List of guids
162         :type guids: list
163         """
164         if isinstance(guids, int):
165             guids = [guids]
166
167         while not all([self.state(guid) in states for guid in guids]) and \
168                 not any([self.state(guid) in [
169                         ResourceState.STOPPED, 
170                         ResourceState.FAILED] for guid in guids]) and \
171                 not self.finished:
172             # We keep the sleep big to decrease the number of RM state queries
173             time.sleep(2)
174    
175     def get_task(self, tid):
176         """ Get a specific task
177
178         :param tid: Id of the task
179         :type tid: int
180         :rtype:  unknow
181         """
182         return self._tasks.get(tid)
183
184     def get_resource(self, guid):
185         """ Get a specific Resource Manager
186
187         :param guid: Id of the task
188         :type guid: int
189         :rtype:  ResourceManager
190         """
191         return self._resources.get(guid)
192
193     @property
194     def resources(self):
195         """ Returns the list of all the Resource Manager Id
196
197         :rtype: set
198
199         """
200         return self._resources.keys()
201
202     def register_resource(self, rtype, guid = None):
203         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
204         for the RM of type 'rtype' and add it to the list of Resources.
205
206         :param rtype: Type of the RM
207         :type rtype: str
208         :return: Id of the RM
209         :rtype: int
210         """
211         # Get next available guid
212         guid = self._guid_generator.next(guid)
213         
214         # Instantiate RM
215         rm = ResourceFactory.create(rtype, self, guid)
216
217         # Store RM
218         self._resources[guid] = rm
219
220         return guid
221
222     def get_attributes(self, guid):
223         """ Return all the attibutes of a specific RM
224
225         :param guid: Guid of the RM
226         :type guid: int
227         :return: List of attributes
228         :rtype: list
229         """
230         rm = self.get_resource(guid)
231         return rm.get_attributes()
232
233     def register_connection(self, guid1, guid2):
234         """ Registers a guid1 with a guid2. 
235             The declaration order is not important
236
237             :param guid1: First guid to connect
238             :type guid1: ResourceManager
239
240             :param guid2: Second guid to connect
241             :type guid: ResourceManager
242         """
243         rm1 = self.get_resource(guid1)
244         rm2 = self.get_resource(guid2)
245
246         rm1.connect(guid2)
247         rm2.connect(guid1)
248
249     def register_condition(self, group1, action, group2, state,
250             time = None):
251         """ Registers an action START or STOP for all RM on group1 to occur 
252             time 'time' after all elements in group2 reached state 'state'.
253
254             :param group1: List of guids of RMs subjected to action
255             :type group1: list
256
257             :param action: Action to register (either START or STOP)
258             :type action: ResourceAction
259
260             :param group2: List of guids of RMs to we waited for
261             :type group2: list
262
263             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
264             :type state: ResourceState
265
266             :param time: Time to wait after group2 has reached status 
267             :type time: string
268
269         """
270         if isinstance(group1, int):
271             group1 = [group1]
272         if isinstance(group2, int):
273             group2 = [group2]
274
275         for guid1 in group1:
276             rm = self.get_resource(guid1)
277             rm.register_condition(action, group2, state, time)
278
279     def register_trace(self, guid, name):
280         """ Enable trace
281
282         :param name: Name of the trace
283         :type name: str
284         """
285         rm = self.get_resource(guid)
286         rm.register_trace(name)
287
288     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
289         """ Get information on collected trace
290
291         :param name: Name of the trace
292         :type name: str
293
294         :param attr: Can be one of:
295                          - TraceAttr.ALL (complete trace content), 
296                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
297                          - TraceAttr.PATH (full path to the trace file),
298                          - TraceAttr.SIZE (size of trace file). 
299         :type attr: str
300
301         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
302         :type name: int
303
304         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
305         :type name: int
306
307         :rtype: str
308         """
309         rm = self.get_resource(guid)
310         return rm.trace(name, attr, block, offset)
311
312     def discover(self, guid):
313         """ Discover a specific RM defined by its 'guid'
314
315             :param guid: Guid of the RM
316             :type guid: int
317
318         """
319         rm = self.get_resource(guid)
320         return rm.discover()
321
322     def provision(self, guid):
323         """ Provision a specific RM defined by its 'guid'
324
325             :param guid: Guid of the RM
326             :type guid: int
327
328         """
329         rm = self.get_resource(guid)
330         return rm.provision()
331
332     def get(self, guid, name):
333         """ Get a specific attribute 'name' from the RM 'guid'
334
335             :param guid: Guid of the RM
336             :type guid: int
337
338             :param name: attribute's name
339             :type name: str
340
341         """
342         rm = self.get_resource(guid)
343         return rm.get(name)
344
345     def set(self, guid, name, value):
346         """ Set a specific attribute 'name' from the RM 'guid' 
347             with the value 'value' 
348
349             :param guid: Guid of the RM
350             :type guid: int
351
352             :param name: attribute's name
353             :type name: str
354
355             :param value: attribute's value
356
357         """
358         rm = self.get_resource(guid)
359         return rm.set(name, value)
360
361     def state(self, guid, hr = False):
362         """ Returns the state of a resource
363
364             :param guid: Resource guid
365             :type guid: integer
366
367             :param hr: Human readable. Forces return of a 
368                 status string instead of a number 
369             :type hr: boolean
370
371         """
372         rm = self.get_resource(guid)
373         if hr:
374             return ResourceState2str.get(rm.state)
375
376         return rm.state
377
378     def stop(self, guid):
379         """ Stop a specific RM defined by its 'guid'
380
381             :param guid: Guid of the RM
382             :type guid: int
383
384         """
385         rm = self.get_resource(guid)
386         return rm.stop()
387
388     def start(self, guid):
389         """ Start a specific RM defined by its 'guid'
390
391             :param guid: Guid of the RM
392             :type guid: int
393
394         """
395         rm = self.get_resource(guid)
396         return rm.start()
397
398     def set_with_conditions(self, name, value, group1, group2, state,
399             time = None):
400         """ Set value 'value' on attribute with name 'name' on all RMs of
401             group1 when 'time' has elapsed since all elements in group2 
402             have reached state 'state'.
403
404             :param name: Name of attribute to set in RM
405             :type name: string
406
407             :param value: Value of attribute to set in RM
408             :type name: string
409
410             :param group1: List of guids of RMs subjected to action
411             :type group1: list
412
413             :param action: Action to register (either START or STOP)
414             :type action: ResourceAction
415
416             :param group2: List of guids of RMs to we waited for
417             :type group2: list
418
419             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
420             :type state: ResourceState
421
422             :param time: Time to wait after group2 has reached status 
423             :type time: string
424
425         """
426         if isinstance(group1, int):
427             group1 = [group1]
428         if isinstance(group2, int):
429             group2 = [group2]
430
431         for guid1 in group1:
432             rm = self.get_resource(guid)
433             rm.set_with_conditions(name, value, group2, state, time)
434
435     def stop_with_conditions(self, guid):
436         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
437
438             :param guid: Guid of the RM
439             :type guid: int
440
441         """
442         rm = self.get_resource(guid)
443         return rm.stop_with_conditions()
444
445     def start_with_conditions(self, guid):
446         """ Start a specific RM defined by its 'guid' only if all the conditions are true
447
448             :param guid: Guid of the RM
449             :type guid: int
450
451         """
452         rm = self.get_resource(guid)
453         return rm.start_with_condition()
454
455     def deploy(self, group = None, wait_all_ready = True):
456         """ Deploy all resource manager in group
457
458         :param group: List of guids of RMs to deploy
459         :type group: list
460
461         :param wait_all_ready: Wait until all RMs are ready in
462             order to start the RMs
463         :type guid: int
464
465         """
466         self.logger.debug(" ------- DEPLOY START ------ ")
467
468         if not group:
469             # By default, if not deployment group is indicated, 
470             # all RMs that are undeployed will be deployed
471             group = []
472             for guid in self.resources:
473                 if self.state(guid) == ResourceState.NEW:
474                     group.append(guid)
475                 
476         if isinstance(group, int):
477             group = [group]
478
479         # Before starting deployment we disorder the group list with the
480         # purpose of speeding up the whole deployment process.
481         # It is likely that the user inserted in the 'group' list closely
482         # resources one after another (e.g. all applications
483         # connected to the same node can likely appear one after another).
484         # This can originate a slow down in the deployment since the N 
485         # threads the parallel runner uses to processes tasks may all
486         # be taken up by the same family of resources waiting for the 
487         # same conditions (e.g. LinuxApplications running on a same 
488         # node share a single lock, so they will tend to be serialized).
489         # If we disorder the group list, this problem can be mitigated.
490         #random.shuffle(group)
491
492         def wait_all_and_start(group):
493             reschedule = False
494             for guid in group:
495                 rm = self.get_resource(guid)
496                 if rm.state < ResourceState.READY:
497                     reschedule = True
498                     break
499
500             if reschedule:
501                 callback = functools.partial(wait_all_and_start, group)
502                 self.schedule("1s", callback)
503             else:
504                 # If all resources are read, we schedule the start
505                 for guid in group:
506                     rm = self.get_resource(guid)
507                     self.schedule("0s", rm.start_with_conditions)
508
509         if wait_all_ready:
510             # Schedule the function that will check all resources are
511             # READY, and only then it will schedule the start.
512             # This is aimed to reduce the number of tasks looping in the scheduler.
513             # Intead of having N start tasks, we will have only one
514             callback = functools.partial(wait_all_and_start, group)
515             self.schedule("1s", callback)
516
517         for guid in group:
518             rm = self.get_resource(guid)
519             self.schedule("0s", rm.deploy)
520
521             if not wait_all_ready:
522                 self.schedule("1s", rm.start_with_conditions)
523
524             if rm.conditions.get(ResourceAction.STOP):
525                 # Only if the RM has STOP conditions we
526                 # schedule a stop. Otherwise the RM will stop immediately
527                 self.schedule("2s", rm.stop_with_conditions)
528
529
530     def release(self, group = None):
531         """ Release the elements of the list 'group' or 
532         all the resources if any group is specified
533
534             :param group: List of RM
535             :type group: list
536
537         """
538         if not group:
539             group = self.resources
540
541         threads = []
542         for guid in group:
543             rm = self.get_resource(guid)
544             thread = threading.Thread(target=rm.release)
545             threads.append(thread)
546             thread.setDaemon(True)
547             thread.start()
548
549         while list(threads) and not self.finished:
550             thread = threads[0]
551             # Time out after 5 seconds to check EC not terminated
552             thread.join(5)
553             if not thread.is_alive():
554                 threads.remove(thread)
555         
556     def shutdown(self):
557         """ Shutdown the Experiment Controller. 
558         Releases all the resources and stops task processing thread
559
560         """
561         self.release()
562
563         # Mark the EC state as TERMINATED
564         self._state = ECState.TERMINATED
565
566         # Notify condition to wake up the processing thread
567         self._notify()
568         
569         if self._thread.is_alive():
570            self._thread.join()
571
572     def schedule(self, date, callback, track = False):
573         """ Schedule a callback to be executed at time date.
574
575             :param date: string containing execution time for the task.
576                     It can be expressed as an absolute time, using
577                     timestamp format, or as a relative time matching
578                     ^\d+.\d+(h|m|s|ms|us)$
579
580             :param callback: code to be executed for the task. Must be a
581                         Python function, and receives args and kwargs
582                         as arguments.
583
584             :param track: if set to True, the task will be retrivable with
585                     the get_task() method
586
587             :return : The Id of the task
588         """
589         timestamp = strfvalid(date)
590         
591         task = Task(timestamp, callback)
592         task = self._scheduler.schedule(task)
593
594         if track:
595             self._tasks[task.id] = task
596   
597         # Notify condition to wake up the processing thread
598         self._notify()
599
600         return task.id
601      
602     def _process(self):
603         """ Process scheduled tasks.
604
605         The _process method is executed in an independent thread held by the 
606         ExperimentController for as long as the experiment is running.
607         
608         Tasks are scheduled by invoking the schedule method with a target callback. 
609         The schedule method is given a execution time which controls the
610         order in which tasks are processed. 
611
612         Tasks are processed in parallel using multithreading. 
613         The environmental variable NEPI_NTHREADS can be used to control
614         the number of threads used to process tasks. The default value is 50.
615
616         Exception handling:
617
618         To execute tasks in parallel, an ParallelRunner (PR) object, holding
619         a pool of threads (workers), is used.
620         For each available thread in the PR, the next task popped from 
621         the scheduler queue is 'put' in the PR.
622         Upon receiving a task to execute, each PR worker (thread) invokes the 
623         _execute method of the EC, passing the task as argument. 
624         This method, calls task.callback inside a try/except block. If an 
625         exception is raised by the tasks.callback, it will be trapped by the 
626         try block, logged to standard error (usually the console), and the EC 
627         state will be set to ECState.FAILED.
628         The invocation of _notify immediately after, forces the processing
629         loop in the _process method, to wake up if it was blocked waiting for new 
630         tasks to arrived, and to check the EC state.
631         As the EC is in FAILED state, the processing loop exits and the 
632         'finally' block is invoked. In the 'finally' block, the 'sync' method
633         of the PR is invoked, which forces the PR to raise any unchecked errors
634         that might have been raised by the workers.
635
636         """
637         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
638
639         runner = ParallelRun(maxthreads = nthreads)
640         runner.start()
641
642         try:
643             while not self.finished:
644                 self._cond.acquire()
645                 task = self._scheduler.next()
646                 self._cond.release()
647                 
648                 if not task:
649                     # It there are not tasks in the tasks queue we need to 
650                     # wait until a call to schedule wakes us up
651                     self._cond.acquire()
652                     self._cond.wait()
653                     self._cond.release()
654                 else: 
655                     # If the task timestamp is in the future the thread needs to wait
656                     # until time elapse or until another task is scheduled
657                     now = strfnow()
658                     if now < task.timestamp:
659                         # Calculate time difference in seconds
660                         timeout = strfdiff(task.timestamp, now)
661                         # Re-schedule task with the same timestamp
662                         self._scheduler.schedule(task)
663                         # Sleep until timeout or until a new task awakes the condition
664                         self._cond.acquire()
665                         self._cond.wait(timeout)
666                         self._cond.release()
667                     else:
668                         # Process tasks in parallel
669                         runner.put(self._execute, task)
670         except: 
671             import traceback
672             err = traceback.format_exc()
673             self._logger.error("Error while processing tasks in the EC: %s" % err)
674
675             self._state = ECState.FAILED
676         finally:   
677             self._logger.info("Exiting the task processing loop ... ")
678             runner.sync()
679
680     def _execute(self, task):
681         """ Executes a single task. 
682
683             If the invokation of the task callback raises an
684             exception, the processing thread of the ExperimentController
685             will be stopped and the experiment will be aborted.
686
687             :param task: Object containing the callback to execute
688             :type task: Task
689
690         """
691         # Invoke callback
692         task.status = TaskStatus.DONE
693
694         try:
695             task.result = task.callback()
696         except:
697             import traceback
698             err = traceback.format_exc()
699             task.result = err
700             task.status = TaskStatus.ERROR
701             
702             self._logger.error("Error occurred while executing task: %s" % err)
703
704             # Set the EC to FAILED state (this will force to exit the task
705             # processing thread)
706             self._state = ECState.FAILED
707
708             # Notify condition to wake up the processing thread
709             self._notify()
710
711             # Propage error to the ParallelRunner
712             raise
713
714     def _notify(self):
715         """ Awakes the processing thread in case it is blocked waiting
716         for a new task to be scheduled.
717         """
718         self._cond.acquire()
719         self._cond.notify()
720         self._cond.release()
721