Debug the deploy part and start with condition
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
9 from neco.execution.resource import ResourceFactory, ResourceAction, \
10         ResourceState
11 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
12 from neco.util.parallel import ParallelRun
13
14 # TODO: use multiprocessing instead of threading
15
class ExperimentController(object):
    """ Owns the Resource Managers (RMs) of an experiment and a background
    thread that executes scheduled tasks.

    RMs are created through ``ResourceFactory`` and indexed by guid. Tasks
    registered with :meth:`schedule` are kept in a ``HeapScheduler`` and run
    by the processing thread once their timestamp has been reached.
    Call :meth:`shutdown` to release all RMs and stop the processing thread.
    """

    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
        super(ExperimentController, self).__init__()

        # Logging. Configured before the processing thread is started,
        # because _process() uses self._logger to report errors.
        self._logger = logging.getLogger("neco.execution.ec")
        self._logger.setLevel(getattr(logging, loglevel.upper()))

        # root directory to store files
        self._root_dir = root_dir

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers, indexed by guid
        self._resources = dict()

        # Groups of RM guids created with create_group(), indexed by guid
        self._group = dict()

        # Scheduler
        self._scheduler = HeapScheduler()

        # Tasks scheduled with track = True, indexed by task id
        self._tasks = dict()

        # Event processing thread. self._cond guards self._scheduler and
        # self._stop, and is used to wake the processing thread up.
        self._stop = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.start()

    @staticmethod
    def _as_list(group):
        """ Return group unchanged, or wrapped in a list if it is a single
        int guid. """
        if isinstance(group, int):
            return [group]
        return group

    def get_task(self, tid):
        """ Return the tracked task with id tid, or None. """
        return self._tasks.get(tid)

    def get_resource(self, guid):
        """ Return the RM registered with guid, or None. """
        return self._resources.get(guid)

    @property
    def resources(self):
        """ Guids of all registered RMs. """
        return self._resources.keys()

    def register_resource(self, rtype, guid = None):
        """ Create an RM of type rtype and return its guid.

        :param rtype: Resource type passed to ResourceFactory.create
        :param guid: Requested guid, or None to use the next available one
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate RM
        rm = ResourceFactory.create(rtype, self, guid)

        # Store RM
        self._resources[guid] = rm

        return guid

    def create_group(self, *args):
        """ Register a group made of the given RM guids and return the
        group's guid.

        Groups are stored apart from RMs, so that deploy() and release(),
        which iterate over all RMs, never receive a group.
        """
        group_guid = self._guid_generator.next(None)

        self._group[group_guid] = list(args)

        return group_guid

    def get_attributes(self, guid):
        """ Return the attributes of the RM with the given guid. """
        rm = self.get_resource(guid)
        return rm.get_attributes()

    def get_filters(self, guid):
        """ Return the filters of the RM with the given guid. """
        rm = self.get_resource(guid)
        return rm.get_filters()

    def register_connection(self, guid1, guid2):
        """ Connect the RMs guid1 and guid2 to each other. """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)

        rm1.connect(guid2)
        rm2.connect(guid1)

    def register_condition(self, group1, action, group2, state,
            time = None):
        """ Registers an action START or STOP for all RMs in group1, to occur
            'time' after all elements in group2 have reached state 'state'.

            :param group1: List of guids of RMs subjected to action
                (a single guid is also accepted)
            :type group1: list

            :param action: Action to register (either START or STOP)
            :type action: ResourceAction

            :param group2: List of guids of RMs to be waited for
                (a single guid is also accepted)
            :type group2: list

            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
            :type state: ResourceState

            :param time: Time to wait after group2 has reached state
            :type time: string

        """
        group1 = self._as_list(group1)
        group2 = self._as_list(group2)

        for guid1 in group1:
            rm = self.get_resource(guid1)
            rm.register_condition(action, group2, state, time)

    def discover(self, guid, filters):
        """ Delegate discover(filters) to the RM with the given guid. """
        rm = self.get_resource(guid)
        return rm.discover(filters)

    def provision(self, guid, filters):
        """ Delegate provision(filters) to the RM with the given guid. """
        rm = self.get_resource(guid)
        return rm.provision(filters)

    def get(self, guid, name):
        """ Return the value of attribute name on the RM with guid. """
        rm = self.get_resource(guid)
        return rm.get(name)

    def set(self, guid, name, value):
        """ Set attribute name to value on the RM with guid. """
        rm = self.get_resource(guid)
        return rm.set(name, value)

    def state(self, guid):
        """ Return the current state of the RM with guid. """
        rm = self.get_resource(guid)
        return rm.state

    def stop(self, guid):
        """ Stop the RM with guid immediately. """
        rm = self.get_resource(guid)
        return rm.stop()

    def start(self, guid):
        """ Start the RM with guid immediately. """
        rm = self.get_resource(guid)
        return rm.start()

    def set_with_conditions(self, name, value, group1, group2, state,
            time = None):
        """ Set value 'value' on attribute with name 'name' on all RMs of
            group1 when 'time' has elapsed since all elements in group2
            have reached state 'state'.

            :param name: Name of attribute to set in RM
            :type name: string

            :param value: Value of attribute to set in RM
            :type value: string

            :param group1: List of guids of RMs whose attribute is set
                (a single guid is also accepted)
            :type group1: list

            :param group2: List of guids of RMs to be waited for
                (a single guid is also accepted)
            :type group2: list

            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
            :type state: ResourceState

            :param time: Time to wait after group2 has reached state
            :type time: string

        """
        group1 = self._as_list(group1)
        group2 = self._as_list(group2)

        for guid1 in group1:
            # Look up each RM of group1 (not the imported guid module).
            rm = self.get_resource(guid1)
            rm.set_with_conditions(name, value, group2, state, time)

    def stop_with_conditions(self, guid):
        """ Ask the RM with guid to stop according to its STOP conditions. """
        rm = self.get_resource(guid)
        return rm.stop_with_conditions()

    def start_with_conditions(self, guid):
        """ Ask the RM with guid to start according to its START conditions. """
        rm = self.get_resource(guid)
        return rm.start_with_conditions()

    def deploy(self, group = None, wait_all_deployed = True):
        """ Deploy all resource managers in group, one thread per RM, and
        return once every thread has finished.

        Each thread calls rm.deploy(), then rm.start_with_conditions() and,
        only if the RM has STOP conditions, rm.stop_with_conditions().

        :param group: List of guids of RMs to deploy (a single guid is also
            accepted). If empty or None, all registered RMs are deployed.
        :type group: list

        :param wait_all_deployed: If True, every RM gets a START condition
            on all the other RMs of group reaching DEPLOYED, so that no RM
            is started before the whole group is deployed.
        :type wait_all_deployed: bool

        """
        def steps(rm):
            rm.deploy()
            rm.start_with_conditions()

            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            if rm.conditions.get(ResourceAction.STOP):
                rm.stop_with_conditions()

        group = self._as_list(group)
        if not group:
            # Snapshot, so that RMs registered meanwhile do not disturb
            # the iteration.
            group = list(self.resources)
        else:
            group = list(group)

        threads = []
        for guid in group:
            rm = self.get_resource(guid)

            if wait_all_deployed:
                towait = list(group)
                towait.remove(guid)
                self.register_condition(guid, ResourceAction.START,
                        towait, ResourceState.DEPLOYED)

            thread = threading.Thread(target = steps, args = (rm,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def release(self, group = None):
        """ Release the RMs in group (all RMs if group is empty or None),
        one thread per RM, and return once every thread has finished.
        """
        group = self._as_list(group)
        if not group:
            group = list(self.resources)

        threads = []
        for guid in group:
            rm = self.get_resource(guid)
            thread = threading.Thread(target = rm.release)
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def shutdown(self):
        """ Release all RMs and stop the task processing thread. """
        self.release()

        # Set the flag while holding the lock, so the processing thread
        # cannot miss it between checking _stop and calling wait().
        with self._cond:
            self._stop = True
            self._cond.notify()

        if self._thread.is_alive():
            self._thread.join()

    def schedule(self, date, callback, track = False):
        r""" Schedule a callback to be executed at time date.

            date    string containing execution time for the task.
                    It can be expressed as an absolute time, using
                    timestamp format, or as a relative time matching
                    ^\d+.\d+(h|m|s|ms|us)$

            callback    code to be executed for the task. Must be a
                        Python function that takes no arguments.

            track   if set to True, the task will be retrievable with
                    the get_task() method

            Returns the id of the scheduled task.
        """
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)

        # The processing thread reads the scheduler while holding
        # self._cond, so insert under the same lock, then wake it up.
        with self._cond:
            task = self._scheduler.schedule(task)

            if track:
                self._tasks[task.id] = task

            self._cond.notify()

        return task.id

    def _process(self):
        """ Body of the processing thread: run due tasks until shutdown. """
        runner = ParallelRun(maxthreads = 50)
        runner.start()
        # NOTE(review): the runner is never stopped or joined when this
        # loop exits; confirm whether ParallelRun needs explicit cleanup.

        try:
            while True:
                # Checking the flag/scheduler and waiting happen under a
                # single lock acquisition, so no notify() can be lost.
                with self._cond:
                    if self._stop:
                        break

                    task = self._scheduler.next()

                    if not task:
                        # No tasks in the queue: wait until schedule()
                        # or shutdown() wakes us up.
                        self._cond.wait()
                        continue

                    now = strfnow()
                    if now < task.timestamp:
                        # The task is due in the future: put it back with
                        # the same timestamp and sleep until it is due, or
                        # until a new task or shutdown awakes the condition.
                        timeout = strfdiff(task.timestamp, now)
                        self._scheduler.schedule(task)
                        self._cond.wait(timeout)
                        continue

                # Process due tasks in parallel, outside the lock.
                runner.put(self._execute, task)
        except Exception:
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s" % err)

    def _execute(self, task):
        """ Invoke task.callback and record its outcome on the task.

        On success, task.result holds the return value and task.status is
        DONE. On error, task.result holds the traceback text and task.status
        is ERROR. DONE is set only after the callback returned, so a tracked
        task is never reported as done while it is still running.
        """
        try:
            task.result = task.callback()
        except Exception:
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while executing event: %s" % err)

            task.result = err
            task.status = TaskStatus.ERROR
        else:
            task.status = TaskStatus.DONE
341