1e55cc04619a59e191d350be34f7e00aea92d46d
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
9 from neco.execution.resource import ResourceFactory, ResourceAction, \
10         ResourceState
11 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
12 from neco.util.parallel import ParallelRun
13
14 # TODO: use multiprocessing instead of threading
15
class ExperimentController(object):
    """ Coordinates the resource managers (RMs) of an experiment.

        The controller keeps a registry of RMs indexed by guid, forwards
        configuration and life-cycle calls to them, and runs a background
        thread (see _process) that executes the tasks registered through
        schedule().

        The background thread is started by the constructor and only ends
        when shutdown() is invoked.
    """

    def __init__(self, root_dir = "/tmp"):
        """
            :param root_dir: Root directory to store files
            :type root_dir: string

        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers, indexed by guid
        self._resources = dict()

        # Groups (lists of guids), indexed by group guid.
        # register_group() stores its entries here.
        self._groups = dict()

        # Scheduler
        self._scheduler = HeapScheduler()

        # Tasks tracked through schedule(..., track = True), indexed by id
        self._tasks = dict()

        # Logging. The logger must exist before the processing thread is
        # started, because that thread reports errors through it.
        self._logger = logging.getLogger("neco.execution.ec")

        # Event processing thread
        self._stop = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.start()

    @property
    def logger(self):
        """ Logger used by the controller ("neco.execution.ec") """
        return self._logger

    def get_task(self, tid):
        """ Returns the tracked task with id 'tid', or None if no task
            was tracked under that id.
        """
        return self._tasks.get(tid)

    def get_resource(self, guid):
        """ Returns the RM registered under 'guid', or None if there is
            no such RM.
        """
        return self._resources.get(guid)

    @property
    def resources(self):
        """ List with the guids of all registered RMs """
        # A list snapshot (instead of a live dictionary view) can be
        # iterated safely even if RMs are registered in the meantime.
        return list(self._resources)

    def register_resource(self, rtype, guid = None):
        """ Creates a RM of type 'rtype' and registers it.

            :param rtype: Type of the RM, as expected by ResourceFactory
            :param guid: Requested guid, handed over to the guid generator
            :type guid: int

            :return: guid under which the RM was registered

        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate RM
        rm = ResourceFactory.create(rtype, self, guid)

        # Store RM
        self._resources[guid] = rm

        return guid

    def register_group(self, group):
        """ Registers a group of guids and returns the guid of the group.

            :param group: A list of guids. Any value that is not a list
                is registered as a single element group.

            :return: guid assigned to the group

        """
        guid = self._guid_generator.next()

        if not isinstance(group, list):
            group = [group]

        self._groups[guid] = group

        return guid

    def get_attributes(self, guid):
        """ Returns the result of get_attributes() on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.get_attributes()

    def get_filters(self, guid):
        """ Returns the result of get_filters() on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.get_filters()

    def register_connection(self, guid1, guid2):
        """ Connects the RMs 'guid1' and 'guid2' with each other, by
            invoking connect() on each RM with the guid of the other one.
        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)

        rm1.connect(guid2)
        rm2.connect(guid1)

    def register_condition(self, group1, action, group2, state,
            time = None):
        """ Registers an action START or STOP for all RM on group1 to occur 
            time 'time' after all elements in group2 reached state 'state'.

            :param group1: List of guids of RMs subjected to action
            :type group1: list

            :param action: Action to register (either START or STOP)
            :type action: ResourceAction

            :param group2: List of guids of RMs to wait for
            :type group2: list

            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
            :type state: ResourceState

            :param time: Time to wait after group2 has reached status 
            :type time: string

        """
        # A single guid is accepted in place of a list
        if isinstance(group1, int):
            group1 = [group1]
        if isinstance(group2, int):
            group2 = [group2]

        for guid1 in group1:
            rm = self.get_resource(guid1)
            rm.register_condition(action, group2, state, time)

    def discover(self, guid, filters):
        """ Returns the result of discover(filters) on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.discover(filters)

    def provision(self, guid, filters):
        """ Returns the result of provision(filters) on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.provision(filters)

    def get(self, guid, name):
        """ Returns the result of get(name) on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.get(name)

    def set(self, guid, name, value):
        """ Returns the result of set(name, value) on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.set(name, value)

    def state(self, guid):
        """ Returns the 'state' attribute of the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.state

    def stop(self, guid):
        """ Returns the result of stop() on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.stop()

    def start(self, guid):
        """ Returns the result of start() on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.start()

    def set_with_conditions(self, name, value, group1, group2, state,
            time = None):
        """ Set value 'value' on attribute with name 'name' on all RMs of
            group1 when 'time' has elapsed since all elements in group2 
            have reached state 'state'.

            :param name: Name of attribute to set in RM
            :type name: string

            :param value: Value of attribute to set in RM
            :type value: string

            :param group1: List of guids of RMs subjected to action
            :type group1: list

            :param group2: List of guids of RMs to wait for
            :type group2: list

            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
            :type state: ResourceState

            :param time: Time to wait after group2 has reached status 
            :type time: string

        """
        # A single guid is accepted in place of a list
        if isinstance(group1, int):
            group1 = [group1]
        if isinstance(group2, int):
            group2 = [group2]

        for guid1 in group1:
            # The lookup must use the loop variable: the bare name 'guid'
            # refers to the imported guid module in this scope.
            rm = self.get_resource(guid1)
            rm.set_with_conditions(name, value, group2, state, time)

    def stop_with_conditions(self, guid):
        """ Returns the result of stop_with_conditions() on the RM 'guid' """
        rm = self.get_resource(guid)
        return rm.stop_with_conditions()

    def start_with_conditions(self, guid):
        """ Returns the result of start_with_conditions() on the RM 'guid' """
        rm = self.get_resource(guid)
        # Same RM method that deploy() invokes on every RM
        return rm.start_with_conditions()

    def deploy(self, group = None, wait_all_deployed = True):
        """ Deploy all resource manager in group

        Every RM is deployed on its own thread, and the call returns once
        all those threads have finished.

        :param group: List of guids of RMs to deploy. When empty or None,
            all registered RMs are deployed.
        :type group: list

        :param wait_all_deployed: Wait until all RMs are deployed in
            order to start the RMs
        :type wait_all_deployed: bool

        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        def steps(rm):
            rm.deploy()
            rm.start_with_conditions()

            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            if rm.conditions.get(ResourceAction.STOP):
                rm.stop_with_conditions()

        if not group:
            group = self.resources

        # Materialize once: the group is traversed several times below
        group = list(group)

        threads = []
        for rm_guid in group:
            rm = self.get_resource(rm_guid)

            if wait_all_deployed:
                # The RM is started only after every other RM in the
                # group has reached the READY state
                towait = list(group)
                towait.remove(rm_guid)
                self.register_condition(rm_guid, ResourceAction.START, 
                        towait, ResourceState.READY)

            thread = threading.Thread(target = steps, args = (rm,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def release(self, group = None):
        """ Release all resource manager in group

        release() is invoked on every RM on its own thread, and the call
        returns once all those threads have finished.

        :param group: List of guids of RMs to release. When empty or None,
            all registered RMs are released.
        :type group: list

        """
        if not group:
            group = self.resources

        threads = []
        for rm_guid in group:
            rm = self.get_resource(rm_guid)
            thread = threading.Thread(target = rm.release)
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def shutdown(self):
        """ Releases all RMs and stops the task processing thread """
        self.release()

        # The stop flag is raised while holding the condition lock, so
        # the processing thread either sees the flag before it waits or
        # is already waiting when the notification is sent.
        with self._cond:
            self._stop = True
            self._cond.notify()

        if self._thread.is_alive():
            self._thread.join()

    def schedule(self, date, callback, track = False):
        r""" Schedule a callback to be executed at time date.

            date    string containing execution time for the task.
                    It can be expressed as an absolute time, using
                    timestamp format, or as a relative time matching
                    ^\d+.\d+(h|m|s|ms|us)$

            callback    code to be executed for the task. Must be a
                        Python callable. It is invoked without
                        arguments, and its return value is stored as
                        the result of the task.

            track   if set to True, the task will be retrievable with
                    the get_task() method

            Returns the id of the scheduled task.
        """
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)

        # The scheduler is read by the processing thread while holding
        # the condition lock, so it is modified under the same lock.
        with self._cond:
            task = self._scheduler.schedule(task)

            if track:
                self._tasks[task.id] = task

            # Notify condition to wake up the processing thread
            self._cond.notify()

        return task.id

    def _process(self):
        """ Main loop of the processing thread.

            Takes tasks from the scheduler and hands the ones that are
            due to a ParallelRun runner, which invokes _execute on them.
            The loop ends when the stop flag is raised by shutdown(), or
            when an error occurs in the loop itself (the error is logged).
        """
        runner = ParallelRun(maxthreads = 50)
        runner.start()

        try:
            while True:
                # The condition lock is held from the moment the stop
                # flag and the task queue are inspected until the thread
                # waits. Otherwise a notification sent in between would be
                # lost and the thread could sleep with pending work, or
                # never notice the shutdown.
                with self._cond:
                    if self._stop:
                        break

                    task = self._scheduler.next()

                    if not task:
                        # If there are no tasks in the tasks queue we
                        # need to wait until a call to schedule (or
                        # shutdown) wakes us up
                        self._cond.wait()
                        continue

                    # If the task timestamp is in the future the thread
                    # needs to wait until time elapses or until another
                    # task is scheduled
                    now = strfnow()
                    if now < task.timestamp:
                        # Calculate time difference in seconds
                        timeout = strfdiff(task.timestamp, now)
                        # Re-schedule task with the same timestamp
                        self._scheduler.schedule(task)
                        # Sleep until timeout or until a new task
                        # awakes the condition
                        self._cond.wait(timeout)
                        continue

                # Process tasks in parallel. This happens outside the
                # lock so that schedule() is never blocked by the runner.
                runner.put(self._execute, task)
        except:
            # Deliberate catch-all: this is the top of the thread and any
            # failure must at least be reported
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s" % err)

    def _execute(self, task):
        """ Invokes the callback of 'task' and records the outcome.

            On success the return value of the callback is stored in
            task.result and the status is set to TaskStatus.DONE. If the
            callback raises, the formatted traceback is logged and stored
            in task.result, and the status is set to TaskStatus.ERROR.
        """
        try:
            result = task.callback()
        except:
            # Deliberate catch-all: a failing callback must not kill the
            # worker that runs it
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while executing event: %s" % err)

            task.result = err
            task.status = TaskStatus.ERROR
        else:
            # The result is stored before the status changes, so whoever
            # observes the DONE status finds the result already in place
            task.result = result
            task.status = TaskStatus.DONE
347