c4917c0ead4afea7f1152626891f7bbb6b77b88e
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Id of the experiment
52         :type exp_id: int
53         :param root_dir: Root directory of the experiment
54         :type root_dir: str
55
56     .. note::
57         This class is the only one used by the User. Indeed, the user "talks"
58         only with the Experiment Controller and this latter forward to 
59         the different Resources Manager the order provided by the user.
60
61     """
62
63     def __init__(self, exp_id = None, root_dir = "/tmp"): 
64         super(ExperimentController, self).__init__()
65         # root directory to store files
66         self._root_dir = root_dir
67
68         # experiment identifier given by the user
69         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
70
71         # generator of globally unique ids
72         self._guid_generator = guid.GuidGenerator()
73         
74         # Resource managers
75         self._resources = dict()
76
77         # Scheduler
78         self._scheduler = HeapScheduler()
79
80         # Tasks
81         self._tasks = dict()
82
83         # Event processing thread
84         self._cond = threading.Condition()
85         self._thread = threading.Thread(target = self._process)
86         self._thread.setDaemon(True)
87         self._thread.start()
88
89         # EC state
90         self._state = ECState.RUNNING
91
92         # Logging
93         self._logger = logging.getLogger("ExperimentController")
94
95     @property
96     def logger(self):
97         """ Return the logger of the Experiment Controller
98
99         """
100         return self._logger
101
102     @property
103     def ecstate(self):
104         """ Return the state of the Experiment Controller
105
106         """
107         return self._state
108
109     @property
110     def exp_id(self):
111         """ Return the experiment ID
112
113         """
114         exp_id = self._exp_id
115         if not exp_id.startswith("nepi-"):
116             exp_id = "nepi-" + exp_id
117         return exp_id
118
119     @property
120     def finished(self):
121         """ Put the state of the Experiment Controller into a final state :
122             Either TERMINATED or FAILED
123
124         """
125         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
126
127     def wait_finished(self, guids):
128         """ Blocking method that wait until all the RM from the 'guid' list 
129             reach the state FINISHED
130
131         :param guids: List of guids
132         :type guids: list
133         """
134         if isinstance(guids, int):
135             guids = [guids]
136
137         while not all([self.state(guid) in [ResourceState.FINISHED, 
138             ResourceState.STOPPED, 
139             ResourceState.FAILED] \
140                 for guid in guids]) and not self.finished:
141             # We keep the sleep as large as possible to 
142             # decrese the number of RM state requests
143             time.sleep(2)
144     
145     def get_task(self, tid):
146         """ Get a specific task
147
148         :param tid: Id of the task
149         :type tid: int
150         :rtype:  unknow
151         """
152         return self._tasks.get(tid)
153
154     def get_resource(self, guid):
155         """ Get a specific Resource Manager
156
157         :param guid: Id of the task
158         :type guid: int
159         :rtype:  ResourceManager
160         """
161         return self._resources.get(guid)
162
163     @property
164     def resources(self):
165         """ Returns the list of all the Resource Manager Id
166
167         :rtype: set
168
169         """
170         return self._resources.keys()
171
172     def register_resource(self, rtype, guid = None):
173         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
174         for the RM of type 'rtype' and add it to the list of Resources.
175
176         :param rtype: Type of the RM
177         :type rtype: str
178         :return: Id of the RM
179         :rtype: int
180         """
181         # Get next available guid
182         guid = self._guid_generator.next(guid)
183         
184         # Instantiate RM
185         rm = ResourceFactory.create(rtype, self, guid)
186
187         # Store RM
188         self._resources[guid] = rm
189
190         return guid
191
192     def get_attributes(self, guid):
193         """ Return all the attibutes of a specific RM
194
195         :param guid: Guid of the RM
196         :type guid: int
197         :return: List of attributes
198         :rtype: list
199         """
200         rm = self.get_resource(guid)
201         return rm.get_attributes()
202
203     def register_connection(self, guid1, guid2):
204         """ Registers a guid1 with a guid2. 
205             The declaration order is not important
206
207             :param guid1: First guid to connect
208             :type guid1: ResourceManager
209
210             :param guid2: Second guid to connect
211             :type guid: ResourceManager
212         """
213         rm1 = self.get_resource(guid1)
214         rm2 = self.get_resource(guid2)
215
216         rm1.connect(guid2)
217         rm2.connect(guid1)
218
219     def register_condition(self, group1, action, group2, state,
220             time = None):
221         """ Registers an action START or STOP for all RM on group1 to occur 
222             time 'time' after all elements in group2 reached state 'state'.
223
224             :param group1: List of guids of RMs subjected to action
225             :type group1: list
226
227             :param action: Action to register (either START or STOP)
228             :type action: ResourceAction
229
230             :param group2: List of guids of RMs to we waited for
231             :type group2: list
232
233             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
234             :type state: ResourceState
235
236             :param time: Time to wait after group2 has reached status 
237             :type time: string
238
239         """
240         if isinstance(group1, int):
241             group1 = [group1]
242         if isinstance(group2, int):
243             group2 = [group2]
244
245         for guid1 in group1:
246             rm = self.get_resource(guid1)
247             rm.register_condition(action, group2, state, time)
248
249     def register_trace(self, guid, name):
250         """ Enable trace
251
252         :param name: Name of the trace
253         :type name: str
254         """
255         rm = self.get_resource(guid)
256         rm.register_trace(name)
257
258     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
259         """ Get information on collected trace
260
261         :param name: Name of the trace
262         :type name: str
263
264         :param attr: Can be one of:
265                          - TraceAttr.ALL (complete trace content), 
266                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
267                          - TraceAttr.PATH (full path to the trace file),
268                          - TraceAttr.SIZE (size of trace file). 
269         :type attr: str
270
271         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
272         :type name: int
273
274         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
275         :type name: int
276
277         :rtype: str
278         """
279         rm = self.get_resource(guid)
280         return rm.trace(name, attr, block, offset)
281
282     def discover(self, guid):
283         """ Discover a specific RM defined by its 'guid'
284
285             :param guid: Guid of the RM
286             :type guid: int
287
288         """
289         rm = self.get_resource(guid)
290         return rm.discover()
291
292     def provision(self, guid):
293         """ Provision a specific RM defined by its 'guid'
294
295             :param guid: Guid of the RM
296             :type guid: int
297
298         """
299         rm = self.get_resource(guid)
300         return rm.provision()
301
302     def get(self, guid, name):
303         """ Get a specific attribute 'name' from the RM 'guid'
304
305             :param guid: Guid of the RM
306             :type guid: int
307
308             :param name: attribute's name
309             :type name: str
310
311         """
312         rm = self.get_resource(guid)
313         return rm.get(name)
314
315     def set(self, guid, name, value):
316         """ Set a specific attribute 'name' from the RM 'guid' 
317             with the value 'value' 
318
319             :param guid: Guid of the RM
320             :type guid: int
321
322             :param name: attribute's name
323             :type name: str
324
325             :param value: attribute's value
326
327         """
328         rm = self.get_resource(guid)
329         return rm.set(name, value)
330
331     def state(self, guid, hr = False):
332         """ Returns the state of a resource
333
334             :param guid: Resource guid
335             :type guid: integer
336
337             :param hr: Human readable. Forces return of a 
338                 status string instead of a number 
339             :type hr: boolean
340
341         """
342         rm = self.get_resource(guid)
343         if hr:
344             return ResourceState2str.get(rm.state)
345
346         return rm.state
347
348     def stop(self, guid):
349         """ Stop a specific RM defined by its 'guid'
350
351             :param guid: Guid of the RM
352             :type guid: int
353
354         """
355         rm = self.get_resource(guid)
356         return rm.stop()
357
358     def start(self, guid):
359         """ Start a specific RM defined by its 'guid'
360
361             :param guid: Guid of the RM
362             :type guid: int
363
364         """
365         rm = self.get_resource(guid)
366         return rm.start()
367
368     def set_with_conditions(self, name, value, group1, group2, state,
369             time = None):
370         """ Set value 'value' on attribute with name 'name' on all RMs of
371             group1 when 'time' has elapsed since all elements in group2 
372             have reached state 'state'.
373
374             :param name: Name of attribute to set in RM
375             :type name: string
376
377             :param value: Value of attribute to set in RM
378             :type name: string
379
380             :param group1: List of guids of RMs subjected to action
381             :type group1: list
382
383             :param action: Action to register (either START or STOP)
384             :type action: ResourceAction
385
386             :param group2: List of guids of RMs to we waited for
387             :type group2: list
388
389             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
390             :type state: ResourceState
391
392             :param time: Time to wait after group2 has reached status 
393             :type time: string
394
395         """
396         if isinstance(group1, int):
397             group1 = [group1]
398         if isinstance(group2, int):
399             group2 = [group2]
400
401         for guid1 in group1:
402             rm = self.get_resource(guid)
403             rm.set_with_conditions(name, value, group2, state, time)
404
405     def stop_with_conditions(self, guid):
406         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
407
408             :param guid: Guid of the RM
409             :type guid: int
410
411         """
412         rm = self.get_resource(guid)
413         return rm.stop_with_conditions()
414
415     def start_with_conditions(self, guid):
416         """ Start a specific RM defined by its 'guid' only if all the conditions are true
417
418             :param guid: Guid of the RM
419             :type guid: int
420
421         """
422         rm = self.get_resource(guid)
423         return rm.start_with_condition()
424
425     def deploy(self, group = None, wait_all_ready = True):
426         """ Deploy all resource manager in group
427
428         :param group: List of guids of RMs to deploy
429         :type group: list
430
431         :param wait_all_ready: Wait until all RMs are ready in
432             order to start the RMs
433         :type guid: int
434
435         """
436         self.logger.debug(" ------- DEPLOY START ------ ")
437
438         if not group:
439             group = self.resources
440
441         if isinstance(group, int):
442             group = [group]
443
444         # Before starting deployment we disorder the group list with the
445         # purpose of speeding up the whole deployment process.
446         # It is likely that the user inserted in the 'group' list closely
447         # resources one after another (e.g. all applications
448         # connected to the same node can likely appear one after another).
449         # This can originate a slow down in the deployment since the N 
450         # threads the parallel runner uses to processes tasks may all
451         # be taken up by the same family of resources waiting for the 
452         # same conditions (e.g. LinuxApplications running on a same 
453         # node share a single lock, so they will tend to be serialized).
454         # If we disorder the group list, this problem can be mitigated.
455         #random.shuffle(group)
456
457         def wait_all_and_start(group):
458             reschedule = False
459             for guid in group:
460                 rm = self.get_resource(guid)
461                 if rm.state < ResourceState.READY:
462                     reschedule = True
463                     break
464
465             if reschedule:
466                 callback = functools.partial(wait_all_and_start, group)
467                 self.schedule("1s", callback)
468             else:
469                 # If all resources are read, we schedule the start
470                 for guid in group:
471                     rm = self.get_resource(guid)
472                     self.schedule("0s", rm.start_with_conditions)
473
474         if wait_all_ready:
475             # Schedule the function that will check all resources are
476             # READY, and only then it will schedule the start.
477             # This is aimed to reduce the number of tasks looping in the scheduler.
478             # Intead of having N start tasks, we will have only one
479             callback = functools.partial(wait_all_and_start, group)
480             self.schedule("1s", callback)
481
482         for guid in group:
483             rm = self.get_resource(guid)
484             self.schedule("0s", rm.deploy)
485
486             if not wait_all_ready:
487                 self.schedule("1s", rm.start_with_conditions)
488
489             if rm.conditions.get(ResourceAction.STOP):
490                 # Only if the RM has STOP conditions we
491                 # schedule a stop. Otherwise the RM will stop immediately
492                 self.schedule("2s", rm.stop_with_conditions)
493
494
495     def release(self, group = None):
496         """ Release the elements of the list 'group' or 
497         all the resources if any group is specified
498
499             :param group: List of RM
500             :type group: list
501
502         """
503         if not group:
504             group = self.resources
505
506         threads = []
507         for guid in group:
508             rm = self.get_resource(guid)
509             thread = threading.Thread(target=rm.release)
510             threads.append(thread)
511             thread.setDaemon(True)
512             thread.start()
513
514         while list(threads) and not self.finished:
515             thread = threads[0]
516             # Time out after 5 seconds to check EC not terminated
517             thread.join(5)
518             if not thread.is_alive():
519                 threads.remove(thread)
520         
521     def shutdown(self):
522         """ Shutdown the Experiment Controller. 
523         Releases all the resources and stops task processing thread
524
525         """
526         self.release()
527
528         # Mark the EC state as TERMINATED
529         self._state = ECState.TERMINATED
530
531         # Notify condition to wake up the processing thread
532         self._notify()
533         
534         if self._thread.is_alive():
535            self._thread.join()
536
537     def schedule(self, date, callback, track = False):
538         """ Schedule a callback to be executed at time date.
539
540             :param date: string containing execution time for the task.
541                     It can be expressed as an absolute time, using
542                     timestamp format, or as a relative time matching
543                     ^\d+.\d+(h|m|s|ms|us)$
544
545             :param callback: code to be executed for the task. Must be a
546                         Python function, and receives args and kwargs
547                         as arguments.
548
549             :param track: if set to True, the task will be retrivable with
550                     the get_task() method
551
552             :return : The Id of the task
553         """
554         timestamp = strfvalid(date)
555         
556         task = Task(timestamp, callback)
557         task = self._scheduler.schedule(task)
558
559         if track:
560             self._tasks[task.id] = task
561   
562         # Notify condition to wake up the processing thread
563         self._notify()
564
565         return task.id
566      
567     def _process(self):
568         """ Process scheduled tasks.
569
570         The _process method is executed in an independent thread held by the 
571         ExperimentController for as long as the experiment is running.
572         
573         Tasks are scheduled by invoking the schedule method with a target callback. 
574         The schedule method is givedn a execution time which controls the
575         order in which tasks are processed. 
576
577         Tasks are processed in parallel using multithreading. 
578         The environmental variable NEPI_NTHREADS can be used to control
579         the number of threads used to process tasks. The default value is 50.
580
581         """
582         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
583
584         runner = ParallelRun(maxthreads = nthreads)
585         runner.start()
586
587         try:
588             while not self.finished:
589                 self._cond.acquire()
590                 task = self._scheduler.next()
591                 self._cond.release()
592                 
593                 if not task:
594                     # It there are not tasks in the tasks queue we need to 
595                     # wait until a call to schedule wakes us up
596                     self._cond.acquire()
597                     self._cond.wait()
598                     self._cond.release()
599                 else: 
600                     # If the task timestamp is in the future the thread needs to wait
601                     # until time elapse or until another task is scheduled
602                     now = strfnow()
603                     if now < task.timestamp:
604                         # Calculate time difference in seconds
605                         timeout = strfdiff(task.timestamp, now)
606                         # Re-schedule task with the same timestamp
607                         self._scheduler.schedule(task)
608                         # Sleep until timeout or until a new task awakes the condition
609                         self._cond.acquire()
610                         self._cond.wait(timeout)
611                         self._cond.release()
612                     else:
613                         # Process tasks in parallel
614                         runner.put(self._execute, task)
615         except: 
616             import traceback
617             err = traceback.format_exc()
618             self._logger.error("Error while processing tasks in the EC: %s" % err)
619
620             self._state = ECState.FAILED
621         finally:   
622             runner.sync()
623
624     def _execute(self, task):
625         """ Executes a single task. 
626
627             If the invokation of the task callback raises an
628             exception, the processing thread of the ExperimentController
629             will be stopped and the experiment will be aborted.
630
631             :param task: Object containing the callback to execute
632             :type task: Task
633
634         """
635         # Invoke callback
636         task.status = TaskStatus.DONE
637
638         try:
639             task.result = task.callback()
640         except:
641             import traceback
642             err = traceback.format_exc()
643             task.result = err
644             task.status = TaskStatus.ERROR
645             
646             self._logger.error("Error occurred while executing task: %s" % err)
647
648             # Set the EC to FAILED state (this will force to exit the task
649             # processing thread)
650             self._state = ECState.FAILED
651
652             # Notify condition to wake up the processing thread
653             self._notify()
654
655             # Propage error to the ParallelRunner
656             raise
657
658     def _notify(self):
659         """ Awakes the processing thread in case it is blocked waiting
660         for a new task to be scheduled.
661         """
662         self._cond.acquire()
663         self._cond.notify()
664         self._cond.release()
665