Fixing ec unit tests
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
9 from neco.execution.resource import ResourceFactory
10 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
11 from neco.util.parallel import ParallelRun
12
13 class ExperimentController(object):
14     def __init__(self, root_dir = "/tmp", loglevel = 'error'): 
15         super(ExperimentController, self).__init__()
16         # root directory to store files
17         self._root_dir = root_dir
18
19         # generator of globally unique ids
20         self._guid_generator = guid.GuidGenerator()
21         
22         # Resource managers
23         self._resources = dict()
24
25         # Scheduler
26         self._scheduler = HeapScheduler()
27
28         # Tasks
29         self._tasks = dict()
30
31         # Event processing thread
32         self._stop = False
33         self._cond = threading.Condition()
34         self._thread = threading.Thread(target = self._process)
35         self._thread.start()
36
37         # Logging
38         self._logger = logging.getLogger("neco.execution.ec")
39         self._logger.setLevel(getattr(logging, loglevel.upper()))
40
41     def get_task(self, tid):
42         return self._tasks.get(tid)
43
44     def get_resource(self, guid):
45         return self._resources.get(guid)
46
47     @property
48     def resources(self):
49         return self._resources.keys()
50
51     def register_resource(self, rtype, guid = None, creds = None):
52         # Get next available guid
53         guid = self._guid_generator.next(guid)
54         
55         # Instantiate RM
56         rm = ResourceFactory.create(rtype, self, guid, creds)
57
58         # Store RM
59         self._resources[guid] = rm
60
61         return guid
62
63     def get_attributes(self, guid):
64         rm = self.get_resource(guid)
65         return rm.get_attributes()
66
67     def get_filters(self, guid):
68         rm = self.get_resource(guid)
69         return rm.get_filters()
70
71     def register_connection(self, guid1, guid2):
72         rm1 = self.get_resource(guid1)
73         rm2 = self.get_resource(guid2)
74
75         rm1.connect(guid2)
76         rm2.connect(guid1)
77
78     def discover_resource(self, guid, filters):
79         rm = self.get_resource(guid)
80         return rm.discover(filters)
81
82     def provision_resource(self, guid, filters):
83         rm = self.get_resource(guid)
84         return rm.provision(filters)
85
86     def register_start(self, group1, time, after_status, group2):
87         if isinstance(group1, int):
88             group1 = list[group1]
89         if isinstance(group2, int):
90             group2 = list[group2]
91
92         for guid1 in group1:
93             for guid2 in group2:
94                 rm = self.get_resource(guid)
95                 rm.start_after(time, after_status, guid2)
96
97     def register_stop(self, group1, time, after_status, group2):
98         if isinstance(group1, int):
99             group1 = list[group1]
100         if isinstance(group2, int):
101             group2 = list[group2]
102
103         for guid1 in group1:
104             for guid2 in group2:
105                 rm = self.get_resource(guid)
106                 rm.stop_after(time, after_status, guid2)
107
108     def register_set(self, name, value, group1, time, after_status, group2):
109         if isinstance(group1, int):
110             group1 = list[group1]
111         if isinstance(group2, int):
112             group2 = list[group2]
113
114         for guid1 in group1:
115             for guid2 in group2:
116                 rm = self.get_resource(guid)
117                 rm.set_after(name, value, time, after_status, guid2)
118
119     def get(self, guid, name):
120         rm = self.get_resource(guid)
121         return rm.get(name)
122
123     def set(self, guid, name, value):
124         rm = self.get_resource(guid)
125         return rm.set(name, value)
126
127     def status(self, guid):
128         rm = self.get_resource(guid)
129         return rm.status()
130
131     def stop(self, guid):
132         rm = self.get_resource(guid)
133         return rm.stop()
134
135     def deploy(self, group = None, start_when_all_ready = True):
136         if not group:
137             group = self.resources
138
139         threads = []
140         for guid in group:
141             rm = self.get_resource(guid)
142
143             kwargs = {'target': rm.deploy}
144             if start_when_all_ready:
145                 towait = list(group)
146                 towait.remove(guid)
147                 kwargs['args'] = towait
148
149             thread = threading.Thread(kwargs)
150             threads.append(thread)
151             thread.start()
152
153         for thread in threads:
154             thread.join()
155
156     def release(self, group = None):
157         if not group:
158             group = self.resources
159
160         threads = []
161         for guid in group:
162             rm = self.get_resource(guid)
163             thread = threading.Thread(target=rm.release)
164             threads.append(thread)
165             thread.start()
166
167         for thread in threads:
168             thread.join()
169
170     def shutdown(self):
171         self.release()
172         
173         self._stop = True
174         self._cond.acquire()
175         self._cond.notify()
176         self._cond.release()
177         if self._thread.is_alive():
178            self._thread.join()
179
180     def schedule(self, date, callback, track = False):
181         """
182             date    string containing execution time for the task.
183                     It can be expressed as an absolute time, using
184                     timestamp format, or as a relative time matching
185                     ^\d+.\d+(h|m|s|ms|us)$
186
187             callback    code to be executed for the task. Must be a
188                         Python function, and receives args and kwargs
189                         as arguments.
190
191             track   if set to True, the task will be retrivable with
192                     the get_task() method
193         """
194         timestamp = strfvalid(date)
195         
196         task = Task(timestamp, callback)
197         task = self._scheduler.schedule(task)
198
199         if track:
200             self._tasks[task.id] = task
201   
202         # Notify condition to wake up the processing thread
203         self._cond.acquire()
204         self._cond.notify()
205         self._cond.release()
206
207         return task.id
208      
209     def _process(self):
210         runner = ParallelRun(maxthreads = 50)
211         runner.start()
212
213         try:
214             while not self._stop:
215                 self._cond.acquire()
216                 task = self._scheduler.next()
217                 self._cond.release()
218
219                 if not task:
220                     # It there are not tasks in the tasks queue we need to 
221                     # wait until a call to schedule wakes us up
222                     self._cond.acquire()
223                     self._cond.wait()
224                     self._cond.release()
225                 else: 
226                     # If the task timestamp is in the future the thread needs to wait
227                     # until time elapse or until another task is scheduled
228                     now = strfnow()
229                     if now < task.timestamp:
230                         # Calculate time difference in seconds
231                         timeout = strfdiff(task.timestamp, now)
232                         # Re-schedule task with the same timestamp
233                         self._scheduler.schedule(task)
234                         # Sleep until timeout or until a new task awakes the condition
235                         self._cond.acquire()
236                         self._cond.wait(timeout)
237                         self._cond.release()
238                     else:
239                         # Process tasks in parallel
240                         runner.put(self._execute, task)
241         except:  
242             import traceback
243             err = traceback.format_exc()
244             self._logger.error("Error while processing tasks in the EC: %s" % err)
245
246     def _execute(self, task):
247         # Invoke callback
248         task.status = TaskStatus.DONE
249
250         try:
251             task.result = task.callback()
252         except:
253             import traceback
254             err = traceback.format_exc()
255             self._logger.error("Error while executing event: %s" % err)
256
257             task.result = err
258             task.status = TaskStatus.ERROR
259