upgrade OMF code and comment
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
38
39 class ECState(object):
40     RUNNING = 1
41     FAILED = 2
42     TERMINATED = 3
43
44 class ExperimentController(object):
45     def __init__(self, exp_id = None, root_dir = "/tmp"): 
46         super(ExperimentController, self).__init__()
47         # root directory to store files
48         self._root_dir = root_dir
49
50         # experiment identifier given by the user
51         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
52
53         # generator of globally unique ids
54         self._guid_generator = guid.GuidGenerator()
55         
56         # Resource managers
57         self._resources = dict()
58
59         # Scheduler
60         self._scheduler = HeapScheduler()
61
62         # Tasks
63         self._tasks = dict()
64
65         # Event processing thread
66         self._cond = threading.Condition()
67         self._thread = threading.Thread(target = self._process)
68         self._thread.setDaemon(True)
69         self._thread.start()
70
71         # EC state
72         self._state = ECState.RUNNING
73
74         # Logging
75         self._logger = logging.getLogger("ExperimentController")
76
77     @property
78     def logger(self):
79         return self._logger
80
81     @property
82     def ecstate(self):
83         return self._state
84
85     @property
86     def exp_id(self):
87         exp_id = self._exp_id
88         if not exp_id.startswith("nepi-"):
89             exp_id = "nepi-" + exp_id
90         return exp_id
91
92     @property
93     def finished(self):
94         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
95
96     def wait_finished(self, guids):
97        # Take into account if only one guids is given in parameter
98         while not all([self.state(guid) in [ResourceState.FINISHED, 
99             ResourceState.STOPPED, 
100             ResourceState.FAILED] \
101                 for guid in guids]) and not self.finished:
102             # We keep the sleep as large as possible to 
103             # decrese the number of RM state requests
104             time.sleep(2)
105     
106     def get_task(self, tid):
107         return self._tasks.get(tid)
108
109     def get_resource(self, guid):
110         return self._resources.get(guid)
111
112     @property
113     def resources(self):
114         return self._resources.keys()
115
116     def register_resource(self, rtype, guid = None):
117         # Get next available guid
118         guid = self._guid_generator.next(guid)
119         
120         # Instantiate RM
121         rm = ResourceFactory.create(rtype, self, guid)
122
123         # Store RM
124         self._resources[guid] = rm
125
126         return guid
127
128     def get_attributes(self, guid):
129         rm = self.get_resource(guid)
130         return rm.get_attributes()
131
132     def register_connection(self, guid1, guid2):
133         rm1 = self.get_resource(guid1)
134         rm2 = self.get_resource(guid2)
135
136         rm1.connect(guid2)
137         rm2.connect(guid1)
138
139     def register_condition(self, group1, action, group2, state,
140             time = None):
141         """ Registers an action START or STOP for all RM on group1 to occur 
142             time 'time' after all elements in group2 reached state 'state'.
143
144             :param group1: List of guids of RMs subjected to action
145             :type group1: list
146
147             :param action: Action to register (either START or STOP)
148             :type action: ResourceAction
149
150             :param group2: List of guids of RMs to we waited for
151             :type group2: list
152
153             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
154             :type state: ResourceState
155
156             :param time: Time to wait after group2 has reached status 
157             :type time: string
158
159         """
160         if isinstance(group1, int):
161             group1 = [group1]
162         if isinstance(group2, int):
163             group2 = [group2]
164
165         for guid1 in group1:
166             rm = self.get_resource(guid1)
167             rm.register_condition(action, group2, state, time)
168
169     def register_trace(self, guid, name):
170         """ Enable trace
171
172         :param name: Name of the trace
173         :type name: str
174         """
175         rm = self.get_resource(guid)
176         rm.register_trace(name)
177
178     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
179         """ Get information on collected trace
180
181         :param name: Name of the trace
182         :type name: str
183
184         :param attr: Can be one of:
185                          - TraceAttr.ALL (complete trace content), 
186                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
187                          - TraceAttr.PATH (full path to the trace file),
188                          - TraceAttr.SIZE (size of trace file). 
189         :type attr: str
190
191         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
192         :type name: int
193
194         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
195         :type name: int
196
197         :rtype: str
198         """
199         rm = self.get_resource(guid)
200         return rm.trace(name, attr, block, offset)
201
202     def discover(self, guid):
203         rm = self.get_resource(guid)
204         return rm.discover()
205
206     def provision(self, guid):
207         rm = self.get_resource(guid)
208         return rm.provision()
209
210     def get(self, guid, name):
211         rm = self.get_resource(guid)
212         return rm.get(name)
213
214     def set(self, guid, name, value):
215         rm = self.get_resource(guid)
216         return rm.set(name, value)
217
218     def state(self, guid, hr = False):
219         """ Returns the state of a resource
220
221             :param guid: Resource guid
222             :type guid: integer
223
224             :param hr: Human readable. Forces return of a 
225                 status string instead of a number 
226             :type hr: boolean
227
228         """
229         rm = self.get_resource(guid)
230         if hr:
231             return ResourceState2str.get(rm.state)
232
233         return rm.state
234
235     def stop(self, guid):
236         rm = self.get_resource(guid)
237         return rm.stop()
238
239     def start(self, guid):
240         rm = self.get_resource(guid)
241         return rm.start()
242
243     def set_with_conditions(self, name, value, group1, group2, state,
244             time = None):
245         """ Set value 'value' on attribute with name 'name' on all RMs of
246             group1 when 'time' has elapsed since all elements in group2 
247             have reached state 'state'.
248
249             :param name: Name of attribute to set in RM
250             :type name: string
251
252             :param value: Value of attribute to set in RM
253             :type name: string
254
255             :param group1: List of guids of RMs subjected to action
256             :type group1: list
257
258             :param action: Action to register (either START or STOP)
259             :type action: ResourceAction
260
261             :param group2: List of guids of RMs to we waited for
262             :type group2: list
263
264             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
265             :type state: ResourceState
266
267             :param time: Time to wait after group2 has reached status 
268             :type time: string
269
270         """
271         if isinstance(group1, int):
272             group1 = [group1]
273         if isinstance(group2, int):
274             group2 = [group2]
275
276         for guid1 in group1:
277             rm = self.get_resource(guid)
278             rm.set_with_conditions(name, value, group2, state, time)
279
280     def stop_with_conditions(self, guid):
281         rm = self.get_resource(guid)
282         return rm.stop_with_conditions()
283
284     def start_with_conditions(self, guid):
285         rm = self.get_resource(guid)
286         return rm.start_with_condition()
287
288     def deploy(self, group = None, wait_all_ready = True):
289         """ Deploy all resource manager in group
290
291         :param group: List of guids of RMs to deploy
292         :type group: list
293
294         :param wait_all_ready: Wait until all RMs are ready in
295             order to start the RMs
296         :type guid: int
297
298         """
299         self.logger.debug(" ------- DEPLOY START ------ ")
300
301         if not group:
302             group = self.resources
303
304         # Before starting deployment we disorder the group list with the
305         # purpose of speeding up the whole deployment process.
306         # It is likely that the user inserted in the 'group' list closely
307         # resources one after another (e.g. all applications
308         # connected to the same node can likely appear one after another).
309         # This can originate a slow down in the deployment since the N 
310         # threads the parallel runner uses to processes tasks may all
311         # be taken up by the same family of resources waiting for the 
312         # same conditions (e.g. LinuxApplications running on a same 
313         # node share a single lock, so they will tend to be serialized).
314         # If we disorder the group list, this problem can be mitigated.
315         random.shuffle(group)
316
317         def wait_all_and_start(group):
318             reschedule = False
319             for guid in group:
320                 rm = self.get_resource(guid)
321                 if rm.state < ResourceState.READY:
322                     reschedule = True
323                     break
324
325             if reschedule:
326                 callback = functools.partial(wait_all_and_start, group)
327                 self.schedule("1s", callback)
328             else:
329                 # If all resources are read, we schedule the start
330                 for guid in group:
331                     rm = self.get_resource(guid)
332                     self.schedule("0.01s", rm.start_with_conditions)
333
334         if wait_all_ready:
335             # Schedule the function that will check all resources are
336             # READY, and only then it will schedule the start.
337             # This is aimed to reduce the number of tasks looping in the scheduler.
338             # Intead of having N start tasks, we will have only one
339             callback = functools.partial(wait_all_and_start, group)
340             self.schedule("1s", callback)
341
342         for guid in group:
343             rm = self.get_resource(guid)
344             self.schedule("0.001s", rm.deploy)
345
346             if not wait_all_ready:
347                 self.schedule("1s", rm.start_with_conditions)
348
349             if rm.conditions.get(ResourceAction.STOP):
350                 # Only if the RM has STOP conditions we
351                 # schedule a stop. Otherwise the RM will stop immediately
352                 self.schedule("2s", rm.stop_with_conditions)
353
354
355     def release(self, group = None):
356         if not group:
357             group = self.resources
358
359         threads = []
360         for guid in group:
361             rm = self.get_resource(guid)
362             thread = threading.Thread(target=rm.release)
363             threads.append(thread)
364             thread.setDaemon(True)
365             thread.start()
366
367         while list(threads) and not self.finished:
368             thread = threads[0]
369             # Time out after 5 seconds to check EC not terminated
370             thread.join(5)
371             if not thread.is_alive():
372                 threads.remove(thread)
373         
374     def shutdown(self):
375         self.release()
376
377         self._stop_scheduler()
378         
379         if self._thread.is_alive():
380            self._thread.join()
381
382     def schedule(self, date, callback, track = False):
383         """ Schedule a callback to be executed at time date.
384
385             date    string containing execution time for the task.
386                     It can be expressed as an absolute time, using
387                     timestamp format, or as a relative time matching
388                     ^\d+.\d+(h|m|s|ms|us)$
389
390             callback    code to be executed for the task. Must be a
391                         Python function, and receives args and kwargs
392                         as arguments.
393
394             track   if set to True, the task will be retrivable with
395                     the get_task() method
396         """
397         timestamp = strfvalid(date)
398         
399         task = Task(timestamp, callback)
400         task = self._scheduler.schedule(task)
401
402         if track:
403             self._tasks[task.id] = task
404   
405         # Notify condition to wake up the processing thread
406         self._cond.acquire()
407         self._cond.notify()
408         self._cond.release()
409
410         return task.id
411      
412     def _process(self):
413         runner = ParallelRun(maxthreads = 50)
414         runner.start()
415
416         try:
417             while not self.finished:
418                 self._cond.acquire()
419                 task = self._scheduler.next()
420                 self._cond.release()
421                 
422                 if not task:
423                     # It there are not tasks in the tasks queue we need to 
424                     # wait until a call to schedule wakes us up
425                     self._cond.acquire()
426                     self._cond.wait()
427                     self._cond.release()
428                 else: 
429                     # If the task timestamp is in the future the thread needs to wait
430                     # until time elapse or until another task is scheduled
431                     now = strfnow()
432                     if now < task.timestamp:
433                         # Calculate time difference in seconds
434                         timeout = strfdiff(task.timestamp, now)
435                         # Re-schedule task with the same timestamp
436                         self._scheduler.schedule(task)
437                         # Sleep until timeout or until a new task awakes the condition
438                         self._cond.acquire()
439                         self._cond.wait(timeout)
440                         self._cond.release()
441                     else:
442                         # Process tasks in parallel
443                         runner.put(self._execute, task)
444         except: 
445             import traceback
446             err = traceback.format_exc()
447             self._logger.error("Error while processing tasks in the EC: %s" % err)
448
449             self._state = ECState.FAILED
450    
451         # Mark EC state as terminated
452         if self.ecstate == ECState.RUNNING:
453             # Synchronize to get errors if occurred
454             runner.sync()
455             self._state = ECState.TERMINATED
456
457     def _execute(self, task):
458         # Invoke callback
459         task.status = TaskStatus.DONE
460
461         try:
462             task.result = task.callback()
463         except:
464             import traceback
465             err = traceback.format_exc()
466             task.result = err
467             task.status = TaskStatus.ERROR
468             
469             self._logger.error("Error occurred while executing task: %s" % err)
470
471             self._stop_scheduler()
472
473             # Propage error to the ParallelRunner
474             raise
475
476     def _stop_scheduler(self):
477         # Mark the EC as failed
478         self._state = ECState.FAILED
479
480         # Wake up the EC in case it was sleeping
481         self._cond.acquire()
482         self._cond.notify()
483         self._cond.release()
484
485