b5663c0faedc6496e2097e7ceaa1ea08b8244774
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Human readable identifier for the experiment. 
52                         It will be used in the name of the directory 
53                         where experiment related information is stored
54         :type exp_id: int
55
56         :param root_dir: Root directory where experiment specific folder
57                          will be created to store experiment information
58         :type root_dir: str
59
60     .. note::
61         The ExperimentController (EC), is the entity responsible for 
62         managing a single experiment. 
63         Through the EC interface the user can create ResourceManagers (RMs),
64         configure them and interconnect them, in order to describe the experiment.
65         
66         Only when the 'deploy()' method is invoked, the EC will take actions
67         to transform the 'described' experiment into a 'running' experiment.
68
69         While the experiment is running, it is possible to continue to
70         create/configure/connect RMs, and to deploy them to involve new
71         resources in the experiment.
72
73     """
74
75     def __init__(self, exp_id = None, root_dir = "/tmp"): 
76         super(ExperimentController, self).__init__()
77         # root directory to store files
78         self._root_dir = root_dir
79
80         # experiment identifier given by the user
81         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
82
83         # generator of globally unique ids
84         self._guid_generator = guid.GuidGenerator()
85         
86         # Resource managers
87         self._resources = dict()
88
89         # Scheduler
90         self._scheduler = HeapScheduler()
91
92         # Tasks
93         self._tasks = dict()
94
95         # Event processing thread
96         self._cond = threading.Condition()
97         self._thread = threading.Thread(target = self._process)
98         self._thread.setDaemon(True)
99         self._thread.start()
100
101         # EC state
102         self._state = ECState.RUNNING
103
104         # Logging
105         self._logger = logging.getLogger("ExperimentController")
106
107     @property
108     def logger(self):
109         """ Return the logger of the Experiment Controller
110
111         """
112         return self._logger
113
114     @property
115     def ecstate(self):
116         """ Return the state of the Experiment Controller
117
118         """
119         return self._state
120
121     @property
122     def exp_id(self):
123         """ Return the experiment ID
124
125         """
126         exp_id = self._exp_id
127         if not exp_id.startswith("nepi-"):
128             exp_id = "nepi-" + exp_id
129         return exp_id
130
131     @property
132     def finished(self):
133         """ Put the state of the Experiment Controller into a final state :
134             Either TERMINATED or FAILED
135
136         """
137         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
138
139     def wait_finished(self, guids):
140         """ Blocking method that wait until all the RM from the 'guid' list 
141             reached the state FINISHED
142
143         :param guids: List of guids
144         :type guids: list
145         """
146         return self.wait(guids)
147
148     def wait_started(self, guids):
149         """ Blocking method that wait until all the RM from the 'guid' list 
150             reached the state STARTED
151
152         :param guids: List of guids
153         :type guids: list
154         """
155         return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
156
157     def wait(self, guids, states = [ResourceState.FINISHED]):
158         """ Blocking method that waits until all the RM from the 'guid' list 
159             reached state 'state' or until a failure occurs
160             
161         :param guids: List of guids
162         :type guids: list
163         """
164         if isinstance(guids, int):
165             guids = [guids]
166
167         while not all([self.state(guid) in states for guid in guids]) and \
168                 not any([self.state(guid) in [
169                         ResourceState.STOPPED, 
170                         ResourceState.FAILED] for guid in guids]) and \
171                 not self.finished:
172             # We keep the sleep big to decrease the number of RM state queries
173             time.sleep(2)
174    
175     def get_task(self, tid):
176         """ Get a specific task
177
178         :param tid: Id of the task
179         :type tid: int
180         :rtype:  unknow
181         """
182         return self._tasks.get(tid)
183
184     def get_resource(self, guid):
185         """ Get a specific Resource Manager
186
187         :param guid: Id of the task
188         :type guid: int
189         :rtype:  ResourceManager
190         """
191         return self._resources.get(guid)
192
193     @property
194     def resources(self):
195         """ Returns the list of all the Resource Manager Id
196
197         :rtype: set
198
199         """
200         return self._resources.keys()
201
202     def register_resource(self, rtype, guid = None):
203         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
204         for the RM of type 'rtype' and add it to the list of Resources.
205
206         :param rtype: Type of the RM
207         :type rtype: str
208         :return: Id of the RM
209         :rtype: int
210         """
211         # Get next available guid
212         guid = self._guid_generator.next(guid)
213         
214         # Instantiate RM
215         rm = ResourceFactory.create(rtype, self, guid)
216
217         # Store RM
218         self._resources[guid] = rm
219
220         return guid
221
222     def get_attributes(self, guid):
223         """ Return all the attibutes of a specific RM
224
225         :param guid: Guid of the RM
226         :type guid: int
227         :return: List of attributes
228         :rtype: list
229         """
230         rm = self.get_resource(guid)
231         return rm.get_attributes()
232
233     def register_connection(self, guid1, guid2):
234         """ Registers a guid1 with a guid2. 
235             The declaration order is not important
236
237             :param guid1: First guid to connect
238             :type guid1: ResourceManager
239
240             :param guid2: Second guid to connect
241             :type guid: ResourceManager
242         """
243         rm1 = self.get_resource(guid1)
244         rm2 = self.get_resource(guid2)
245
246         rm1.connect(guid2)
247         rm2.connect(guid1)
248
249     def register_condition(self, group1, action, group2, state,
250             time = None):
251         """ Registers an action START or STOP for all RM on group1 to occur 
252             time 'time' after all elements in group2 reached state 'state'.
253
254             :param group1: List of guids of RMs subjected to action
255             :type group1: list
256
257             :param action: Action to register (either START or STOP)
258             :type action: ResourceAction
259
260             :param group2: List of guids of RMs to we waited for
261             :type group2: list
262
263             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
264             :type state: ResourceState
265
266             :param time: Time to wait after group2 has reached status 
267             :type time: string
268
269         """
270         if isinstance(group1, int):
271             group1 = [group1]
272         if isinstance(group2, int):
273             group2 = [group2]
274
275         for guid1 in group1:
276             rm = self.get_resource(guid1)
277             rm.register_condition(action, group2, state, time)
278
279     def register_trace(self, guid, name):
280         """ Enable trace
281
282         :param name: Name of the trace
283         :type name: str
284         """
285         rm = self.get_resource(guid)
286         rm.register_trace(name)
287
288     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
289         """ Get information on collected trace
290
291         :param name: Name of the trace
292         :type name: str
293
294         :param attr: Can be one of:
295                          - TraceAttr.ALL (complete trace content), 
296                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
297                          - TraceAttr.PATH (full path to the trace file),
298                          - TraceAttr.SIZE (size of trace file). 
299         :type attr: str
300
301         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
302         :type name: int
303
304         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
305         :type name: int
306
307         :rtype: str
308         """
309         rm = self.get_resource(guid)
310         return rm.trace(name, attr, block, offset)
311
312     def discover(self, guid):
313         """ Discover a specific RM defined by its 'guid'
314
315             :param guid: Guid of the RM
316             :type guid: int
317
318         """
319         rm = self.get_resource(guid)
320         return rm.discover()
321
322     def provision(self, guid):
323         """ Provision a specific RM defined by its 'guid'
324
325             :param guid: Guid of the RM
326             :type guid: int
327
328         """
329         rm = self.get_resource(guid)
330         return rm.provision()
331
332     def get(self, guid, name):
333         """ Get a specific attribute 'name' from the RM 'guid'
334
335             :param guid: Guid of the RM
336             :type guid: int
337
338             :param name: attribute's name
339             :type name: str
340
341         """
342         rm = self.get_resource(guid)
343         return rm.get(name)
344
345     def set(self, guid, name, value):
346         """ Set a specific attribute 'name' from the RM 'guid' 
347             with the value 'value' 
348
349             :param guid: Guid of the RM
350             :type guid: int
351
352             :param name: attribute's name
353             :type name: str
354
355             :param value: attribute's value
356
357         """
358         rm = self.get_resource(guid)
359         return rm.set(name, value)
360
361     def state(self, guid, hr = False):
362         """ Returns the state of a resource
363
364             :param guid: Resource guid
365             :type guid: integer
366
367             :param hr: Human readable. Forces return of a 
368                 status string instead of a number 
369             :type hr: boolean
370
371         """
372         rm = self.get_resource(guid)
373         state = rm.state
374
375         if hr:
376             return ResourceState2str.get(state)
377
378         return state
379
380     def stop(self, guid):
381         """ Stop a specific RM defined by its 'guid'
382
383             :param guid: Guid of the RM
384             :type guid: int
385
386         """
387         rm = self.get_resource(guid)
388         return rm.stop()
389
390     def start(self, guid):
391         """ Start a specific RM defined by its 'guid'
392
393             :param guid: Guid of the RM
394             :type guid: int
395
396         """
397         rm = self.get_resource(guid)
398         return rm.start()
399
400     def set_with_conditions(self, name, value, group1, group2, state,
401             time = None):
402         """ Set value 'value' on attribute with name 'name' on all RMs of
403             group1 when 'time' has elapsed since all elements in group2 
404             have reached state 'state'.
405
406             :param name: Name of attribute to set in RM
407             :type name: string
408
409             :param value: Value of attribute to set in RM
410             :type name: string
411
412             :param group1: List of guids of RMs subjected to action
413             :type group1: list
414
415             :param action: Action to register (either START or STOP)
416             :type action: ResourceAction
417
418             :param group2: List of guids of RMs to we waited for
419             :type group2: list
420
421             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
422             :type state: ResourceState
423
424             :param time: Time to wait after group2 has reached status 
425             :type time: string
426
427         """
428         if isinstance(group1, int):
429             group1 = [group1]
430         if isinstance(group2, int):
431             group2 = [group2]
432
433         for guid1 in group1:
434             rm = self.get_resource(guid)
435             rm.set_with_conditions(name, value, group2, state, time)
436
437     def stop_with_conditions(self, guid):
438         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
439
440             :param guid: Guid of the RM
441             :type guid: int
442
443         """
444         rm = self.get_resource(guid)
445         return rm.stop_with_conditions()
446
447     def start_with_conditions(self, guid):
448         """ Start a specific RM defined by its 'guid' only if all the conditions are true
449
450             :param guid: Guid of the RM
451             :type guid: int
452
453         """
454         rm = self.get_resource(guid)
455         return rm.start_with_condition()
456
457     def deploy(self, group = None, wait_all_ready = True):
458         """ Deploy all resource manager in group
459
460         :param group: List of guids of RMs to deploy
461         :type group: list
462
463         :param wait_all_ready: Wait until all RMs are ready in
464             order to start the RMs
465         :type guid: int
466
467         """
468         self.logger.debug(" ------- DEPLOY START ------ ")
469
470         if not group:
471             # By default, if not deployment group is indicated, 
472             # all RMs that are undeployed will be deployed
473             group = []
474             for guid in self.resources:
475                 if self.state(guid) == ResourceState.NEW:
476                     group.append(guid)
477                 
478         if isinstance(group, int):
479             group = [group]
480
481         # Before starting deployment we disorder the group list with the
482         # purpose of speeding up the whole deployment process.
483         # It is likely that the user inserted in the 'group' list closely
484         # resources one after another (e.g. all applications
485         # connected to the same node can likely appear one after another).
486         # This can originate a slow down in the deployment since the N 
487         # threads the parallel runner uses to processes tasks may all
488         # be taken up by the same family of resources waiting for the 
489         # same conditions (e.g. LinuxApplications running on a same 
490         # node share a single lock, so they will tend to be serialized).
491         # If we disorder the group list, this problem can be mitigated.
492         random.shuffle(group)
493
494         def wait_all_and_start(group):
495             reschedule = False
496             for guid in group:
497                 if self.state(guid) < ResourceState.READY:
498                     reschedule = True
499                     break
500
501             if reschedule:
502                 callback = functools.partial(wait_all_and_start, group)
503                 self.schedule("1s", callback)
504             else:
505                 # If all resources are read, we schedule the start
506                 for guid in group:
507                     rm = self.get_resource(guid)
508                     self.schedule("0s", rm.start_with_conditions)
509
510         if wait_all_ready:
511             # Schedule the function that will check all resources are
512             # READY, and only then it will schedule the start.
513             # This is aimed to reduce the number of tasks looping in the scheduler.
514             # Intead of having N start tasks, we will have only one
515             callback = functools.partial(wait_all_and_start, group)
516             self.schedule("1s", callback)
517
518         for guid in group:
519             rm = self.get_resource(guid)
520             self.schedule("0s", rm.deploy)
521
522             if not wait_all_ready:
523                 self.schedule("1s", rm.start_with_conditions)
524
525             if rm.conditions.get(ResourceAction.STOP):
526                 # Only if the RM has STOP conditions we
527                 # schedule a stop. Otherwise the RM will stop immediately
528                 self.schedule("2s", rm.stop_with_conditions)
529
530     def release(self, group = None):
531         """ Release the elements of the list 'group' or 
532         all the resources if any group is specified
533
534             :param group: List of RM
535             :type group: list
536
537         """
538         if not group:
539             group = self.resources
540
541         threads = []
542         for guid in group:
543             rm = self.get_resource(guid)
544             thread = threading.Thread(target=rm.release)
545             threads.append(thread)
546             thread.setDaemon(True)
547             thread.start()
548
549         while list(threads) and not self.finished:
550             thread = threads[0]
551             # Time out after 5 seconds to check EC not terminated
552             thread.join(5)
553             if not thread.is_alive():
554                 threads.remove(thread)
555         
556     def shutdown(self):
557         """ Shutdown the Experiment Controller. 
558         Releases all the resources and stops task processing thread
559
560         """
561         self.release()
562
563         # Mark the EC state as TERMINATED
564         self._state = ECState.TERMINATED
565
566         # Notify condition to wake up the processing thread
567         self._notify()
568         
569         if self._thread.is_alive():
570            self._thread.join()
571
572     def schedule(self, date, callback, track = False):
573         """ Schedule a callback to be executed at time date.
574
575             :param date: string containing execution time for the task.
576                     It can be expressed as an absolute time, using
577                     timestamp format, or as a relative time matching
578                     ^\d+.\d+(h|m|s|ms|us)$
579
580             :param callback: code to be executed for the task. Must be a
581                         Python function, and receives args and kwargs
582                         as arguments.
583
584             :param track: if set to True, the task will be retrivable with
585                     the get_task() method
586
587             :return : The Id of the task
588         """
589         timestamp = strfvalid(date)
590         
591         task = Task(timestamp, callback)
592         task = self._scheduler.schedule(task)
593
594         if track:
595             self._tasks[task.id] = task
596
597         # Notify condition to wake up the processing thread
598         self._notify()
599
600         return task.id
601      
602     def _process(self):
603         """ Process scheduled tasks.
604
605         .. note::
606
607         The _process method is executed in an independent thread held by the 
608         ExperimentController for as long as the experiment is running.
609         
610         Tasks are scheduled by invoking the schedule method with a target callback. 
611         The schedule method is given a execution time which controls the
612         order in which tasks are processed. 
613
614         Tasks are processed in parallel using multithreading. 
615         The environmental variable NEPI_NTHREADS can be used to control
616         the number of threads used to process tasks. The default value is 50.
617
618         Exception handling:
619
620         To execute tasks in parallel, an ParallelRunner (PR) object, holding
621         a pool of threads (workers), is used.
622         For each available thread in the PR, the next task popped from 
623         the scheduler queue is 'put' in the PR.
624         Upon receiving a task to execute, each PR worker (thread) invokes the 
625         _execute method of the EC, passing the task as argument. 
626         This method, calls task.callback inside a try/except block. If an 
627         exception is raised by the tasks.callback, it will be trapped by the 
628         try block, logged to standard error (usually the console), and the EC 
629         state will be set to ECState.FAILED.
630         The invocation of _notify immediately after, forces the processing
631         loop in the _process method, to wake up if it was blocked waiting for new 
632         tasks to arrived, and to check the EC state.
633         As the EC is in FAILED state, the processing loop exits and the 
634         'finally' block is invoked. In the 'finally' block, the 'sync' method
635         of the PR is invoked, which forces the PR to raise any unchecked errors
636         that might have been raised by the workers.
637
638         """
639         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
640
641         runner = ParallelRun(maxthreads = nthreads)
642         runner.start()
643
644         try:
645             while not self.finished:
646                 self._cond.acquire()
647
648                 task = self._scheduler.next()
649                 
650                 if not task:
651                     # No task to execute. Wait for a new task to be scheduled.
652                     self._cond.wait()
653                 else:
654                     # The task timestamp is in the future. Wait for timeout 
655                     # or until another task is scheduled.
656                     now = strfnow()
657                     if now < task.timestamp:
658                         # Calculate timeout in seconds
659                         timeout = strfdiff(task.timestamp, now)
660
661                         # Re-schedule task with the same timestamp
662                         self._scheduler.schedule(task)
663                         
664                         task = None
665
666                         # Wait timeout or until a new task awakes the condition
667                         self._cond.wait(timeout)
668                
669                 self._cond.release()
670
671                 if task:
672                     # Process tasks in parallel
673                     runner.put(self._execute, task)
674         except: 
675             import traceback
676             err = traceback.format_exc()
677             self.logger.error("Error while processing tasks in the EC: %s" % err)
678
679             self._state = ECState.FAILED
680         finally:   
681             self.logger.debug("Exiting the task processing loop ... ")
682             runner.sync()
683
684     def _execute(self, task):
685         """ Executes a single task. 
686
687             :param task: Object containing the callback to execute
688             :type task: Task
689
690         .. note::
691
692         If the invokation of the task callback raises an
693         exception, the processing thread of the ExperimentController
694         will be stopped and the experiment will be aborted.
695
696         """
697         # Invoke callback
698         task.status = TaskStatus.DONE
699
700         try:
701             task.result = task.callback()
702         except:
703             import traceback
704             err = traceback.format_exc()
705             task.result = err
706             task.status = TaskStatus.ERROR
707             
708             self.logger.error("Error occurred while executing task: %s" % err)
709
710             # Set the EC to FAILED state (this will force to exit the task
711             # processing thread)
712             self._state = ECState.FAILED
713
714             # Notify condition to wake up the processing thread
715             self._notify()
716
717             # Propage error to the ParallelRunner
718             raise
719
720     def _notify(self):
721         """ Awakes the processing thread in case it is blocked waiting
722         for a new task to be scheduled.
723         """
724         self._cond.acquire()
725         self._cond.notify()
726         self._cond.release()
727