Added scheduler and task processing thread to ec. Completed deploy and release methods.
[nepi.git] / src / neco / execution / ec.py
import logging
import os
import sys
import threading
import time

from neco.util import guid
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
from neco.execution.resource import ResourceFactory
from neco.execution.scheduler import HeapScheduler, Task
from neco.util.parallel import ParallelRun

class ExperimentController(object):
    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers
        self._resources = dict()

        # Scheduler
        self._scheduler = HeapScheduler()

        # Event processing thread
        self._stop = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.start()

        # Logging
        self._logger = logging.getLogger("neco.execution.ec")
        self._logger.setLevel(getattr(logging, loglevel.upper()))

    def resource(self, guid):
        return self._resources.get(guid)

    @property
    def resources(self):
        return self._resources.keys()

    def register_resource(self, rtype, guid = None, creds = None):
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate RM
        rm = ResourceFactory.create(rtype, self, guid, creds)

        # Store RM
        self._resources[guid] = rm

        return guid

    def get_attributes(self, guid):
        rm = self._resources[guid]
        return rm.get_attributes()

    def get_filters(self, guid):
        rm = self._resources[guid]
        return rm.get_filters()

    def register_connection(self, guid1, guid2):
        rm1 = self._resources[guid1]
        rm2 = self._resources[guid2]

        rm1.connect(guid2)
        rm2.connect(guid1)

    def discover_resource(self, guid, filters):
        rm = self._resources[guid]
        return rm.discover(filters)

    def provision_resource(self, guid, filters):
        rm = self._resources[guid]
        return rm.provision(filters)

    def register_start(self, group1, time, after_status, group2):
        if isinstance(group1, int):
            group1 = [group1]
        if isinstance(group2, int):
            group2 = [group2]

        for guid1 in group1:
            for guid2 in group2:
                rm = self._resources[guid1]
                rm.start_after(time, after_status, guid2)

    def register_stop(self, group1, time, after_status, group2):
        if isinstance(group1, int):
            group1 = [group1]
        if isinstance(group2, int):
            group2 = [group2]

        for guid1 in group1:
            for guid2 in group2:
                rm = self._resources[guid1]
                rm.stop_after(time, after_status, guid2)

    def register_set(self, name, value, group1, time, after_status, group2):
        if isinstance(group1, int):
            group1 = [group1]
        if isinstance(group2, int):
            group2 = [group2]

        for guid1 in group1:
            for guid2 in group2:
                rm = self._resources[guid1]
                rm.set_after(name, value, time, after_status, guid2)

    def get(self, guid, name):
        rm = self._resources[guid]
        return rm.get(name)

    def set(self, guid, name, value):
        rm = self._resources[guid]
        return rm.set(name, value)

    def status(self, guid):
        rm = self._resources[guid]
        return rm.status()

    def stop(self, guid):
        rm = self._resources[guid]
        return rm.stop()

    def deploy(self, group = None, start_when_all_ready = True):
        if not group:
            group = self.resources

        # Deploy each RM in its own thread. When start_when_all_ready is
        # True, each RM receives the guids of the other group members so
        # it can wait for them before starting.
        threads = []
        for guid in group:
            rm = self._resources[guid]

            kwargs = {'target': rm.deploy}
            if start_when_all_ready:
                towait = list(group)
                towait.remove(guid)
                kwargs['args'] = (towait,)

            thread = threading.Thread(**kwargs)
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def release(self, group = None):
        if not group:
            group = self.resources

        threads = []
        for guid in group:
            rm = self._resources[guid]
            thread = threading.Thread(target = rm.release)
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    def shutdown(self):
        self._stop = True
        # Wake the processing thread so it can notice the stop flag and exit
        self._cond.acquire()
        self._cond.notify()
        self._cond.release()
        self.release()

    def schedule(self, date, callback):
        """
            date    string containing the execution time for the task.
                    It can be expressed as an absolute time, using
                    timestamp format, or as a relative time matching
                    ^\d+.\d+(h|m|s|ms|us)$

            callback    code to be executed for the task. Must be a
                        Python function, and receives args and kwargs
                        as arguments.
        """
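        # Example calls (illustrative; my_callback stands for any Python
        # callable, and the relative form assumes the format noted above):
        #   ec.schedule("0.1s", my_callback)     # run ~0.1 seconds from now
        #   ec.schedule(strfnow(), my_callback)  # run as soon as possible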
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)
        task = self._scheduler.schedule(task)

        # Notify condition to wake up the processing thread
        self._cond.acquire()
        self._cond.notify()
        self._cond.release()
        return task

    def _process(self):
        runner = ParallelRun(maxthreads = 50)
        runner.start()

        try:
            while not self._stop:
                self._cond.acquire()
                task = self._scheduler.next()
                self._cond.release()

                if not task:
                    # If there are no tasks in the queue we need to
                    # wait until a call to schedule wakes us up
                    self._cond.acquire()
                    self._cond.wait()
                    self._cond.release()
                else:
                    # If the task timestamp is in the future, the thread
                    # needs to wait until the timeout elapses or until
                    # another task is scheduled
                    now = strfnow()
                    if now < task.timestamp:
                        # Calculate time difference in seconds
                        timeout = strfdiff(task.timestamp, now)
                        # Re-schedule task with the same timestamp
                        self._scheduler.schedule(task)
                        # Sleep until timeout or until a new task awakes the condition
                        self._cond.acquire()
                        self._cond.wait(timeout)
                        self._cond.release()
                    else:
                        # Process tasks in parallel
                        runner.put(task.callback)
        except:
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s" % err)
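
A minimal usage sketch of the controller with these changes. The "node" resource type, the root_dir path, and the callback are hypothetical and only illustrate the calling sequence; any rtype registered with ResourceFactory would work:

    from neco.execution.ec import ExperimentController

    ec = ExperimentController(root_dir = "/tmp/exp")

    # Register two resources and connect them ("node" is an assumed rtype)
    node1 = ec.register_resource("node")
    node2 = ec.register_resource("node")
    ec.register_connection(node1, node2)

    # Deploy all registered resources; each RM is deployed in its own thread
    # and, by default, waits for the rest of the group before starting
    ec.deploy()

    # Schedule a task 5 seconds from now, using the relative-time format
    # accepted by schedule()
    ec.schedule("5.0s", lambda: ec.stop(node1))

    # Stop the processing thread and release all resources
    ec.shutdown()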