Creating ssh api (unfinished)
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import time
5 import threading
6
7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
9 from neco.execution.resource import ResourceFactory
10 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
11 from neco.util.parallel import ParallelRun
12
class ExperimentController(object):
    """ Central controller of an experiment.

    Keeps a registry of resource managers (RMs) indexed by guid, forwards
    per-resource operations to them, and runs a background thread that
    executes callbacks registered with schedule() once they are due.
    """

    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
        """
            root_dir    root directory to store files
            loglevel    name of a ``logging`` level, e.g. 'debug' or 'error'
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # Logging. Configured before the processing thread is started:
        # _process() uses the logger, and an invalid loglevel must fail
        # before any thread is running.
        self._logger = logging.getLogger("neco.execution.ec")
        self._logger.setLevel(getattr(logging, loglevel.upper()))

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers, indexed by guid
        self._resources = dict()

        # Scheduler. It is shared between caller threads and the processing
        # thread, so it is only accessed while holding self._cond.
        self._scheduler = HeapScheduler()

        # Tracked tasks, indexed by task id
        self._tasks = dict()

        # Event processing thread
        self._stop = False
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.start()

    def get_task(self, tid):
        """ Return the tracked task with id ``tid``, or None. """
        return self._tasks.get(tid)

    def get_resource(self, guid):
        """ Return the RM registered under ``guid``, or None. """
        return self._resources.get(guid)

    @property
    def resources(self):
        """ Guids of all registered RMs. """
        return self._resources.keys()

    @staticmethod
    def _as_group(group):
        """ Normalize a single guid, an iterable of guids or None into a list. """
        if group is None:
            return []
        if isinstance(group, int):
            return [group]
        return list(group)

    @staticmethod
    def _run_parallel(jobs):
        """ Run each (callable, args) pair in its own thread and wait for all. """
        threads = [threading.Thread(target = target, args = args)
                   for target, args in jobs]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def register_resource(self, rtype, guid = None, creds = None):
        """ Instantiate an RM of type ``rtype`` and register it.

            rtype   resource type, passed to ResourceFactory.create
            guid    optional guid hint, passed to the guid generator
            creds   credentials forwarded to the RM constructor

        Returns the guid produced by the guid generator, under which the
        RM is stored.
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate RM
        rm = ResourceFactory.create(rtype, self, guid, creds)

        # Store RM
        self._resources[guid] = rm

        return guid

    def get_attributes(self, guid):
        """ Return ``get_attributes()`` of the RM registered under ``guid``. """
        rm = self.get_resource(guid)
        return rm.get_attributes()

    def get_filters(self, guid):
        """ Return ``get_filters()`` of the RM registered under ``guid``. """
        rm = self.get_resource(guid)
        return rm.get_filters()

    def register_connection(self, guid1, guid2):
        """ Connect two RMs to each other (calls ``connect`` on both sides). """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)

        rm1.connect(guid2)
        rm2.connect(guid1)

    def discover_resource(self, guid, filters):
        """ Forward ``filters`` to ``discover`` of the RM under ``guid``. """
        rm = self.get_resource(guid)
        return rm.discover(filters)

    def provision_resource(self, guid, filters):
        """ Forward ``filters`` to ``provision`` of the RM under ``guid``. """
        rm = self.get_resource(guid)
        return rm.provision(filters)

    def register_start(self, group1, time, after_status, group2):
        """ For every RM in group1 and every guid in group2, call
        ``rm.start_after(time, after_status, guid2)``.

            group1, group2  a single guid or an iterable of guids
        """
        group1 = self._as_group(group1)
        group2 = self._as_group(group2)

        for guid1 in group1:
            rm = self.get_resource(guid1)
            for guid2 in group2:
                rm.start_after(time, after_status, guid2)

    def register_stop(self, group1, time, after_status, group2):
        """ For every RM in group1 and every guid in group2, call
        ``rm.stop_after(time, after_status, guid2)``.

            group1, group2  a single guid or an iterable of guids
        """
        group1 = self._as_group(group1)
        group2 = self._as_group(group2)

        for guid1 in group1:
            rm = self.get_resource(guid1)
            for guid2 in group2:
                rm.stop_after(time, after_status, guid2)

    def register_set(self, name, value, group1, time, after_status, group2):
        """ For every RM in group1 and every guid in group2, call
        ``rm.set_after(name, value, time, after_status, guid2)``.

            group1, group2  a single guid or an iterable of guids
        """
        group1 = self._as_group(group1)
        group2 = self._as_group(group2)

        for guid1 in group1:
            rm = self.get_resource(guid1)
            for guid2 in group2:
                rm.set_after(name, value, time, after_status, guid2)

    def get(self, guid, name):
        """ Return attribute ``name`` of the RM under ``guid``. """
        rm = self.get_resource(guid)
        return rm.get(name)

    def set(self, guid, name, value):
        """ Set attribute ``name`` of the RM under ``guid`` to ``value``. """
        rm = self.get_resource(guid)
        return rm.set(name, value)

    def status(self, guid):
        """ Return ``status()`` of the RM under ``guid``. """
        rm = self.get_resource(guid)
        return rm.status()

    def stop(self, guid):
        """ Call ``stop()`` on the RM under ``guid``. """
        rm = self.get_resource(guid)
        return rm.stop()

    def deploy(self, group = None, start_when_all_ready = True):
        """ Call ``deploy`` on every RM of ``group``, one thread per RM, and
        wait for all of them to return.

            group   a single guid or an iterable of guids; None or empty
                    means all registered RMs
            start_when_all_ready    if True, each RM's ``deploy`` receives
                    the list of the other guids of the group as its single
                    argument; otherwise it is called with no arguments
        """
        group = self._as_group(group) or list(self.resources)

        jobs = []
        for rguid in group:
            rm = self.get_resource(rguid)

            args = ()
            if start_when_all_ready:
                towait = list(group)
                towait.remove(rguid)
                # threading.Thread expects ``args`` to be a tuple of
                # positional arguments, so wrap the list.
                # NOTE(review): confirm ResourceManager.deploy takes the
                # guids to wait for as one list argument.
                args = (towait,)

            jobs.append((rm.deploy, args))

        self._run_parallel(jobs)

    def release(self, group = None):
        """ Call ``release`` on every RM of ``group``, one thread per RM, and
        wait for all of them to return.

            group   a single guid or an iterable of guids; None or empty
                    means all registered RMs
        """
        group = self._as_group(group) or list(self.resources)

        jobs = [(self.get_resource(rguid).release, ()) for rguid in group]
        self._run_parallel(jobs)

    def shutdown(self):
        """ Release all RMs and stop the event processing thread. """
        self.release()

        # Set the flag and notify under the lock, so the processing thread
        # cannot miss the wake-up between checking _stop and waiting.
        with self._cond:
            self._stop = True
            self._cond.notify()

        if self._thread.is_alive():
            self._thread.join()

    def schedule(self, date, callback, track = False):
        r""" Schedule a callback to be executed at time date.

            date    string containing execution time for the task.
                    It can be expressed as an absolute time, using
                    timestamp format, or as a relative time matching
                    ^\d+.\d+(h|m|s|ms|us)$

            callback    code to be executed for the task. Must be a
                        Python callable; it is invoked with no arguments.

            track   if set to True, the task will be retrievable with
                    the get_task() method

        Returns the id of the scheduled task.
        """
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)

        # The scheduler is shared with the processing thread. Mutate it and
        # wake that thread up while holding the same lock.
        with self._cond:
            task = self._scheduler.schedule(task)

            if track:
                self._tasks[task.id] = task

            self._cond.notify()

        return task.id

    def _process(self):
        """ Body of the event processing thread.

        Pops tasks from the scheduler. Due tasks are handed to a ParallelRun
        pool. Future tasks are put back, and the thread sleeps until they
        are due or until schedule()/shutdown() notifies. Runs until
        shutdown() sets _stop.
        """
        runner = ParallelRun(maxthreads = 50)
        runner.start()
        # NOTE(review): runner is never stopped/joined when this loop ends;
        # confirm whether ParallelRun needs explicit cleanup on shutdown.

        try:
            while True:
                with self._cond:
                    # _stop is checked, the scheduler read and wait() called
                    # all under the lock. That way a notify() from schedule()
                    # or shutdown() cannot fall in between and be lost.
                    if self._stop:
                        break

                    task = self._scheduler.next()

                    if not task:
                        # No pending tasks: wait until schedule() or
                        # shutdown() wakes us up
                        self._cond.wait()
                        continue

                    now = strfnow()
                    if now < task.timestamp:
                        # Task is in the future. Re-schedule it with the same
                        # timestamp, then sleep until it is due or until
                        # another task is scheduled.
                        timeout = strfdiff(task.timestamp, now)
                        self._scheduler.schedule(task)
                        self._cond.wait(timeout)
                        continue

                # Due task: process it in parallel, outside the lock
                runner.put(self._execute, task)
        except Exception:
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s", err)

    def _execute(self, task):
        """ Invoke ``task.callback()`` and record the outcome on the task.

        On success, task.result is the callback's return value and
        task.status is DONE. On failure, task.result holds the formatted
        traceback and task.status is ERROR.
        """
        try:
            result = task.callback()
        except Exception:
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while executing event: %s", err)

            task.result = err
            task.status = TaskStatus.ERROR
        else:
            # Only mark the task DONE once the callback has actually finished
            task.result = result
            task.status = TaskStatus.DONE