import logging
import threading
import traceback

from neco.util import guid
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
from neco.execution.resource import ResourceFactory
from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
from neco.util.parallel import ParallelRun
class ExperimentController(object):
    """ Orchestrates the resource managers of one experiment.

    Keeps the guid -> ResourceManager registry, schedules timed tasks
    on a heap scheduler, and executes them from a background
    processing thread.
    """

    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
        """
        root_dir    directory where experiment related files are stored
        loglevel    logging level name ('error', 'debug', ...), upper-cased
                    and resolved against the logging module
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Resource managers, indexed by guid
        self._resources = dict()

        # Scheduler for timed tasks
        self._scheduler = HeapScheduler()

        # Tracked tasks, indexed by task id (see schedule()/get_task())
        self._tasks = dict()

        # Flag used to signal the processing thread to exit
        self._stop = False

        # Logging — configured before the processing thread starts, since
        # _process() reports errors through self._logger
        self._logger = logging.getLogger("neco.execution.ec")
        self._logger.setLevel(getattr(logging, loglevel.upper()))

        # Event processing thread
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.start()
41 def get_task(self, tid):
42 return self._tasks.get(tid)
44 def get_resource(self, guid):
45 return self._resources.get(guid)
49 return self._resources.keys()
51 def register_resource(self, rtype, guid = None, creds = None):
52 # Get next available guid
53 guid = self._guid_generator.next(guid)
56 rm = ResourceFactory.create(rtype, self, guid, creds)
59 self._resources[guid] = rm
63 def get_attributes(self, guid):
64 rm = self.get_resource(guid)
65 return rm.get_attributes()
67 def get_filters(self, guid):
68 rm = self.get_resource(guid)
69 return rm.get_filters()
    def register_connection(self, guid1, guid2):
        # Look up both resource managers by guid.
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)
        # NOTE(review): the remainder of this method is missing from this
        # chunk of the file — presumably the symmetric connect calls on
        # rm1/rm2 (TODO confirm against the full source). As visible, the
        # method fetches both RMs and performs no further action.
78 def discover_resource(self, guid, filters):
79 rm = self.get_resource(guid)
80 return rm.discover(filters)
82 def provision_resource(self, guid, filters):
83 rm = self.get_resource(guid)
84 return rm.provision(filters)
86 def register_start(self, group1, time, after_status, group2):
87 if isinstance(group1, int):
89 if isinstance(group2, int):
94 rm = self.get_resource(guid)
95 rm.start_after(time, after_status, guid2)
97 def register_stop(self, group1, time, after_status, group2):
98 if isinstance(group1, int):
100 if isinstance(group2, int):
101 group2 = list[group2]
105 rm = self.get_resource(guid)
106 rm.stop_after(time, after_status, guid2)
108 def register_set(self, name, value, group1, time, after_status, group2):
109 if isinstance(group1, int):
110 group1 = list[group1]
111 if isinstance(group2, int):
112 group2 = list[group2]
116 rm = self.get_resource(guid)
117 rm.set_after(name, value, time, after_status, guid2)
119 def get(self, guid, name):
120 rm = self.get_resource(guid)
123 def set(self, guid, name, value):
124 rm = self.get_resource(guid)
125 return rm.set(name, value)
127 def status(self, guid):
128 rm = self.get_resource(guid)
131 def stop(self, guid):
132 rm = self.get_resource(guid)
135 def deploy(self, group = None, start_when_all_ready = True):
137 group = self.resources
141 rm = self.get_resource(guid)
143 kwargs = {'target': rm.deploy}
144 if start_when_all_ready:
147 kwargs['args'] = towait
149 thread = threading.Thread(kwargs)
150 threads.append(thread)
153 for thread in threads:
156 def release(self, group = None):
158 group = self.resources
162 rm = self.get_resource(guid)
163 thread = threading.Thread(target=rm.release)
164 threads.append(thread)
167 for thread in threads:
        # NOTE(review): orphaned fragment — the enclosing method definition
        # (original lines ~169-179, likely a shutdown routine) is missing
        # from this chunk. Checks whether the event-processing thread
        # started in __init__ is still running.
        if self._thread.is_alive():
180 def schedule(self, date, callback, track = False):
182 date string containing execution time for the task.
183 It can be expressed as an absolute time, using
184 timestamp format, or as a relative time matching
185 ^\d+.\d+(h|m|s|ms|us)$
187 callback code to be executed for the task. Must be a
188 Python function, and receives args and kwargs
191 track if set to True, the task will be retrivable with
192 the get_task() method
194 timestamp = strfvalid(date)
196 task = Task(timestamp, callback)
197 task = self._scheduler.schedule(task)
200 self._tasks[task.id] = task
202 # Notify condition to wake up the processing thread
210 runner = ParallelRun(maxthreads = 50)
214 while not self._stop:
216 task = self._scheduler.next()
220 # It there are not tasks in the tasks queue we need to
221 # wait until a call to schedule wakes us up
226 # If the task timestamp is in the future the thread needs to wait
227 # until time elapse or until another task is scheduled
229 if now < task.timestamp:
230 # Calculate time difference in seconds
231 timeout = strfdiff(task.timestamp, now)
232 # Re-schedule task with the same timestamp
233 self._scheduler.schedule(task)
234 # Sleep until timeout or until a new task awakes the condition
236 self._cond.wait(timeout)
239 # Process tasks in parallel
240 runner.put(self._execute, task)
243 err = traceback.format_exc()
244 self._logger.error("Error while processing tasks in the EC: %s" % err)
246 def _execute(self, task):
248 task.status = TaskStatus.DONE
251 task.result = task.callback()
254 err = traceback.format_exc()
255 self._logger.error("Error while executing event: %s" % err)
258 task.status = TaskStatus.ERROR