b793d0e4424a3c732c839b95b5c5b24ca0b3491e
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.parallel import ParallelRun
9 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
10 from neco.execution.resource import ResourceFactory, ResourceAction, \
11         ResourceState
12 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
13 from neco.execution.trace import TraceAttr
14
15 # TODO: use multiprocessing instead of threading
16 # TODO: Improve speed. Too slow... !!
17
18 class ECState(object):
19     RUNNING = 1
20     FAILED = 2
21     TERMINATED = 3
22
23 class ExperimentController(object):
24     def __init__(self, exp_id = None, root_dir = "/tmp"): 
25         super(ExperimentController, self).__init__()
26         # root directory to store files
27         self._root_dir = root_dir
28
29         # experiment identifier given by the user
30         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
31
32         # generator of globally unique ids
33         self._guid_generator = guid.GuidGenerator()
34         
35         # Resource managers
36         self._resources = dict()
37
38         # Scheduler
39         self._scheduler = HeapScheduler()
40
41         # Tasks
42         self._tasks = dict()
43
44         # Event processing thread
45         self._cond = threading.Condition()
46         self._thread = threading.Thread(target = self._process)
47         self._thread.setDaemon(True)
48         self._thread.start()
49
50         # EC state
51         self._state = ECState.RUNNING
52
53         # Logging
54         self._logger = logging.getLogger("ExperimentController")
55
56     @property
57     def logger(self):
58         return self._logger
59
60     @property
61     def ecstate(self):
62         return self._state
63
64     @property
65     def exp_id(self):
66         exp_id = self._exp_id
67         if not exp_id.startswith("nepi-"):
68             exp_id = "nepi-" + exp_id
69         return exp_id
70
71     @property
72     def finished(self):
73         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
74
75     def wait_finished(self, guids):
76         while not all([self.state(guid) == ResourceState.FINISHED \
77                 for guid in guids]) and not self.finished:
78             # We keep the sleep as large as possible to 
79             # decrese the number of RM state requests
80             time.sleep(2)
81     
82     def get_task(self, tid):
83         return self._tasks.get(tid)
84
85     def get_resource(self, guid):
86         return self._resources.get(guid)
87
88     @property
89     def resources(self):
90         return self._resources.keys()
91
92     def register_resource(self, rtype, guid = None):
93         # Get next available guid
94         guid = self._guid_generator.next(guid)
95         
96         # Instantiate RM
97         rm = ResourceFactory.create(rtype, self, guid)
98
99         # Store RM
100         self._resources[guid] = rm
101
102         return guid
103
104     def get_attributes(self, guid):
105         rm = self.get_resource(guid)
106         return rm.get_attributes()
107
108     def get_filters(self, guid):
109         rm = self.get_resource(guid)
110         return rm.get_filters()
111
112     def register_connection(self, guid1, guid2):
113         rm1 = self.get_resource(guid1)
114         rm2 = self.get_resource(guid2)
115
116         rm1.connect(guid2)
117         rm2.connect(guid1)
118
119     def register_condition(self, group1, action, group2, state,
120             time = None):
121         """ Registers an action START or STOP for all RM on group1 to occur 
122             time 'time' after all elements in group2 reached state 'state'.
123
124             :param group1: List of guids of RMs subjected to action
125             :type group1: list
126
127             :param action: Action to register (either START or STOP)
128             :type action: ResourceAction
129
130             :param group2: List of guids of RMs to we waited for
131             :type group2: list
132
133             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
134             :type state: ResourceState
135
136             :param time: Time to wait after group2 has reached status 
137             :type time: string
138
139         """
140         if isinstance(group1, int):
141             group1 = [group1]
142         if isinstance(group2, int):
143             group2 = [group2]
144
145         for guid1 in group1:
146             rm = self.get_resource(guid1)
147             rm.register_condition(action, group2, state, time)
148
149     def register_trace(self, guid, name):
150         """ Enable trace
151
152         :param name: Name of the trace
153         :type name: str
154         """
155         rm = self.get_resource(guid)
156         rm.register_trace(name)
157
158     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
159         """ Get information on collected trace
160
161         :param name: Name of the trace
162         :type name: str
163
164         :param attr: Can be one of:
165                          - TraceAttr.ALL (complete trace content), 
166                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
167                          - TraceAttr.PATH (full path to the trace file),
168                          - TraceAttr.SIZE (size of trace file). 
169         :type attr: str
170
171         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
172         :type name: int
173
174         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
175         :type name: int
176
177         :rtype: str
178         """
179         rm = self.get_resource(guid)
180         return rm.trace(name, attr, block, offset)
181
182     def discover(self, guid, filters):
183         rm = self.get_resource(guid)
184         return rm.discover(filters)
185
186     def provision(self, guid, filters):
187         rm = self.get_resource(guid)
188         return rm.provision(filters)
189
190     def get(self, guid, name):
191         rm = self.get_resource(guid)
192         return rm.get(name)
193
194     def set(self, guid, name, value):
195         rm = self.get_resource(guid)
196         return rm.set(name, value)
197
198     def state(self, guid):
199         rm = self.get_resource(guid)
200         return rm.state
201
202     def stop(self, guid):
203         rm = self.get_resource(guid)
204         return rm.stop()
205
206     def start(self, guid):
207         rm = self.get_resource(guid)
208         return rm.start()
209
210     def set_with_conditions(self, name, value, group1, group2, state,
211             time = None):
212         """ Set value 'value' on attribute with name 'name' on all RMs of
213             group1 when 'time' has elapsed since all elements in group2 
214             have reached state 'state'.
215
216             :param name: Name of attribute to set in RM
217             :type name: string
218
219             :param value: Value of attribute to set in RM
220             :type name: string
221
222             :param group1: List of guids of RMs subjected to action
223             :type group1: list
224
225             :param action: Action to register (either START or STOP)
226             :type action: ResourceAction
227
228             :param group2: List of guids of RMs to we waited for
229             :type group2: list
230
231             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
232             :type state: ResourceState
233
234             :param time: Time to wait after group2 has reached status 
235             :type time: string
236
237         """
238         if isinstance(group1, int):
239             group1 = [group1]
240         if isinstance(group2, int):
241             group2 = [group2]
242
243         for guid1 in group1:
244             rm = self.get_resource(guid)
245             rm.set_with_conditions(name, value, group2, state, time)
246
247     def stop_with_conditions(self, guid):
248         rm = self.get_resource(guid)
249         return rm.stop_with_conditions()
250
251     def start_with_conditions(self, guid):
252         rm = self.get_resource(guid)
253         return rm.start_with_condition()
254
255     def deploy(self, group = None, wait_all_ready = True):
256         """ Deploy all resource manager in group
257
258         :param group: List of guids of RMs to deploy
259         :type group: list
260
261         :param wait_all_ready: Wait until all RMs are ready in
262             order to start the RMs
263         :type guid: int
264
265         """
266         self.logger.debug(" ------- DEPLOY START ------ ")
267
268         def steps(rm):
269             rm.deploy()
270             rm.start_with_conditions()
271
272             # Only if the RM has STOP consitions we
273             # schedule a stop. Otherwise the RM will stop immediately
274             if rm.conditions.get(ResourceAction.STOP):
275                 rm.stop_with_conditions()
276
277         if not group:
278             group = self.resources
279
280         threads = []
281         for guid in group:
282             rm = self.get_resource(guid)
283
284             if wait_all_ready:
285                 towait = list(group)
286                 towait.remove(guid)
287                 self.register_condition(guid, ResourceAction.START, 
288                         towait, ResourceState.READY)
289
290             thread = threading.Thread(target = steps, args = (rm,))
291             threads.append(thread)
292             thread.setDaemon(True)
293             thread.start()
294
295         while list(threads) and not self.finished:
296             thread = threads[0]
297             # Time out after 5 seconds to check EC not terminated
298             thread.join(5)
299             if not thread.is_alive():
300                 threads.remove(thread)
301
302     def release(self, group = None):
303         if not group:
304             group = self.resources
305
306         threads = []
307         for guid in group:
308             rm = self.get_resource(guid)
309             thread = threading.Thread(target=rm.release)
310             threads.append(thread)
311             thread.setDaemon(True)
312             thread.start()
313
314         while list(threads) and not self.finished:
315             thread = threads[0]
316             # Time out after 5 seconds to check EC not terminated
317             thread.join(5)
318             if not thread.is_alive():
319                 threads.remove(thread)
320         
321         self._state = ECState.TERMINATED
322
323     def shutdown(self):
324         self.release()
325         
326         self._cond.acquire()
327         self._cond.notify()
328         self._cond.release()
329
330         if self._thread.is_alive():
331            self._thread.join()
332
333     def schedule(self, date, callback, track = False):
334         """ Schedule a callback to be executed at time date.
335
336             date    string containing execution time for the task.
337                     It can be expressed as an absolute time, using
338                     timestamp format, or as a relative time matching
339                     ^\d+.\d+(h|m|s|ms|us)$
340
341             callback    code to be executed for the task. Must be a
342                         Python function, and receives args and kwargs
343                         as arguments.
344
345             track   if set to True, the task will be retrivable with
346                     the get_task() method
347         """
348         timestamp = strfvalid(date)
349         
350         task = Task(timestamp, callback)
351         task = self._scheduler.schedule(task)
352
353         if track:
354             self._tasks[task.id] = task
355   
356         # Notify condition to wake up the processing thread
357         self._cond.acquire()
358         self._cond.notify()
359         self._cond.release()
360
361         return task.id
362      
363     def _process(self):
364         runner = ParallelRun(maxthreads = 50)
365         runner.start()
366
367         try:
368             while not self.finished:
369                 self._cond.acquire()
370                 task = self._scheduler.next()
371                 self._cond.release()
372
373                 if not task:
374                     # It there are not tasks in the tasks queue we need to 
375                     # wait until a call to schedule wakes us up
376                     self._cond.acquire()
377                     self._cond.wait()
378                     self._cond.release()
379                 else: 
380                     # If the task timestamp is in the future the thread needs to wait
381                     # until time elapse or until another task is scheduled
382                     now = strfnow()
383                     if now < task.timestamp:
384                         # Calculate time difference in seconds
385                         timeout = strfdiff(task.timestamp, now)
386                         # Re-schedule task with the same timestamp
387                         self._scheduler.schedule(task)
388                         # Sleep until timeout or until a new task awakes the condition
389                         self._cond.acquire()
390                         self._cond.wait(timeout)
391                         self._cond.release()
392                     else:
393                         # Process tasks in parallel
394                         runner.put(self._execute, task)
395                 
396         except: 
397             import traceback
398             err = traceback.format_exc()
399             self._logger.error("Error while processing tasks in the EC: %s" % err)
400
401             self._state = ECState.FAILED
402             return
403    
404         # Mark EC state as terminated
405         if self.ecstate == ECState.RUNNING:
406             self._state = ECState.TERMINATED
407
408     def _execute(self, task):
409         # Invoke callback
410         task.status = TaskStatus.DONE
411
412         try:
413             task.result = task.callback()
414         except:
415             import traceback
416             err = traceback.format_exc()
417             task.result = err
418             task.status = TaskStatus.ERROR
419             
420             self._logger.error("Error occurred while executing task: %s" % err)
421
422             # Mark the EC as failed
423             self._state = ECState.FAILED
424
425             # Wake up the EC in case it was sleeping
426             self._cond.acquire()
427             self._cond.notify()
428             self._cond.release()
429
430             # Propage error to the ParallelRunner
431             raise
432