5d0363e3e19210542ee72512273a030be15e9faf
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Human readable identifier for the experiment. 
52                         It will be used in the name of the directory 
53                         where experiment related information is stored
54         :type exp_id: int
55
56         :param root_dir: Root directory where experiment specific folder
57                          will be created to store experiment information
58         :type root_dir: str
59
60     .. note::
61         The ExperimentController (EC), is the entity responsible for 
62         managing a single experiment. 
63         Through the EC interface the user can create ResourceManagers (RMs),
64         configure them and interconnect them, in order to describe the experiment.
65         
66         Only when the 'deploy()' method is invoked, the EC will take actions
67         to transform the 'described' experiment into a 'running' experiment.
68
69         While the experiment is running, it is possible to continue to
70         create/configure/connect RMs, and to deploy them to involve new
71         resources in the experiment.
72
73     """
74
75     def __init__(self, exp_id = None, root_dir = "/tmp"): 
76         super(ExperimentController, self).__init__()
77         # root directory to store files
78         self._root_dir = root_dir
79
80         # experiment identifier given by the user
81         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
82
83         # generator of globally unique ids
84         self._guid_generator = guid.GuidGenerator()
85         
86         # Resource managers
87         self._resources = dict()
88
89         # Scheduler
90         self._scheduler = HeapScheduler()
91
92         # Tasks
93         self._tasks = dict()
94
95         # Event processing thread
96         self._cond = threading.Condition()
97         self._thread = threading.Thread(target = self._process)
98         self._thread.setDaemon(True)
99         self._thread.start()
100
101         # EC state
102         self._state = ECState.RUNNING
103
104         # Logging
105         self._logger = logging.getLogger("ExperimentController")
106
107     @property
108     def logger(self):
109         """ Return the logger of the Experiment Controller
110
111         """
112         return self._logger
113
114     @property
115     def ecstate(self):
116         """ Return the state of the Experiment Controller
117
118         """
119         return self._state
120
121     @property
122     def exp_id(self):
123         """ Return the experiment ID
124
125         """
126         exp_id = self._exp_id
127         if not exp_id.startswith("nepi-"):
128             exp_id = "nepi-" + exp_id
129         return exp_id
130
131     @property
132     def finished(self):
133         """ Put the state of the Experiment Controller into a final state :
134             Either TERMINATED or FAILED
135
136         """
137         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
138
139     def wait_finished(self, guids):
140         """ Blocking method that wait until all the RM from the 'guid' list 
141             reach the state FINISHED
142
143         :param guids: List of guids
144         :type guids: list
145         """
146         if isinstance(guids, int):
147             guids = [guids]
148
149         while not all([self.state(guid) in [ResourceState.FINISHED, 
150             ResourceState.STOPPED, 
151             ResourceState.FAILED] \
152                 for guid in guids]) and not self.finished:
153             # We keep the sleep as large as possible to 
154             # decrese the number of RM state requests
155             time.sleep(2)
156     
157     def get_task(self, tid):
158         """ Get a specific task
159
160         :param tid: Id of the task
161         :type tid: int
162         :rtype:  unknow
163         """
164         return self._tasks.get(tid)
165
166     def get_resource(self, guid):
167         """ Get a specific Resource Manager
168
169         :param guid: Id of the task
170         :type guid: int
171         :rtype:  ResourceManager
172         """
173         return self._resources.get(guid)
174
175     @property
176     def resources(self):
177         """ Returns the list of all the Resource Manager Id
178
179         :rtype: set
180
181         """
182         return self._resources.keys()
183
184     def register_resource(self, rtype, guid = None):
185         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
186         for the RM of type 'rtype' and add it to the list of Resources.
187
188         :param rtype: Type of the RM
189         :type rtype: str
190         :return: Id of the RM
191         :rtype: int
192         """
193         # Get next available guid
194         guid = self._guid_generator.next(guid)
195         
196         # Instantiate RM
197         rm = ResourceFactory.create(rtype, self, guid)
198
199         # Store RM
200         self._resources[guid] = rm
201
202         return guid
203
204     def get_attributes(self, guid):
205         """ Return all the attibutes of a specific RM
206
207         :param guid: Guid of the RM
208         :type guid: int
209         :return: List of attributes
210         :rtype: list
211         """
212         rm = self.get_resource(guid)
213         return rm.get_attributes()
214
215     def register_connection(self, guid1, guid2):
216         """ Registers a guid1 with a guid2. 
217             The declaration order is not important
218
219             :param guid1: First guid to connect
220             :type guid1: ResourceManager
221
222             :param guid2: Second guid to connect
223             :type guid: ResourceManager
224         """
225         rm1 = self.get_resource(guid1)
226         rm2 = self.get_resource(guid2)
227
228         rm1.connect(guid2)
229         rm2.connect(guid1)
230
231     def register_condition(self, group1, action, group2, state,
232             time = None):
233         """ Registers an action START or STOP for all RM on group1 to occur 
234             time 'time' after all elements in group2 reached state 'state'.
235
236             :param group1: List of guids of RMs subjected to action
237             :type group1: list
238
239             :param action: Action to register (either START or STOP)
240             :type action: ResourceAction
241
242             :param group2: List of guids of RMs to we waited for
243             :type group2: list
244
245             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
246             :type state: ResourceState
247
248             :param time: Time to wait after group2 has reached status 
249             :type time: string
250
251         """
252         if isinstance(group1, int):
253             group1 = [group1]
254         if isinstance(group2, int):
255             group2 = [group2]
256
257         for guid1 in group1:
258             rm = self.get_resource(guid1)
259             rm.register_condition(action, group2, state, time)
260
261     def register_trace(self, guid, name):
262         """ Enable trace
263
264         :param name: Name of the trace
265         :type name: str
266         """
267         rm = self.get_resource(guid)
268         rm.register_trace(name)
269
270     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
271         """ Get information on collected trace
272
273         :param name: Name of the trace
274         :type name: str
275
276         :param attr: Can be one of:
277                          - TraceAttr.ALL (complete trace content), 
278                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
279                          - TraceAttr.PATH (full path to the trace file),
280                          - TraceAttr.SIZE (size of trace file). 
281         :type attr: str
282
283         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
284         :type name: int
285
286         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
287         :type name: int
288
289         :rtype: str
290         """
291         rm = self.get_resource(guid)
292         return rm.trace(name, attr, block, offset)
293
294     def discover(self, guid):
295         """ Discover a specific RM defined by its 'guid'
296
297             :param guid: Guid of the RM
298             :type guid: int
299
300         """
301         rm = self.get_resource(guid)
302         return rm.discover()
303
304     def provision(self, guid):
305         """ Provision a specific RM defined by its 'guid'
306
307             :param guid: Guid of the RM
308             :type guid: int
309
310         """
311         rm = self.get_resource(guid)
312         return rm.provision()
313
314     def get(self, guid, name):
315         """ Get a specific attribute 'name' from the RM 'guid'
316
317             :param guid: Guid of the RM
318             :type guid: int
319
320             :param name: attribute's name
321             :type name: str
322
323         """
324         rm = self.get_resource(guid)
325         return rm.get(name)
326
327     def set(self, guid, name, value):
328         """ Set a specific attribute 'name' from the RM 'guid' 
329             with the value 'value' 
330
331             :param guid: Guid of the RM
332             :type guid: int
333
334             :param name: attribute's name
335             :type name: str
336
337             :param value: attribute's value
338
339         """
340         rm = self.get_resource(guid)
341         return rm.set(name, value)
342
343     def state(self, guid, hr = False):
344         """ Returns the state of a resource
345
346             :param guid: Resource guid
347             :type guid: integer
348
349             :param hr: Human readable. Forces return of a 
350                 status string instead of a number 
351             :type hr: boolean
352
353         """
354         rm = self.get_resource(guid)
355         if hr:
356             return ResourceState2str.get(rm.state)
357
358         return rm.state
359
360     def stop(self, guid):
361         """ Stop a specific RM defined by its 'guid'
362
363             :param guid: Guid of the RM
364             :type guid: int
365
366         """
367         rm = self.get_resource(guid)
368         return rm.stop()
369
370     def start(self, guid):
371         """ Start a specific RM defined by its 'guid'
372
373             :param guid: Guid of the RM
374             :type guid: int
375
376         """
377         rm = self.get_resource(guid)
378         return rm.start()
379
380     def set_with_conditions(self, name, value, group1, group2, state,
381             time = None):
382         """ Set value 'value' on attribute with name 'name' on all RMs of
383             group1 when 'time' has elapsed since all elements in group2 
384             have reached state 'state'.
385
386             :param name: Name of attribute to set in RM
387             :type name: string
388
389             :param value: Value of attribute to set in RM
390             :type name: string
391
392             :param group1: List of guids of RMs subjected to action
393             :type group1: list
394
395             :param action: Action to register (either START or STOP)
396             :type action: ResourceAction
397
398             :param group2: List of guids of RMs to we waited for
399             :type group2: list
400
401             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
402             :type state: ResourceState
403
404             :param time: Time to wait after group2 has reached status 
405             :type time: string
406
407         """
408         if isinstance(group1, int):
409             group1 = [group1]
410         if isinstance(group2, int):
411             group2 = [group2]
412
413         for guid1 in group1:
414             rm = self.get_resource(guid)
415             rm.set_with_conditions(name, value, group2, state, time)
416
417     def stop_with_conditions(self, guid):
418         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
419
420             :param guid: Guid of the RM
421             :type guid: int
422
423         """
424         rm = self.get_resource(guid)
425         return rm.stop_with_conditions()
426
427     def start_with_conditions(self, guid):
428         """ Start a specific RM defined by its 'guid' only if all the conditions are true
429
430             :param guid: Guid of the RM
431             :type guid: int
432
433         """
434         rm = self.get_resource(guid)
435         return rm.start_with_condition()
436
437     def deploy(self, group = None, wait_all_ready = True):
438         """ Deploy all resource manager in group
439
440         :param group: List of guids of RMs to deploy
441         :type group: list
442
443         :param wait_all_ready: Wait until all RMs are ready in
444             order to start the RMs
445         :type guid: int
446
447         """
448         self.logger.debug(" ------- DEPLOY START ------ ")
449
450         if not group:
451             group = self.resources
452
453         if isinstance(group, int):
454             group = [group]
455
456         # Before starting deployment we disorder the group list with the
457         # purpose of speeding up the whole deployment process.
458         # It is likely that the user inserted in the 'group' list closely
459         # resources one after another (e.g. all applications
460         # connected to the same node can likely appear one after another).
461         # This can originate a slow down in the deployment since the N 
462         # threads the parallel runner uses to processes tasks may all
463         # be taken up by the same family of resources waiting for the 
464         # same conditions (e.g. LinuxApplications running on a same 
465         # node share a single lock, so they will tend to be serialized).
466         # If we disorder the group list, this problem can be mitigated.
467         #random.shuffle(group)
468
469         def wait_all_and_start(group):
470             reschedule = False
471             for guid in group:
472                 rm = self.get_resource(guid)
473                 if rm.state < ResourceState.READY:
474                     reschedule = True
475                     break
476
477             if reschedule:
478                 callback = functools.partial(wait_all_and_start, group)
479                 self.schedule("1s", callback)
480             else:
481                 # If all resources are read, we schedule the start
482                 for guid in group:
483                     rm = self.get_resource(guid)
484                     self.schedule("0s", rm.start_with_conditions)
485
486         if wait_all_ready:
487             # Schedule the function that will check all resources are
488             # READY, and only then it will schedule the start.
489             # This is aimed to reduce the number of tasks looping in the scheduler.
490             # Intead of having N start tasks, we will have only one
491             callback = functools.partial(wait_all_and_start, group)
492             self.schedule("1s", callback)
493
494         for guid in group:
495             rm = self.get_resource(guid)
496             self.schedule("0s", rm.deploy)
497
498             if not wait_all_ready:
499                 self.schedule("1s", rm.start_with_conditions)
500
501             if rm.conditions.get(ResourceAction.STOP):
502                 # Only if the RM has STOP conditions we
503                 # schedule a stop. Otherwise the RM will stop immediately
504                 self.schedule("2s", rm.stop_with_conditions)
505
506
507     def release(self, group = None):
508         """ Release the elements of the list 'group' or 
509         all the resources if any group is specified
510
511             :param group: List of RM
512             :type group: list
513
514         """
515         if not group:
516             group = self.resources
517
518         threads = []
519         for guid in group:
520             rm = self.get_resource(guid)
521             thread = threading.Thread(target=rm.release)
522             threads.append(thread)
523             thread.setDaemon(True)
524             thread.start()
525
526         while list(threads) and not self.finished:
527             thread = threads[0]
528             # Time out after 5 seconds to check EC not terminated
529             thread.join(5)
530             if not thread.is_alive():
531                 threads.remove(thread)
532         
533     def shutdown(self):
534         """ Shutdown the Experiment Controller. 
535         Releases all the resources and stops task processing thread
536
537         """
538         self.release()
539
540         # Mark the EC state as TERMINATED
541         self._state = ECState.TERMINATED
542
543         # Notify condition to wake up the processing thread
544         self._notify()
545         
546         if self._thread.is_alive():
547            self._thread.join()
548
549     def schedule(self, date, callback, track = False):
550         """ Schedule a callback to be executed at time date.
551
552             :param date: string containing execution time for the task.
553                     It can be expressed as an absolute time, using
554                     timestamp format, or as a relative time matching
555                     ^\d+.\d+(h|m|s|ms|us)$
556
557             :param callback: code to be executed for the task. Must be a
558                         Python function, and receives args and kwargs
559                         as arguments.
560
561             :param track: if set to True, the task will be retrivable with
562                     the get_task() method
563
564             :return : The Id of the task
565         """
566         timestamp = strfvalid(date)
567         
568         task = Task(timestamp, callback)
569         task = self._scheduler.schedule(task)
570
571         if track:
572             self._tasks[task.id] = task
573   
574         # Notify condition to wake up the processing thread
575         self._notify()
576
577         return task.id
578      
579     def _process(self):
580         """ Process scheduled tasks.
581
582         The _process method is executed in an independent thread held by the 
583         ExperimentController for as long as the experiment is running.
584         
585         Tasks are scheduled by invoking the schedule method with a target callback. 
586         The schedule method is given a execution time which controls the
587         order in which tasks are processed. 
588
589         Tasks are processed in parallel using multithreading. 
590         The environmental variable NEPI_NTHREADS can be used to control
591         the number of threads used to process tasks. The default value is 50.
592
593         Exception handling:
594
595         To execute tasks in parallel, an ParallelRunner (PR) object, holding
596         a pool of threads (workers), is used.
597         For each available thread in the PR, the next task popped from 
598         the scheduler queue is 'put' in the PR.
599         Upon receiving a task to execute, each PR worker (thread) invokes the 
600         _execute method of the EC, passing the task as argument. 
601         This method, calls task.callback inside a try/except block. If an 
602         exception is raised by the tasks.callback, it will be trapped by the 
603         try block, logged to standard error (usually the console), and the EC 
604         state will be set to ECState.FAILED.
605         The invocation of _notify immediately after, forces the processing
606         loop in the _process method, to wake up if it was blocked waiting for new 
607         tasks to arrived, and to check the EC state.
608         As the EC is in FAILED state, the processing loop exits and the 
609         'finally' block is invoked. In the 'finally' block, the 'sync' method
610         of the PR is invoked, which forces the PR to raise any unchecked errors
611         that might have been raised by the workers.
612
613         """
614         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
615
616         runner = ParallelRun(maxthreads = nthreads)
617         runner.start()
618
619         try:
620             while not self.finished:
621                 self._cond.acquire()
622                 task = self._scheduler.next()
623                 self._cond.release()
624                 
625                 if not task:
626                     # It there are not tasks in the tasks queue we need to 
627                     # wait until a call to schedule wakes us up
628                     self._cond.acquire()
629                     self._cond.wait()
630                     self._cond.release()
631                 else: 
632                     # If the task timestamp is in the future the thread needs to wait
633                     # until time elapse or until another task is scheduled
634                     now = strfnow()
635                     if now < task.timestamp:
636                         # Calculate time difference in seconds
637                         timeout = strfdiff(task.timestamp, now)
638                         # Re-schedule task with the same timestamp
639                         self._scheduler.schedule(task)
640                         # Sleep until timeout or until a new task awakes the condition
641                         self._cond.acquire()
642                         self._cond.wait(timeout)
643                         self._cond.release()
644                     else:
645                         # Process tasks in parallel
646                         runner.put(self._execute, task)
647         except: 
648             import traceback
649             err = traceback.format_exc()
650             self._logger.error("Error while processing tasks in the EC: %s" % err)
651
652             self._state = ECState.FAILED
653         finally:   
654             runner.sync()
655
656     def _execute(self, task):
657         """ Executes a single task. 
658
659             If the invokation of the task callback raises an
660             exception, the processing thread of the ExperimentController
661             will be stopped and the experiment will be aborted.
662
663             :param task: Object containing the callback to execute
664             :type task: Task
665
666         """
667         # Invoke callback
668         task.status = TaskStatus.DONE
669
670         try:
671             task.result = task.callback()
672         except:
673             import traceback
674             err = traceback.format_exc()
675             task.result = err
676             task.status = TaskStatus.ERROR
677             
678             self._logger.error("Error occurred while executing task: %s" % err)
679
680             # Set the EC to FAILED state (this will force to exit the task
681             # processing thread)
682             self._state = ECState.FAILED
683
684             # Notify condition to wake up the processing thread
685             self._notify()
686
687             # Propage error to the ParallelRunner
688             raise
689
690     def _notify(self):
691         """ Awakes the processing thread in case it is blocked waiting
692         for a new task to be scheduled.
693         """
694         self._cond.acquire()
695         self._cond.notify()
696         self._cond.release()
697