07a4ee6d3539429a036e1137f50d6c389b8e6c20
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.parallel import ParallelRun
9 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
10 from neco.execution.resource import ResourceFactory, ResourceAction, \
11         ResourceState
12 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
13 from neco.execution.trace import TraceAttr
14
15 # TODO: use multiprocessing instead of threading
16
17 class ExperimentController(object):
18     def __init__(self, exp_id = None, root_dir = "/tmp"): 
19         super(ExperimentController, self).__init__()
20         # root directory to store files
21         self._root_dir = root_dir
22
23         # experiment identifier given by the user
24         self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
25
26         # generator of globally unique ids
27         self._guid_generator = guid.GuidGenerator()
28         
29         # Resource managers
30         self._resources = dict()
31
32         # Resource managers
33         self._group = dict()
34
35         # Scheduler
36         self._scheduler = HeapScheduler()
37
38         # Tasks
39         self._tasks = dict()
40
41         # Event processing thread
42         self._stop = False
43         self._cond = threading.Condition()
44         self._thread = threading.Thread(target = self._process)
45         self._thread.start()
46
47         # Logging
48         self._logger = logging.getLogger("neco.execution.ec")
49
50     @property
51     def logger(self):
52         return self._logger
53
54     @property
55     def exp_id(self):
56         exp_id = self._exp_id
57         if not exp_id.startswith("nepi-"):
58             exp_id = "nepi-" + exp_id
59         return exp_id
60
61     def get_task(self, tid):
62         return self._tasks.get(tid)
63
64     def get_resource(self, guid):
65         return self._resources.get(guid)
66
67     @property
68     def resources(self):
69         return self._resources.keys()
70
71     def register_resource(self, rtype, guid = None):
72         # Get next available guid
73         guid = self._guid_generator.next(guid)
74         
75         # Instantiate RM
76         rm = ResourceFactory.create(rtype, self, guid)
77
78         # Store RM
79         self._resources[guid] = rm
80
81         return guid
82
83     def register_group(self, group):
84         guid = self._guid_generator.next()
85
86         if not isinstance(group, list):
87             group = [group] 
88
89         self._groups[guid] = group
90
91         return guid
92
93     def get_attributes(self, guid):
94         rm = self.get_resource(guid)
95         return rm.get_attributes()
96
97     def get_filters(self, guid):
98         rm = self.get_resource(guid)
99         return rm.get_filters()
100
101     def register_connection(self, guid1, guid2):
102         rm1 = self.get_resource(guid1)
103         rm2 = self.get_resource(guid2)
104
105         rm1.connect(guid2)
106         rm2.connect(guid1)
107
108     def register_condition(self, group1, action, group2, state,
109             time = None):
110         """ Registers an action START or STOP for all RM on group1 to occur 
111             time 'time' after all elements in group2 reached state 'state'.
112
113             :param group1: List of guids of RMs subjected to action
114             :type group1: list
115
116             :param action: Action to register (either START or STOP)
117             :type action: ResourceAction
118
119             :param group2: List of guids of RMs to we waited for
120             :type group2: list
121
122             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
123             :type state: ResourceState
124
125             :param time: Time to wait after group2 has reached status 
126             :type time: string
127
128         """
129         if isinstance(group1, int):
130             group1 = [group1]
131         if isinstance(group2, int):
132             group2 = [group2]
133
134         for guid1 in group1:
135             rm = self.get_resource(guid1)
136             rm.register_condition(action, group2, state, time)
137
138     def register_trace(self, guid, name):
139         """ Enable trace
140
141         :param name: Name of the trace
142         :type name: str
143         """
144         rm = self.get_resource(guid)
145         rm.register_trace(name)
146
147     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
148         """ Get information on collected trace
149
150         :param name: Name of the trace
151         :type name: str
152
153         :param attr: Can be one of:
154                          - TraceAttr.ALL (complete trace content), 
155                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
156                          - TraceAttr.PATH (full path to the trace file),
157                          - TraceAttr.SIZE (size of trace file). 
158         :type attr: str
159
160         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
161         :type name: int
162
163         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
164         :type name: int
165
166         :rtype: str
167         """
168         rm = self.get_resource(guid)
169         return rm.trace(name, attr, block, offset)
170
171     def discover(self, guid, filters):
172         rm = self.get_resource(guid)
173         return rm.discover(filters)
174
175     def provision(self, guid, filters):
176         rm = self.get_resource(guid)
177         return rm.provision(filters)
178
179     def get(self, guid, name):
180         rm = self.get_resource(guid)
181         return rm.get(name)
182
183     def set(self, guid, name, value):
184         rm = self.get_resource(guid)
185         return rm.set(name, value)
186
187     def state(self, guid):
188         rm = self.get_resource(guid)
189         return rm.state
190
191     def stop(self, guid):
192         rm = self.get_resource(guid)
193         return rm.stop()
194
195     def start(self, guid):
196         rm = self.get_resource(guid)
197         return rm.start()
198
199     def set_with_conditions(self, name, value, group1, group2, state,
200             time = None):
201         """ Set value 'value' on attribute with name 'name' on all RMs of
202             group1 when 'time' has elapsed since all elements in group2 
203             have reached state 'state'.
204
205             :param name: Name of attribute to set in RM
206             :type name: string
207
208             :param value: Value of attribute to set in RM
209             :type name: string
210
211             :param group1: List of guids of RMs subjected to action
212             :type group1: list
213
214             :param action: Action to register (either START or STOP)
215             :type action: ResourceAction
216
217             :param group2: List of guids of RMs to we waited for
218             :type group2: list
219
220             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
221             :type state: ResourceState
222
223             :param time: Time to wait after group2 has reached status 
224             :type time: string
225
226         """
227         if isinstance(group1, int):
228             group1 = [group1]
229         if isinstance(group2, int):
230             group2 = [group2]
231
232         for guid1 in group1:
233             rm = self.get_resource(guid)
234             rm.set_with_conditions(name, value, group2, state, time)
235
236     def stop_with_conditions(self, guid):
237         rm = self.get_resource(guid)
238         return rm.stop_with_conditions()
239
240     def start_with_conditions(self, guid):
241         rm = self.get_resource(guid)
242         return rm.start_with_condition()
243
244     def deploy(self, group = None, wait_all_ready = True):
245         """ Deploy all resource manager in group
246
247         :param group: List of guids of RMs to deploy
248         :type group: list
249
250         :param wait_all_ready: Wait until all RMs are ready in
251             order to start the RMs
252         :type guid: int
253
254         """
255         self.logger.debug(" ------- DEPLOY START ------ ")
256
257         def steps(rm):
258             rm.deploy()
259             rm.start_with_conditions()
260
261             # Only if the RM has STOP consitions we
262             # schedule a stop. Otherwise the RM will stop immediately
263             if rm.conditions.get(ResourceAction.STOP):
264                 rm.stop_with_conditions()
265
266         if not group:
267             group = self.resources
268
269         threads = []
270         for guid in group:
271             rm = self.get_resource(guid)
272
273             if wait_all_ready:
274                 towait = list(group)
275                 towait.remove(guid)
276                 self.register_condition(guid, ResourceAction.START, 
277                         towait, ResourceState.READY)
278
279             thread = threading.Thread(target = steps, args = (rm,))
280             threads.append(thread)
281             thread.start()
282
283         for thread in threads:
284             thread.join()
285
286     def release(self, group = None):
287         if not group:
288             group = self.resources
289
290         threads = []
291         for guid in group:
292             rm = self.get_resource(guid)
293             thread = threading.Thread(target=rm.release)
294             threads.append(thread)
295             thread.start()
296
297         for thread in threads:
298             thread.join()
299
300     def shutdown(self):
301         self.release()
302         
303         self._stop = True
304         self._cond.acquire()
305         self._cond.notify()
306         self._cond.release()
307         if self._thread.is_alive():
308            self._thread.join()
309
310     def schedule(self, date, callback, track = False):
311         """ Schedule a callback to be executed at time date.
312
313             date    string containing execution time for the task.
314                     It can be expressed as an absolute time, using
315                     timestamp format, or as a relative time matching
316                     ^\d+.\d+(h|m|s|ms|us)$
317
318             callback    code to be executed for the task. Must be a
319                         Python function, and receives args and kwargs
320                         as arguments.
321
322             track   if set to True, the task will be retrivable with
323                     the get_task() method
324         """
325         timestamp = strfvalid(date)
326         
327         task = Task(timestamp, callback)
328         task = self._scheduler.schedule(task)
329
330         if track:
331             self._tasks[task.id] = task
332   
333         # Notify condition to wake up the processing thread
334         self._cond.acquire()
335         self._cond.notify()
336         self._cond.release()
337
338         return task.id
339      
340     def _process(self):
341         runner = ParallelRun(maxthreads = 50)
342         runner.start()
343
344         try:
345             while not self._stop:
346                 self._cond.acquire()
347                 task = self._scheduler.next()
348                 self._cond.release()
349
350                 if not task:
351                     # It there are not tasks in the tasks queue we need to 
352                     # wait until a call to schedule wakes us up
353                     self._cond.acquire()
354                     self._cond.wait()
355                     self._cond.release()
356                 else: 
357                     # If the task timestamp is in the future the thread needs to wait
358                     # until time elapse or until another task is scheduled
359                     now = strfnow()
360                     if now < task.timestamp:
361                         # Calculate time difference in seconds
362                         timeout = strfdiff(task.timestamp, now)
363                         # Re-schedule task with the same timestamp
364                         self._scheduler.schedule(task)
365                         # Sleep until timeout or until a new task awakes the condition
366                         self._cond.acquire()
367                         self._cond.wait(timeout)
368                         self._cond.release()
369                     else:
370                         # Process tasks in parallel
371                         runner.put(self._execute, task)
372         except:  
373             import traceback
374             err = traceback.format_exc()
375             self._logger.error("Error while processing tasks in the EC: %s" % err)
376
377     def _execute(self, task):
378         # Invoke callback
379         task.status = TaskStatus.DONE
380
381         try:
382             task.result = task.callback()
383         except:
384             import traceback
385             err = traceback.format_exc()
386             self._logger.error("Error while executing event: %s" % err)
387
388             task.result = err
389             task.status = TaskStatus.ERROR
390