Add start_with_conditions, stop_with_conditions and set_with_conditions to the EC...
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
9 from neco.execution.resource import ResourceFactory, ResourceAction, \
10         ResourceState
11 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
12 from neco.util.parallel import ParallelRun
13
14 # TODO: use multiprocessing instead of threading
15
class ExperimentController(object):
    """ Registers resource managers (RMs), forwards operations to them and
        runs scheduled tasks on a background processing thread.
    """

    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # Logging. Configured BEFORE the processing thread is started,
        # because _process() uses self._logger in its error handler.
        self._logger = logging.getLogger("neco.execution.ec")
        self._logger.setLevel(getattr(logging, loglevel.upper()))

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers, indexed by guid
        self._resources = dict()

        # Scheduler
        self._scheduler = HeapScheduler()

        # Tracked tasks, indexed by task id (see schedule(track = True))
        self._tasks = dict()

        # Event processing thread. self._cond guards the scheduler and the
        # _stop flag, and is notified to wake the thread up.
        self._stop = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.start()

    @staticmethod
    def _as_list(group):
        """ Normalize a group argument: a single int guid becomes a
            one-element list, any other iterable is copied into a list.
        """
        if isinstance(group, int):
            return [group]
        return list(group)

    def get_task(self, tid):
        """ Return the tracked task with id 'tid', or None. """
        return self._tasks.get(tid)

    def get_resource(self, guid):
        """ Return the RM registered under 'guid', or None. """
        return self._resources.get(guid)

    @property
    def resources(self):
        """ List of the guids of all registered RMs. """
        return list(self._resources.keys())

    def register_resource(self, rtype, guid = None, creds = None):
        """ Instantiate an RM of type 'rtype' and register it.

            :param guid: Requested guid, passed to the guid generator
            :returns: The guid assigned to the new RM
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate RM
        rm = ResourceFactory.create(rtype, self, guid, creds)

        # Store RM
        self._resources[guid] = rm

        return guid

    def get_attributes(self, guid):
        rm = self.get_resource(guid)
        return rm.get_attributes()

    def get_filters(self, guid):
        rm = self.get_resource(guid)
        return rm.get_filters()

    def register_connection(self, guid1, guid2):
        """ Connect the RMs 'guid1' and 'guid2' to each other. """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)

        rm1.connect(guid2)
        rm2.connect(guid1)

    def register_condition(self, group1, action, group2, state,
            time = None):
        """ Registers an action START or STOP for all RMs in group1 to occur
            time 'time' after all elements in group2 reached state 'state'.

            :param group1: List of guids of RMs subjected to action
                (a single int guid is also accepted)
            :type group1: list

            :param action: Action to register (either START or STOP)
            :type action: ResourceAction

            :param group2: List of guids of RMs to be waited for
                (a single int guid is also accepted)
            :type group2: list

            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
            :type state: ResourceState

            :param time: Time to wait after group2 has reached status
            :type time: string

        """
        group1 = self._as_list(group1)
        group2 = self._as_list(group2)

        for guid1 in group1:
            rm = self.get_resource(guid1)
            rm.register_condition(action, group2, state, time)

    def discover(self, guid, filters):
        rm = self.get_resource(guid)
        return rm.discover(filters)

    def provision(self, guid, filters):
        rm = self.get_resource(guid)
        return rm.provision(filters)

    def get(self, guid, name):
        rm = self.get_resource(guid)
        return rm.get(name)

    def set(self, guid, name, value):
        rm = self.get_resource(guid)
        return rm.set(name, value)

    def state(self, guid):
        rm = self.get_resource(guid)
        return rm.state

    def stop(self, guid):
        rm = self.get_resource(guid)
        return rm.stop()

    def start(self, guid):
        rm = self.get_resource(guid)
        return rm.start()

    def set_with_conditions(self, name, value, group1, group2, state,
            time = None):
        """ Set value 'value' on attribute with name 'name' on all RMs of
            group1 when 'time' has elapsed since all elements in group2
            have reached state 'state'.

            :param name: Name of attribute to set in RM
            :type name: string

            :param value: Value of attribute to set in RM
            :type value: string

            :param group1: List of guids of RMs whose attribute is set
                (a single int guid is also accepted)
            :type group1: list

            :param group2: List of guids of RMs to be waited for
                (a single int guid is also accepted)
            :type group2: list

            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
            :type state: ResourceState

            :param time: Time to wait after group2 has reached status
            :type time: string

        """
        group1 = self._as_list(group1)
        group2 = self._as_list(group2)

        for guid1 in group1:
            rm = self.get_resource(guid1)
            rm.set_with_conditions(name, value, group2, state, time)

    def stop_with_conditions(self, guid):
        """ Delegate to the stop_with_conditions() method of RM 'guid'. """
        rm = self.get_resource(guid)
        return rm.stop_with_conditions()

    def start_with_conditions(self, guid):
        """ Delegate to the start_with_conditions() method of RM 'guid'. """
        rm = self.get_resource(guid)
        # Same RM method name that deploy() uses
        return rm.start_with_conditions()

    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource managers in group, one thread per RM.

            Each thread calls rm.deploy(), then rm.start_with_conditions(),
            and rm.stop_with_conditions() only if the RM has STOP conditions.
            Returns after all threads have finished.

            :param group: List of guids of RMs to deploy (all registered
                RMs when empty or None)
            :type group: list

            :param wait_all_ready: If True, every RM gets a START condition
                on all other RMs of the group reaching DEPLOYED
            :type wait_all_ready: bool

        """
        def steps(rm):
            rm.deploy()
            rm.start_with_conditions()

            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            if rm.conditions.get(ResourceAction.STOP):
                rm.stop_with_conditions()

        if not group:
            group = self.resources
        group = self._as_list(group)

        # Register all START conditions before any RM starts deploying
        if wait_all_ready:
            for guid in group:
                towait = [other for other in group if other != guid]
                self.register_condition(guid, ResourceAction.START,
                        towait, ResourceState.DEPLOYED)

        threads = []
        for guid in group:
            rm = self.get_resource(guid)
            # args must be a tuple: (rm,) not (rm)
            thread = threading.Thread(target = steps, args = (rm,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def release(self, group = None):
        """ Call release() on every RM in group (all RMs by default) in
            parallel, and wait for all of them to finish.
        """
        if not group:
            group = self.resources

        threads = []
        for guid in group:
            rm = self.get_resource(guid)
            thread = threading.Thread(target = rm.release)
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def shutdown(self):
        """ Release all RMs and stop the event processing thread. """
        self.release()

        # Set the flag and notify under the lock so the processing thread
        # cannot miss the wake-up.
        with self._cond:
            self._stop = True
            self._cond.notify()

        if self._thread.is_alive():
            self._thread.join()

    def schedule(self, date, callback, track = False):
        r""" Schedule a callback to be executed at time date.

            date    string containing execution time for the task.
                    It can be expressed as an absolute time, using
                    timestamp format, or as a relative time matching
                    ^\d+.\d+(h|m|s|ms|us)$

            callback    code to be executed for the task. Must be a
                        Python function, and receives args and kwargs
                        as arguments.

            track   if set to True, the task will be retrivable with
                    the get_task() method

            Returns the id of the scheduled task.
        """
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)

        # The scheduler is shared with the processing thread, which only
        # touches it while holding self._cond.
        with self._cond:
            task = self._scheduler.schedule(task)

            if track:
                self._tasks[task.id] = task

            # Wake up the processing thread
            self._cond.notify()

        return task.id

    def _process(self):
        """ Main loop of the event processing thread.

            Pops due tasks from the scheduler and hands them to a
            ParallelRun pool; sleeps on self._cond when there is nothing
            due yet. Exits when self._stop is set.
        """
        runner = ParallelRun(maxthreads = 50)
        runner.start()

        try:
            while not self._stop:
                with self._cond:
                    task = self._scheduler.next()

                    if not task:
                        # No pending tasks: wait until schedule() or
                        # shutdown() wakes us up. Checking and waiting under
                        # the same lock means no notification is lost.
                        if not self._stop:
                            self._cond.wait()
                        continue

                    now = strfnow()
                    if now < task.timestamp:
                        # Task is in the future: put it back with the same
                        # timestamp and sleep until it is due or until a
                        # new task is scheduled.
                        timeout = strfdiff(task.timestamp, now)
                        self._scheduler.schedule(task)
                        self._cond.wait(timeout)
                        continue

                # Task is due: process it in parallel, outside the lock
                runner.put(self._execute, task)
        except Exception:
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s", err)

    def _execute(self, task):
        """ Invoke task.callback() and record the outcome on the task.

            On success task.status is DONE and task.result holds the return
            value; if the callback raises, task.status is ERROR and
            task.result holds the formatted traceback.
        """
        import traceback

        # Optimistically mark as done; overwritten below on failure
        task.status = TaskStatus.DONE

        try:
            task.result = task.callback()
        except Exception:
            err = traceback.format_exc()
            self._logger.error("Error while executing event: %s", err)

            task.result = err
            task.status = TaskStatus.ERROR
328