Bugfixing LinuxNode and LinuxApplication
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Id of the experiment
52         :type exp_id: int
53         :param root_dir: Root directory of the experiment
54         :type root_dir: str
55
56     .. note::
57
58        This class is the only one used by the User. Indeed, the user "talks"
59        only with the Experiment Controller and this latter forward to 
60        the different Resources Manager the order provided by the user.
61
62     """
63
64     def __init__(self, exp_id = None, root_dir = "/tmp"): 
65         super(ExperimentController, self).__init__()
66         # root directory to store files
67         self._root_dir = root_dir
68
69         # experiment identifier given by the user
70         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
71
72         # generator of globally unique ids
73         self._guid_generator = guid.GuidGenerator()
74         
75         # Resource managers
76         self._resources = dict()
77
78         # Scheduler
79         self._scheduler = HeapScheduler()
80
81         # Tasks
82         self._tasks = dict()
83
84         # Event processing thread
85         self._cond = threading.Condition()
86         self._thread = threading.Thread(target = self._process)
87         self._thread.setDaemon(True)
88         self._thread.start()
89
90         # EC state
91         self._state = ECState.RUNNING
92
93         # Logging
94         self._logger = logging.getLogger("ExperimentController")
95
96     @property
97     def logger(self):
98         """ Return the logger of the Experiment Controller
99
100         """
101         return self._logger
102
103     @property
104     def ecstate(self):
105         """ Return the state of the Experiment Controller
106
107         """
108         return self._state
109
110     @property
111     def exp_id(self):
112         """ Return the experiment ID
113
114         """
115         exp_id = self._exp_id
116         if not exp_id.startswith("nepi-"):
117             exp_id = "nepi-" + exp_id
118         return exp_id
119
120     @property
121     def finished(self):
122         """ Put the state of the Experiment Controller into a final state :
123             Either TERMINATED or FAILED
124
125         """
126         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
127
128     def wait_finished(self, guids):
129         """ Blocking method that wait until all the RM from the 'guid' list 
130             reach the state FINISHED
131
132         :param guids: List of guids
133         :type guids: list
134         """
135         if isinstance(guids, int):
136             guids = [guids]
137
138         while not all([self.state(guid) in [ResourceState.FINISHED, 
139             ResourceState.STOPPED, 
140             ResourceState.FAILED] \
141                 for guid in guids]) and not self.finished:
142             # We keep the sleep as large as possible to 
143             # decrese the number of RM state requests
144             time.sleep(2)
145     
146     def get_task(self, tid):
147         """ Get a specific task
148
149         :param tid: Id of the task
150         :type tid: int
151         :rtype:  unknow
152         """
153         return self._tasks.get(tid)
154
155     def get_resource(self, guid):
156         """ Get a specific Resource Manager
157
158         :param guid: Id of the task
159         :type guid: int
160         :rtype:  ResourceManager
161         """
162         return self._resources.get(guid)
163
164     @property
165     def resources(self):
166         """ Returns the list of all the Resource Manager Id
167
168         :rtype:  set
169         """
170         return self._resources.keys()
171
172     def register_resource(self, rtype, guid = None):
173         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
174         for the RM of type 'rtype' and add it to the list of Resources.
175
176         :param rtype: Type of the RM
177         :type rtype: str
178         :return : Id of the RM
179         :rtype:  int
180         """
181         # Get next available guid
182         guid = self._guid_generator.next(guid)
183         
184         # Instantiate RM
185         rm = ResourceFactory.create(rtype, self, guid)
186
187         # Store RM
188         self._resources[guid] = rm
189
190         return guid
191
192     def get_attributes(self, guid):
193         """ Return all the attibutes of a specific RM
194
195         :param guid: Guid of the RM
196         :type guid: int
197         :return : List of attributes
198         :rtype: list
199         """
200         rm = self.get_resource(guid)
201         return rm.get_attributes()
202
203     def register_connection(self, guid1, guid2):
204         """ Registers a guid1 with a guid2. 
205             The declaration order is not important
206
207             :param guid1: First guid to connect
208             :type guid1: ResourceManager
209
210             :param guid2: Second guid to connect
211             :type guid: ResourceManager
212
213         """
214         rm1 = self.get_resource(guid1)
215         rm2 = self.get_resource(guid2)
216
217         rm1.connect(guid2)
218         rm2.connect(guid1)
219
220     def register_condition(self, group1, action, group2, state,
221             time = None):
222         """ Registers an action START or STOP for all RM on group1 to occur 
223             time 'time' after all elements in group2 reached state 'state'.
224
225             :param group1: List of guids of RMs subjected to action
226             :type group1: list
227
228             :param action: Action to register (either START or STOP)
229             :type action: ResourceAction
230
231             :param group2: List of guids of RMs to we waited for
232             :type group2: list
233
234             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
235             :type state: ResourceState
236
237             :param time: Time to wait after group2 has reached status 
238             :type time: string
239
240         """
241         if isinstance(group1, int):
242             group1 = [group1]
243         if isinstance(group2, int):
244             group2 = [group2]
245
246         for guid1 in group1:
247             rm = self.get_resource(guid1)
248             rm.register_condition(action, group2, state, time)
249
250     def register_trace(self, guid, name):
251         """ Enable trace
252
253         :param name: Name of the trace
254         :type name: str
255         """
256         rm = self.get_resource(guid)
257         rm.register_trace(name)
258
259     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
260         """ Get information on collected trace
261
262         :param name: Name of the trace
263         :type name: str
264
265         :param attr: Can be one of:
266                          - TraceAttr.ALL (complete trace content), 
267                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
268                          - TraceAttr.PATH (full path to the trace file),
269                          - TraceAttr.SIZE (size of trace file). 
270         :type attr: str
271
272         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
273         :type name: int
274
275         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
276         :type name: int
277
278         :rtype: str
279         """
280         rm = self.get_resource(guid)
281         return rm.trace(name, attr, block, offset)
282
283     def discover(self, guid):
284         """ Discover a specific RM defined by its 'guid'
285
286             :param guid: Guid of the RM
287             :type guid: int
288
289         """
290         rm = self.get_resource(guid)
291         return rm.discover()
292
293     def provision(self, guid):
294         """ Provision a specific RM defined by its 'guid'
295
296             :param guid: Guid of the RM
297             :type guid: int
298
299         """
300         rm = self.get_resource(guid)
301         return rm.provision()
302
303     def get(self, guid, name):
304         """ Get a specific attribute 'name' from the RM 'guid'
305
306             :param guid: Guid of the RM
307             :type guid: int
308
309             :param name: attribute's name
310             :type name: str
311
312         """
313         rm = self.get_resource(guid)
314         return rm.get(name)
315
316     def set(self, guid, name, value):
317         """ Set a specific attribute 'name' from the RM 'guid' 
318             with the value 'value' 
319
320             :param guid: Guid of the RM
321             :type guid: int
322
323             :param name: attribute's name
324             :type name: str
325
326             :param value: attribute's value
327
328         """
329         rm = self.get_resource(guid)
330         return rm.set(name, value)
331
332     def state(self, guid, hr = False):
333         """ Returns the state of a resource
334
335             :param guid: Resource guid
336             :type guid: integer
337
338             :param hr: Human readable. Forces return of a 
339                 status string instead of a number 
340             :type hr: boolean
341
342         """
343         rm = self.get_resource(guid)
344         if hr:
345             return ResourceState2str.get(rm.state)
346
347         return rm.state
348
349     def stop(self, guid):
350         """ Stop a specific RM defined by its 'guid'
351
352             :param guid: Guid of the RM
353             :type guid: int
354
355         """
356         rm = self.get_resource(guid)
357         return rm.stop()
358
359     def start(self, guid):
360         """ Start a specific RM defined by its 'guid'
361
362             :param guid: Guid of the RM
363             :type guid: int
364
365         """
366         rm = self.get_resource(guid)
367         return rm.start()
368
369     def set_with_conditions(self, name, value, group1, group2, state,
370             time = None):
371         """ Set value 'value' on attribute with name 'name' on all RMs of
372             group1 when 'time' has elapsed since all elements in group2 
373             have reached state 'state'.
374
375             :param name: Name of attribute to set in RM
376             :type name: string
377
378             :param value: Value of attribute to set in RM
379             :type name: string
380
381             :param group1: List of guids of RMs subjected to action
382             :type group1: list
383
384             :param action: Action to register (either START or STOP)
385             :type action: ResourceAction
386
387             :param group2: List of guids of RMs to we waited for
388             :type group2: list
389
390             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
391             :type state: ResourceState
392
393             :param time: Time to wait after group2 has reached status 
394             :type time: string
395
396         """
397         if isinstance(group1, int):
398             group1 = [group1]
399         if isinstance(group2, int):
400             group2 = [group2]
401
402         for guid1 in group1:
403             rm = self.get_resource(guid)
404             rm.set_with_conditions(name, value, group2, state, time)
405
406     def stop_with_conditions(self, guid):
407         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
408
409             :param guid: Guid of the RM
410             :type guid: int
411
412         """
413         rm = self.get_resource(guid)
414         return rm.stop_with_conditions()
415
416     def start_with_conditions(self, guid):
417         """ Start a specific RM defined by its 'guid' only if all the conditions are true
418
419             :param guid: Guid of the RM
420             :type guid: int
421
422         """
423         rm = self.get_resource(guid)
424         return rm.start_with_condition()
425
426     def deploy(self, group = None, wait_all_ready = True):
427         """ Deploy all resource manager in group
428
429         :param group: List of guids of RMs to deploy
430         :type group: list
431
432         :param wait_all_ready: Wait until all RMs are ready in
433             order to start the RMs
434         :type guid: int
435
436         """
437         self.logger.debug(" ------- DEPLOY START ------ ")
438
439         if not group:
440             group = self.resources
441
442         if isinstance(group, int):
443             group = [group]
444
445         # Before starting deployment we disorder the group list with the
446         # purpose of speeding up the whole deployment process.
447         # It is likely that the user inserted in the 'group' list closely
448         # resources one after another (e.g. all applications
449         # connected to the same node can likely appear one after another).
450         # This can originate a slow down in the deployment since the N 
451         # threads the parallel runner uses to processes tasks may all
452         # be taken up by the same family of resources waiting for the 
453         # same conditions (e.g. LinuxApplications running on a same 
454         # node share a single lock, so they will tend to be serialized).
455         # If we disorder the group list, this problem can be mitigated.
456         #random.shuffle(group)
457
458         def wait_all_and_start(group):
459             reschedule = False
460             for guid in group:
461                 rm = self.get_resource(guid)
462                 if rm.state < ResourceState.READY:
463                     reschedule = True
464                     break
465
466             if reschedule:
467                 callback = functools.partial(wait_all_and_start, group)
468                 self.schedule("1s", callback)
469             else:
470                 # If all resources are read, we schedule the start
471                 for guid in group:
472                     rm = self.get_resource(guid)
473                     self.schedule("0s", rm.start_with_conditions)
474
475         if wait_all_ready:
476             # Schedule the function that will check all resources are
477             # READY, and only then it will schedule the start.
478             # This is aimed to reduce the number of tasks looping in the scheduler.
479             # Intead of having N start tasks, we will have only one
480             callback = functools.partial(wait_all_and_start, group)
481             self.schedule("1s", callback)
482
483         for guid in group:
484             rm = self.get_resource(guid)
485             self.schedule("0s", rm.deploy)
486
487             if not wait_all_ready:
488                 self.schedule("1s", rm.start_with_conditions)
489
490             if rm.conditions.get(ResourceAction.STOP):
491                 # Only if the RM has STOP conditions we
492                 # schedule a stop. Otherwise the RM will stop immediately
493                 self.schedule("2s", rm.stop_with_conditions)
494
495
496     def release(self, group = None):
497         """ Release the elements of the list 'group' or 
498         all the resources if any group is specified
499
500             :param group: List of RM
501             :type group: list
502
503         """
504         if not group:
505             group = self.resources
506
507         threads = []
508         for guid in group:
509             rm = self.get_resource(guid)
510             thread = threading.Thread(target=rm.release)
511             threads.append(thread)
512             thread.setDaemon(True)
513             thread.start()
514
515         while list(threads) and not self.finished:
516             thread = threads[0]
517             # Time out after 5 seconds to check EC not terminated
518             thread.join(5)
519             if not thread.is_alive():
520                 threads.remove(thread)
521         
522     def shutdown(self):
523         """ Shutdown the Experiment Controller. 
524         Releases all the resources and stops task processing thread
525
526         """
527         self.release()
528
529         # Mark the EC state as TERMINATED
530         self._state = ECState.TERMINATED
531
532         # Notify condition to wake up the processing thread
533         self._notify()
534         
535         if self._thread.is_alive():
536            self._thread.join()
537
538     def schedule(self, date, callback, track = False):
539         """ Schedule a callback to be executed at time date.
540
541             :param date: string containing execution time for the task.
542                     It can be expressed as an absolute time, using
543                     timestamp format, or as a relative time matching
544                     ^\d+.\d+(h|m|s|ms|us)$
545
546             :param callback: code to be executed for the task. Must be a
547                         Python function, and receives args and kwargs
548                         as arguments.
549
550             :param track: if set to True, the task will be retrivable with
551                     the get_task() method
552
553             :return : The Id of the task
554         """
555         timestamp = strfvalid(date)
556         
557         task = Task(timestamp, callback)
558         task = self._scheduler.schedule(task)
559
560         if track:
561             self._tasks[task.id] = task
562   
563         # Notify condition to wake up the processing thread
564         self._notify()
565
566         return task.id
567      
568     def _process(self):
569         """ Process scheduled tasks.
570
571         The _process method is executed in an independent thread held by the 
572         ExperimentController for as long as the experiment is running.
573         
574         Tasks are scheduled by invoking the schedule method with a target callback. 
575         The schedule method is givedn a execution time which controls the
576         order in which tasks are processed. 
577
578         Tasks are processed in parallel using multithreading. 
579         The environmental variable NEPI_NTHREADS can be used to control
580         the number of threads used to process tasks. The default value is 50.
581
582         """
583         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
584
585         runner = ParallelRun(maxthreads = nthreads)
586         runner.start()
587
588         try:
589             while not self.finished:
590                 self._cond.acquire()
591                 task = self._scheduler.next()
592                 self._cond.release()
593                 
594                 if not task:
595                     # It there are not tasks in the tasks queue we need to 
596                     # wait until a call to schedule wakes us up
597                     self._cond.acquire()
598                     self._cond.wait()
599                     self._cond.release()
600                 else: 
601                     # If the task timestamp is in the future the thread needs to wait
602                     # until time elapse or until another task is scheduled
603                     now = strfnow()
604                     if now < task.timestamp:
605                         # Calculate time difference in seconds
606                         timeout = strfdiff(task.timestamp, now)
607                         # Re-schedule task with the same timestamp
608                         self._scheduler.schedule(task)
609                         # Sleep until timeout or until a new task awakes the condition
610                         self._cond.acquire()
611                         self._cond.wait(timeout)
612                         self._cond.release()
613                     else:
614                         # Process tasks in parallel
615                         runner.put(self._execute, task)
616         except: 
617             import traceback
618             err = traceback.format_exc()
619             self._logger.error("Error while processing tasks in the EC: %s" % err)
620
621             self._state = ECState.FAILED
622         finally:   
623             runner.sync()
624
625     def _execute(self, task):
626         """ Executes a single task. 
627
628             If the invokation of the task callback raises an
629             exception, the processing thread of the ExperimentController
630             will be stopped and the experiment will be aborted.
631
632             :param task: Object containing the callback to execute
633             :type task: Task
634
635         """
636         # Invoke callback
637         task.status = TaskStatus.DONE
638
639         try:
640             task.result = task.callback()
641         except:
642             import traceback
643             err = traceback.format_exc()
644             task.result = err
645             task.status = TaskStatus.ERROR
646             
647             self._logger.error("Error occurred while executing task: %s" % err)
648
649             # Set the EC to FAILED state (this will force to exit the task
650             # processing thread)
651             self._state = ECState.FAILED
652
653             # Notify condition to wake up the processing thread
654             self._notify()
655
656             # Propage error to the ParallelRunner
657             raise
658
659     def _notify(self):
660         """ Awakes the processing thread in case it is blocked waiting
661         for a new task to be scheduled.
662         """
663         self._cond.acquire()
664         self._cond.notify()
665         self._cond.release()
666