ba064d490c3ed237217117eb9649772948162742
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.parallel import ParallelRun
9 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
10 from neco.execution.resource import ResourceFactory, ResourceAction, \
11         ResourceState
12 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
13 from neco.execution.trace import TraceAttr
14
15 # TODO: use multiprocessing instead of threading
16 # TODO: Improve speed. Too slow... !!
17
class ECState(object):
    """ Possible states of an ExperimentController.

        RUNNING     initial state, set when the EC is created
        FAILED      set when task processing or a task callback fails
        TERMINATED  set once the resources have been released or the
                    processing loop has ended
    """
    RUNNING, FAILED, TERMINATED = 1, 2, 3
22
23 class ExperimentController(object):
24     def __init__(self, exp_id = None, root_dir = "/tmp"): 
25         super(ExperimentController, self).__init__()
26         # root directory to store files
27         self._root_dir = root_dir
28
29         # experiment identifier given by the user
30         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
31
32         # generator of globally unique ids
33         self._guid_generator = guid.GuidGenerator()
34         
35         # Resource managers
36         self._resources = dict()
37
38         # Scheduler
39         self._scheduler = HeapScheduler()
40
41         # Tasks
42         self._tasks = dict()
43
44         # Event processing thread
45         self._cond = threading.Condition()
46         self._thread = threading.Thread(target = self._process)
47         self._thread.setDaemon(True)
48         self._thread.start()
49
50         # EC state
51         self._state = ECState.RUNNING
52
53         # Logging
54         self._logger = logging.getLogger("ExperimentController")
55
    @property
    def logger(self):
        """ Logger instance stored on the EC at construction time """
        return self._logger
59
    @property
    def ecstate(self):
        """ Current state of the EC (one of the ECState values) """
        return self._state
63
64     @property
65     def exp_id(self):
66         exp_id = self._exp_id
67         if not exp_id.startswith("nepi-"):
68             exp_id = "nepi-" + exp_id
69         return exp_id
70
71     @property
72     def finished(self):
73         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
74
75     def wait_finished(self, guids):
76         while not all([self.state(guid) == ResourceState.FINISHED \
77                 for guid in guids]) and not self.finished:
78             time.sleep(1)
79     
    def get_task(self, tid):
        """ Return the tracked task registered under id 'tid', or None
        when no task is stored under that id """
        return self._tasks.get(tid)
82
    def get_resource(self, guid):
        """ Return the RM registered under 'guid', or None when no RM
        is stored under that guid """
        return self._resources.get(guid)
85
    @property
    def resources(self):
        """ Guids of all the RMs registered in the EC """
        return self._resources.keys()
89
90     def register_resource(self, rtype, guid = None):
91         # Get next available guid
92         guid = self._guid_generator.next(guid)
93         
94         # Instantiate RM
95         rm = ResourceFactory.create(rtype, self, guid)
96
97         # Store RM
98         self._resources[guid] = rm
99
100         return guid
101
102     def get_attributes(self, guid):
103         rm = self.get_resource(guid)
104         return rm.get_attributes()
105
106     def get_filters(self, guid):
107         rm = self.get_resource(guid)
108         return rm.get_filters()
109
110     def register_connection(self, guid1, guid2):
111         rm1 = self.get_resource(guid1)
112         rm2 = self.get_resource(guid2)
113
114         rm1.connect(guid2)
115         rm2.connect(guid1)
116
117     def register_condition(self, group1, action, group2, state,
118             time = None):
119         """ Registers an action START or STOP for all RM on group1 to occur 
120             time 'time' after all elements in group2 reached state 'state'.
121
122             :param group1: List of guids of RMs subjected to action
123             :type group1: list
124
125             :param action: Action to register (either START or STOP)
126             :type action: ResourceAction
127
128             :param group2: List of guids of RMs to we waited for
129             :type group2: list
130
131             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
132             :type state: ResourceState
133
134             :param time: Time to wait after group2 has reached status 
135             :type time: string
136
137         """
138         if isinstance(group1, int):
139             group1 = [group1]
140         if isinstance(group2, int):
141             group2 = [group2]
142
143         for guid1 in group1:
144             rm = self.get_resource(guid1)
145             rm.register_condition(action, group2, state, time)
146
147     def register_trace(self, guid, name):
148         """ Enable trace
149
150         :param name: Name of the trace
151         :type name: str
152         """
153         rm = self.get_resource(guid)
154         rm.register_trace(name)
155
156     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
157         """ Get information on collected trace
158
159         :param name: Name of the trace
160         :type name: str
161
162         :param attr: Can be one of:
163                          - TraceAttr.ALL (complete trace content), 
164                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
165                          - TraceAttr.PATH (full path to the trace file),
166                          - TraceAttr.SIZE (size of trace file). 
167         :type attr: str
168
169         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
170         :type name: int
171
172         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
173         :type name: int
174
175         :rtype: str
176         """
177         rm = self.get_resource(guid)
178         return rm.trace(name, attr, block, offset)
179
180     def discover(self, guid, filters):
181         rm = self.get_resource(guid)
182         return rm.discover(filters)
183
184     def provision(self, guid, filters):
185         rm = self.get_resource(guid)
186         return rm.provision(filters)
187
188     def get(self, guid, name):
189         rm = self.get_resource(guid)
190         return rm.get(name)
191
192     def set(self, guid, name, value):
193         rm = self.get_resource(guid)
194         return rm.set(name, value)
195
196     def state(self, guid):
197         rm = self.get_resource(guid)
198         return rm.state
199
200     def stop(self, guid):
201         rm = self.get_resource(guid)
202         return rm.stop()
203
204     def start(self, guid):
205         rm = self.get_resource(guid)
206         return rm.start()
207
208     def set_with_conditions(self, name, value, group1, group2, state,
209             time = None):
210         """ Set value 'value' on attribute with name 'name' on all RMs of
211             group1 when 'time' has elapsed since all elements in group2 
212             have reached state 'state'.
213
214             :param name: Name of attribute to set in RM
215             :type name: string
216
217             :param value: Value of attribute to set in RM
218             :type name: string
219
220             :param group1: List of guids of RMs subjected to action
221             :type group1: list
222
223             :param action: Action to register (either START or STOP)
224             :type action: ResourceAction
225
226             :param group2: List of guids of RMs to we waited for
227             :type group2: list
228
229             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
230             :type state: ResourceState
231
232             :param time: Time to wait after group2 has reached status 
233             :type time: string
234
235         """
236         if isinstance(group1, int):
237             group1 = [group1]
238         if isinstance(group2, int):
239             group2 = [group2]
240
241         for guid1 in group1:
242             rm = self.get_resource(guid)
243             rm.set_with_conditions(name, value, group2, state, time)
244
245     def stop_with_conditions(self, guid):
246         rm = self.get_resource(guid)
247         return rm.stop_with_conditions()
248
249     def start_with_conditions(self, guid):
250         rm = self.get_resource(guid)
251         return rm.start_with_condition()
252
253     def deploy(self, group = None, wait_all_ready = True):
254         """ Deploy all resource manager in group
255
256         :param group: List of guids of RMs to deploy
257         :type group: list
258
259         :param wait_all_ready: Wait until all RMs are ready in
260             order to start the RMs
261         :type guid: int
262
263         """
264         self.logger.debug(" ------- DEPLOY START ------ ")
265
266         def steps(rm):
267             rm.deploy()
268             rm.start_with_conditions()
269
270             # Only if the RM has STOP consitions we
271             # schedule a stop. Otherwise the RM will stop immediately
272             if rm.conditions.get(ResourceAction.STOP):
273                 rm.stop_with_conditions()
274
275         if not group:
276             group = self.resources
277
278         threads = []
279         for guid in group:
280             rm = self.get_resource(guid)
281
282             if wait_all_ready:
283                 towait = list(group)
284                 towait.remove(guid)
285                 self.register_condition(guid, ResourceAction.START, 
286                         towait, ResourceState.READY)
287
288             thread = threading.Thread(target = steps, args = (rm,))
289             threads.append(thread)
290             thread.setDaemon(True)
291             thread.start()
292
293         while list(threads) and not self.finished:
294             thread = threads[0]
295             # Time out after 5 seconds to check EC not terminated
296             thread.join(5)
297             if not thread.is_alive():
298                 threads.remove(thread)
299
300     def release(self, group = None):
301         if not group:
302             group = self.resources
303
304         threads = []
305         for guid in group:
306             rm = self.get_resource(guid)
307             thread = threading.Thread(target=rm.release)
308             threads.append(thread)
309             thread.setDaemon(True)
310             thread.start()
311
312         while list(threads) and not self.finished:
313             thread = threads[0]
314             # Time out after 5 seconds to check EC not terminated
315             thread.join(5)
316             if not thread.is_alive():
317                 threads.remove(thread)
318         
319         self._state = ECState.TERMINATED
320
321     def shutdown(self):
322         self.release()
323         
324         self._cond.acquire()
325         self._cond.notify()
326         self._cond.release()
327
328         if self._thread.is_alive():
329            self._thread.join()
330
331     def schedule(self, date, callback, track = False):
332         """ Schedule a callback to be executed at time date.
333
334             date    string containing execution time for the task.
335                     It can be expressed as an absolute time, using
336                     timestamp format, or as a relative time matching
337                     ^\d+.\d+(h|m|s|ms|us)$
338
339             callback    code to be executed for the task. Must be a
340                         Python function, and receives args and kwargs
341                         as arguments.
342
343             track   if set to True, the task will be retrivable with
344                     the get_task() method
345         """
346         timestamp = strfvalid(date)
347         
348         task = Task(timestamp, callback)
349         task = self._scheduler.schedule(task)
350
351         if track:
352             self._tasks[task.id] = task
353   
354         # Notify condition to wake up the processing thread
355         self._cond.acquire()
356         self._cond.notify()
357         self._cond.release()
358
359         return task.id
360      
361     def _process(self):
362         runner = ParallelRun(maxthreads = 50)
363         runner.start()
364
365         try:
366             while not self.finished:
367                 self._cond.acquire()
368                 task = self._scheduler.next()
369                 self._cond.release()
370
371                 if not task:
372                     # It there are not tasks in the tasks queue we need to 
373                     # wait until a call to schedule wakes us up
374                     self._cond.acquire()
375                     self._cond.wait()
376                     self._cond.release()
377                 else: 
378                     # If the task timestamp is in the future the thread needs to wait
379                     # until time elapse or until another task is scheduled
380                     now = strfnow()
381                     if now < task.timestamp:
382                         # Calculate time difference in seconds
383                         timeout = strfdiff(task.timestamp, now)
384                         # Re-schedule task with the same timestamp
385                         self._scheduler.schedule(task)
386                         # Sleep until timeout or until a new task awakes the condition
387                         self._cond.acquire()
388                         self._cond.wait(timeout)
389                         self._cond.release()
390                     else:
391                         # Process tasks in parallel
392                         runner.put(self._execute, task)
393                 
394         except: 
395             import traceback
396             err = traceback.format_exc()
397             self._logger.error("Error while processing tasks in the EC: %s" % err)
398
399             self._state = ECState.FAILED
400             return
401    
402         # Mark EC state as terminated
403         if self.ecstate == ECState.RUNNING:
404             self._state = ECState.TERMINATED
405
406     def _execute(self, task):
407         # Invoke callback
408         task.status = TaskStatus.DONE
409
410         try:
411             task.result = task.callback()
412         except:
413             import traceback
414             err = traceback.format_exc()
415             task.result = err
416             task.status = TaskStatus.ERROR
417             
418             self._logger.error("Error occurred while executing task: %s" % err)
419
420             # Mark the EC as failed
421             self._state = ECState.FAILED
422
423             # Wake up the EC in case it was sleeping
424             self._cond.acquire()
425             self._cond.notify()
426             self._cond.release()
427
428             # Propage error to the ParallelRunner
429             raise
430