7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
9 from neco.execution.resource import ResourceFactory
10 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
11 from neco.util.parallel import ParallelRun
13 class ExperimentController(object):
14 def __init__(self, root_dir = "/tmp", loglevel = 'error'):
15 super(ExperimentController, self).__init__()
16 # root directory to store files
17 self._root_dir = root_dir
19 # generator of globally unique ids
20 self._guid_generator = guid.GuidGenerator()
23 self._resources = dict()
26 self._scheduler = HeapScheduler()
31 # Event processing thread
33 self._cond = threading.Condition()
34 self._thread = threading.Thread(target = self._process)
38 self._logger = logging.getLogger("neco.execution.ec")
39 self._logger.setLevel(getattr(logging, loglevel.upper()))
41 def get_task(self, tid):
42 return self._tasks.get(tid)
44 def get_resource(self, guid):
45 return self._resources.get(guid)
49 return self._resources.keys()
51 def register_resource(self, rtype, guid = None, creds = None):
52 # Get next available guid
53 guid = self._guid_generator.next(guid)
56 rm = ResourceFactory.create(rtype, self, guid, creds)
59 self._resources[guid] = rm
63 def get_attributes(self, guid):
64 rm = self.get_resource(guid)
65 return rm.get_attributes()
67 def get_filters(self, guid):
68 rm = self.get_resource(guid)
69 return rm.get_filters()
71 def register_connection(self, guid1, guid2):
72 rm1 = self.get_resource(guid1)
73 rm2 = self.get_resource(guid2)
78 def discover_resource(self, guid, filters):
79 rm = self.get_resource(guid)
80 return rm.discover(filters)
82 def provision_resource(self, guid, filters):
83 rm = self.get_resource(guid)
84 return rm.provision(filters)
86 def register_start(self, group1, time, after_status, group2):
87 if isinstance(group1, int):
89 if isinstance(group2, int):
94 rm = self.get_resource(guid)
95 rm.start_after(time, after_status, guid2)
97 def register_stop(self, group1, time, after_status, group2):
98 if isinstance(group1, int):
100 if isinstance(group2, int):
101 group2 = list[group2]
105 rm = self.get_resource(guid)
106 rm.stop_after(time, after_status, guid2)
108 def register_set(self, name, value, group1, time, after_status, group2):
109 if isinstance(group1, int):
110 group1 = list[group1]
111 if isinstance(group2, int):
112 group2 = list[group2]
116 rm = self.get_resource(guid)
117 rm.set_after(name, value, time, after_status, guid2)
119 def get(self, guid, name):
120 rm = self.get_resource(guid)
123 def set(self, guid, name, value):
124 rm = self.get_resource(guid)
125 return rm.set(name, value)
127 def status(self, guid):
128 rm = self.get_resource(guid)
131 def stop(self, guid):
132 rm = self.get_resource(guid)
135 def deploy(self, group = None, start_when_all_ready = True):
137 group = self.resources
141 rm = self.get_resource(guid)
143 kwargs = {'target': rm.deploy}
144 if start_when_all_ready:
147 kwargs['args'] = towait
149 thread = threading.Thread(kwargs)
150 threads.append(thread)
153 for thread in threads:
156 def release(self, group = None):
158 group = self.resources
162 rm = self.get_resource(guid)
163 thread = threading.Thread(target=rm.release)
164 threads.append(thread)
167 for thread in threads:
177 if self._thread.is_alive():
180 def schedule(self, date, callback, track = False):
181 """ Schedule a callback to be executed at time date.
183 date string containing execution time for the task.
184 It can be expressed as an absolute time, using
185 timestamp format, or as a relative time matching
186 ^\d+.\d+(h|m|s|ms|us)$
188 callback code to be executed for the task. Must be a
189 Python function, and receives args and kwargs
192 track if set to True, the task will be retrivable with
193 the get_task() method
195 timestamp = strfvalid(date)
197 task = Task(timestamp, callback)
198 task = self._scheduler.schedule(task)
201 self._tasks[task.id] = task
203 # Notify condition to wake up the processing thread
211 runner = ParallelRun(maxthreads = 50)
215 while not self._stop:
217 task = self._scheduler.next()
221 # It there are not tasks in the tasks queue we need to
222 # wait until a call to schedule wakes us up
227 # If the task timestamp is in the future the thread needs to wait
228 # until time elapse or until another task is scheduled
230 if now < task.timestamp:
231 # Calculate time difference in seconds
232 timeout = strfdiff(task.timestamp, now)
233 # Re-schedule task with the same timestamp
234 self._scheduler.schedule(task)
235 # Sleep until timeout or until a new task awakes the condition
237 self._cond.wait(timeout)
240 # Process tasks in parallel
241 runner.put(self._execute, task)
244 err = traceback.format_exc()
245 self._logger.error("Error while processing tasks in the EC: %s" % err)
247 def _execute(self, task):
249 task.status = TaskStatus.DONE
252 task.result = task.callback()
255 err = traceback.format_exc()
256 self._logger.error("Error while executing event: %s" % err)
259 task.status = TaskStatus.ERROR