1d2533475932e9714b388c06e1bd1971e021fe09
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import random
4 import sys
5 import time
6 import threading
7
8 from neco.util import guid
9 from neco.util.parallel import ParallelRun
10 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
11 from neco.execution.resource import ResourceFactory, ResourceAction, \
12         ResourceState
13 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
14 from neco.execution.trace import TraceAttr
15
16 # TODO: use multiprocessing instead of threading
17 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
18
19 class ECState(object):
20     RUNNING = 1
21     FAILED = 2
22     TERMINATED = 3
23
24 class ExperimentController(object):
25     def __init__(self, exp_id = None, root_dir = "/tmp"): 
26         super(ExperimentController, self).__init__()
27         # root directory to store files
28         self._root_dir = root_dir
29
30         # experiment identifier given by the user
31         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
32
33         # generator of globally unique ids
34         self._guid_generator = guid.GuidGenerator()
35         
36         # Resource managers
37         self._resources = dict()
38
39         # Scheduler
40         self._scheduler = HeapScheduler()
41
42         # Tasks
43         self._tasks = dict()
44
45         # Event processing thread
46         self._cond = threading.Condition()
47         self._thread = threading.Thread(target = self._process)
48         self._thread.setDaemon(True)
49         self._thread.start()
50
51         # EC state
52         self._state = ECState.RUNNING
53
54         # Logging
55         self._logger = logging.getLogger("ExperimentController")
56
57     @property
58     def logger(self):
59         return self._logger
60
61     @property
62     def ecstate(self):
63         return self._state
64
65     @property
66     def exp_id(self):
67         exp_id = self._exp_id
68         if not exp_id.startswith("nepi-"):
69             exp_id = "nepi-" + exp_id
70         return exp_id
71
72     @property
73     def finished(self):
74         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
75
76     def wait_finished(self, guids):
77         while not all([self.state(guid) == ResourceState.FINISHED \
78                 for guid in guids]) and not self.finished:
79             # We keep the sleep as large as possible to 
80             # decrese the number of RM state requests
81             time.sleep(2)
82     
83     def get_task(self, tid):
84         return self._tasks.get(tid)
85
86     def get_resource(self, guid):
87         return self._resources.get(guid)
88
89     @property
90     def resources(self):
91         return self._resources.keys()
92
93     def register_resource(self, rtype, guid = None):
94         # Get next available guid
95         guid = self._guid_generator.next(guid)
96         
97         # Instantiate RM
98         rm = ResourceFactory.create(rtype, self, guid)
99
100         # Store RM
101         self._resources[guid] = rm
102
103         return guid
104
105     def get_attributes(self, guid):
106         rm = self.get_resource(guid)
107         return rm.get_attributes()
108
109     def get_filters(self, guid):
110         rm = self.get_resource(guid)
111         return rm.get_filters()
112
113     def register_connection(self, guid1, guid2):
114         rm1 = self.get_resource(guid1)
115         rm2 = self.get_resource(guid2)
116
117         rm1.connect(guid2)
118         rm2.connect(guid1)
119
120     def register_condition(self, group1, action, group2, state,
121             time = None):
122         """ Registers an action START or STOP for all RM on group1 to occur 
123             time 'time' after all elements in group2 reached state 'state'.
124
125             :param group1: List of guids of RMs subjected to action
126             :type group1: list
127
128             :param action: Action to register (either START or STOP)
129             :type action: ResourceAction
130
131             :param group2: List of guids of RMs to we waited for
132             :type group2: list
133
134             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
135             :type state: ResourceState
136
137             :param time: Time to wait after group2 has reached status 
138             :type time: string
139
140         """
141         if isinstance(group1, int):
142             group1 = [group1]
143         if isinstance(group2, int):
144             group2 = [group2]
145
146         for guid1 in group1:
147             rm = self.get_resource(guid1)
148             rm.register_condition(action, group2, state, time)
149
150     def register_trace(self, guid, name):
151         """ Enable trace
152
153         :param name: Name of the trace
154         :type name: str
155         """
156         rm = self.get_resource(guid)
157         rm.register_trace(name)
158
159     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
160         """ Get information on collected trace
161
162         :param name: Name of the trace
163         :type name: str
164
165         :param attr: Can be one of:
166                          - TraceAttr.ALL (complete trace content), 
167                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
168                          - TraceAttr.PATH (full path to the trace file),
169                          - TraceAttr.SIZE (size of trace file). 
170         :type attr: str
171
172         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
173         :type name: int
174
175         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
176         :type name: int
177
178         :rtype: str
179         """
180         rm = self.get_resource(guid)
181         return rm.trace(name, attr, block, offset)
182
183     def discover(self, guid, filters):
184         rm = self.get_resource(guid)
185         return rm.discover(filters)
186
187     def provision(self, guid, filters):
188         rm = self.get_resource(guid)
189         return rm.provision(filters)
190
191     def get(self, guid, name):
192         rm = self.get_resource(guid)
193         return rm.get(name)
194
195     def set(self, guid, name, value):
196         rm = self.get_resource(guid)
197         return rm.set(name, value)
198
199     def state(self, guid):
200         rm = self.get_resource(guid)
201         return rm.state
202
203     def stop(self, guid):
204         rm = self.get_resource(guid)
205         return rm.stop()
206
207     def start(self, guid):
208         rm = self.get_resource(guid)
209         return rm.start()
210
211     def set_with_conditions(self, name, value, group1, group2, state,
212             time = None):
213         """ Set value 'value' on attribute with name 'name' on all RMs of
214             group1 when 'time' has elapsed since all elements in group2 
215             have reached state 'state'.
216
217             :param name: Name of attribute to set in RM
218             :type name: string
219
220             :param value: Value of attribute to set in RM
221             :type name: string
222
223             :param group1: List of guids of RMs subjected to action
224             :type group1: list
225
226             :param action: Action to register (either START or STOP)
227             :type action: ResourceAction
228
229             :param group2: List of guids of RMs to we waited for
230             :type group2: list
231
232             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
233             :type state: ResourceState
234
235             :param time: Time to wait after group2 has reached status 
236             :type time: string
237
238         """
239         if isinstance(group1, int):
240             group1 = [group1]
241         if isinstance(group2, int):
242             group2 = [group2]
243
244         for guid1 in group1:
245             rm = self.get_resource(guid)
246             rm.set_with_conditions(name, value, group2, state, time)
247
248     def stop_with_conditions(self, guid):
249         rm = self.get_resource(guid)
250         return rm.stop_with_conditions()
251
252     def start_with_conditions(self, guid):
253         rm = self.get_resource(guid)
254         return rm.start_with_condition()
255
256     def deploy(self, group = None, wait_all_ready = True):
257         """ Deploy all resource manager in group
258
259         :param group: List of guids of RMs to deploy
260         :type group: list
261
262         :param wait_all_ready: Wait until all RMs are ready in
263             order to start the RMs
264         :type guid: int
265
266         """
267         self.logger.debug(" ------- DEPLOY START ------ ")
268
269         stop = []
270
271         def steps(rm):
272             try:
273                 rm.deploy()
274                 rm.start_with_conditions()
275
276                 # Only if the RM has STOP consitions we
277                 # schedule a stop. Otherwise the RM will stop immediately
278                 if rm.conditions.get(ResourceAction.STOP):
279                     rm.stop_with_conditions()
280             except:
281                 import traceback
282                 err = traceback.format_exc()
283                 
284                 self._logger.error("Error occurred while deploying resources: %s" % err)
285
286                 # stop deployment
287                 stop.append(None)
288
289         if not group:
290             group = self.resources
291
292         # Before starting deployment we disorder the group list with the
293         # purpose of speeding up the whole deployment process.
294         # It is likely that the user inserted in the 'group' list closely
295         # resources resources one after another (e.g. all applications
296         # connected to the same node can likely appear one after another).
297         # This can originate a slow down in the deployment since the N 
298         # threads the parallel runner uses to processes tasks may all
299         # be taken up by the same family of resources waiting for the 
300         # same conditions. 
301         # If we disorder the group list, this problem can be mitigated
302         random.shuffle(group)
303
304         threads = []
305         for guid in group:
306             rm = self.get_resource(guid)
307
308             if wait_all_ready:
309                 towait = list(group)
310                 towait.remove(guid)
311                 self.register_condition(guid, ResourceAction.START, 
312                         towait, ResourceState.READY)
313
314             thread = threading.Thread(target = steps, args = (rm,))
315             threads.append(thread)
316             thread.setDaemon(True)
317             thread.start()
318
319         while list(threads) and not self.finished and not stop:
320             thread = threads[0]
321             # Time out after 5 seconds to check EC not terminated
322             thread.join(1)
323             if not thread.is_alive():
324                 threads.remove(thread)
325
326         if stop:
327             # stop the scheduler
328             self._stop_scheduler()
329
330             if self._thread.is_alive():
331                self._thread.join()
332
333             raise RuntimeError, "Error occurred, interrupting deployment " 
334
335     def release(self, group = None):
336         if not group:
337             group = self.resources
338
339         threads = []
340         for guid in group:
341             rm = self.get_resource(guid)
342             thread = threading.Thread(target=rm.release)
343             threads.append(thread)
344             thread.setDaemon(True)
345             thread.start()
346
347         while list(threads) and not self.finished:
348             thread = threads[0]
349             # Time out after 5 seconds to check EC not terminated
350             thread.join(5)
351             if not thread.is_alive():
352                 threads.remove(thread)
353
354     def shutdown(self):
355         self.release()
356
357         self._stop_scheduler()
358         
359         if self._thread.is_alive():
360            self._thread.join()
361
362     def schedule(self, date, callback, track = False):
363         """ Schedule a callback to be executed at time date.
364
365             date    string containing execution time for the task.
366                     It can be expressed as an absolute time, using
367                     timestamp format, or as a relative time matching
368                     ^\d+.\d+(h|m|s|ms|us)$
369
370             callback    code to be executed for the task. Must be a
371                         Python function, and receives args and kwargs
372                         as arguments.
373
374             track   if set to True, the task will be retrivable with
375                     the get_task() method
376         """
377         timestamp = strfvalid(date)
378         
379         task = Task(timestamp, callback)
380         task = self._scheduler.schedule(task)
381
382         if track:
383             self._tasks[task.id] = task
384   
385         # Notify condition to wake up the processing thread
386         self._cond.acquire()
387         self._cond.notify()
388         self._cond.release()
389
390         return task.id
391      
392     def _process(self):
393         runner = ParallelRun(maxthreads = 50)
394         runner.start()
395
396         try:
397             while not self.finished:
398                 self._cond.acquire()
399                 task = self._scheduler.next()
400                 self._cond.release()
401
402                 if not task:
403                     # It there are not tasks in the tasks queue we need to 
404                     # wait until a call to schedule wakes us up
405                     self._cond.acquire()
406                     self._cond.wait()
407                     self._cond.release()
408                 else: 
409                     # If the task timestamp is in the future the thread needs to wait
410                     # until time elapse or until another task is scheduled
411                     now = strfnow()
412                     if now < task.timestamp:
413                         # Calculate time difference in seconds
414                         timeout = strfdiff(task.timestamp, now)
415                         # Re-schedule task with the same timestamp
416                         self._scheduler.schedule(task)
417                         # Sleep until timeout or until a new task awakes the condition
418                         self._cond.acquire()
419                         self._cond.wait(timeout)
420                         self._cond.release()
421                     else:
422                         # Process tasks in parallel
423                         runner.put(self._execute, task)
424                 
425         except: 
426             import traceback
427             err = traceback.format_exc()
428             self._logger.error("Error while processing tasks in the EC: %s" % err)
429
430             self._state = ECState.FAILED
431         finally:
432             runner.sync()
433    
434         # Mark EC state as terminated
435         if self.ecstate == ECState.RUNNING:
436             self._state = ECState.TERMINATED
437
438     def _execute(self, task):
439         # Invoke callback
440         task.status = TaskStatus.DONE
441
442         try:
443             task.result = task.callback()
444         except:
445             import traceback
446             err = traceback.format_exc()
447             task.result = err
448             task.status = TaskStatus.ERROR
449             
450             self._logger.error("Error occurred while executing task: %s" % err)
451
452             self._stop_scheduler()
453
454             # Propage error to the ParallelRunner
455             raise
456
457     def _stop_scheduler(self):
458         # Mark the EC as failed
459         self._state = ECState.FAILED
460
461         # Wake up the EC in case it was sleeping
462         self._cond.acquire()
463         self._cond.notify()
464         self._cond.release()
465
466