6 from neco.util import guid
7 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
8 from neco.execution.resource import ResourceFactory
9 from neco.execution.scheduler import HeapScheduler, Task
10 from neco.util.parallel import ParallelRun
class ExperimentController(object):
    """Orchestrates an experiment: keeps a guid-indexed registry of
    ResourceManagers, schedules timed tasks on a heap scheduler, and runs
    an event-processing thread that executes due tasks in parallel.
    """

    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
        """Create a controller.

        root_dir -- directory where experiment files are stored.
        loglevel -- logging level name (e.g. 'error', 'debug'); upper-cased
                    and resolved against the logging module.
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir
        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()
        # guid -> ResourceManager registry, populated by register_resource()
        self._resources = dict()
        # priority queue of scheduled Tasks, consumed by the processing thread
        self._scheduler = HeapScheduler()
        # Event processing thread
        # The condition doubles as the sleep/wake-up mechanism: _process()
        # waits on it and schedule() notifies it when a new task arrives.
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        # NOTE(review): _process() reads self._stop, but its initialization
        # (and the thread start) is not visible in this chunk — confirm both
        # happen in the elided lines.
        self._logger = logging.getLogger("neco.execution.ec")
        self._logger.setLevel(getattr(logging, loglevel.upper()))
37 def resource(self, guid):
38 return self._resources.get(guid)
42 return self._resources.keys()
def register_resource(self, rtype, guid = None, creds = None):
    """Instantiate a ResourceManager of type *rtype* and add it to the
    registry.

    rtype -- resource type name, resolved by ResourceFactory.
    guid  -- requested guid, or None to allocate the next available one.
    creds -- credentials forwarded to the resource manager.

    NOTE(review): the tail of this method (most likely `return guid`) is
    elided from this chunk — confirm against the full file.
    """
    # Get next available guid
    guid = self._guid_generator.next(guid)
    # Delegate construction to the factory so rtype maps to the right class
    rm = ResourceFactory.create(rtype, self, guid, creds)
    # Record it in the registry used by resource()/get()/set()/status()/...
    self._resources[guid] = rm
56 def get_attributes(self, guid):
57 rm = self._resources[guid]
58 return rm.get_attributes()
60 def get_filters(self, guid):
61 rm = self._resources[guid]
62 return rm.get_filters()
def register_connection(self, guid1, guid2):
    """Connect the resources registered as *guid1* and *guid2*.

    NOTE(review): the lines performing the actual connection (presumably
    symmetric rm1/rm2 connect calls) are elided from this chunk — only the
    registry lookups are visible here.
    """
    rm1 = self._resources[guid1]
    rm2 = self._resources[guid2]
71 def discover_resource(self, guid, filters):
72 rm = self._resources[guid]
73 return rm.discover(filters)
75 def provision_resource(self, guid, filters):
76 rm = self._resources[guid]
77 return rm.provision(filters)
79 def register_start(self, group1, time, after_status, group2):
80 if isinstance(group1, int):
82 if isinstance(group2, int):
87 rm = self._resources(guid1)
88 rm.start_after(time, after_status, guid2)
90 def register_stop(self, group1, time, after_status, group2):
91 if isinstance(group1, int):
93 if isinstance(group2, int):
98 rm = self._resources(guid1)
99 rm.stop_after(time, after_status, guid2)
101 def register_set(self, name, value, group1, time, after_status, group2):
102 if isinstance(group1, int):
103 group1 = list[group1]
104 if isinstance(group2, int):
105 group2 = list[group2]
109 rm = self._resources(guid1)
110 rm.set_after(name, value, time, after_status, guid2)
112 def get(self, guid, name):
113 rm = self._resources(guid)
116 def set(self, guid, name, value):
117 rm = self._resources(guid)
118 return rm.set(name, value)
120 def status(self, guid):
121 rm = self._resources(guid)
124 def stop(self, guid):
125 rm = self._resources(guid)
def deploy(self, group = None, start_when_all_ready = True):
    """Deploy every resource in *group* (default: all registered guids),
    one thread per resource.

    Several lines of this method are elided from this chunk (the loop
    header binding guid1, `threads = []`, the thread start/join calls),
    so the notes below are review flags rather than fixes.
    """
    # default to all registered resources when no group is given
    group = self.resources
    # NOTE(review): guid1 is bound by an elided `for guid1 in group:` loop;
    # also self._resources is a dict — this should be self._resources[guid1].
    rm = self._resources(guid1)
    kwargs = {'target': rm.deploy}
    if start_when_all_ready:
        # towait is computed in elided lines — presumably the set of other
        # guids this deployment must wait for; confirm in the full file
        kwargs['args'] = towait
    # NOTE(review): Thread(kwargs) passes the dict positionally (as the
    # `group` argument); this should almost certainly be
    # threading.Thread(**kwargs).
    thread = threading.Thread(kwargs)
    threads.append(thread)
    # wait for every deployment thread (join call elided from this chunk)
    for thread in threads:
def release(self, group = None):
    """Release every resource in *group* (default: all registered guids),
    one thread per resource.

    Several lines are elided from this chunk (the loop header binding
    guid1, `threads = []`, thread start/join).
    """
    # default to all registered resources when no group is given
    group = self.resources
    # NOTE(review): guid1 comes from an elided loop; self._resources is a
    # dict and should be subscripted — self._resources[guid1], not called.
    rm = self._resources(guid1)
    thread = threading.Thread(target=rm.release)
    threads.append(thread)
    # wait for every release thread (join call elided from this chunk)
    for thread in threads:
def schedule(self, date, callback):
    r"""Schedule *callback* for execution at *date*.

    date     string containing execution time for the task.
             It can be expressed as an absolute time, using
             timestamp format, or as a relative time matching
             ^\d+.\d+(h|m|s|ms|us)$

    callback code to be executed for the task. Must be a
             Python function, and receives args and kwargs

    NOTE(review): the docstring delimiters above were restored; the text
    itself is verbatim from the chunk.
    """
    # Validate/convert the date into a concrete timestamp
    timestamp = strfvalid(date)
    task = Task(timestamp, callback)
    task = self._scheduler.schedule(task)
    # Notify condition to wake up the processing thread
    # NOTE(review): the acquire/notify/release on self._cond — and the
    # method's return value (likely the task id) — are elided from this
    # chunk; confirm against the full file.
# NOTE(review): this is the body of the event-processing loop; its
# `def _process(self):` header and several interior lines (the empty-queue
# wait, `now = strfnow()`, the try/except headers) are elided from this
# chunk.  Code tokens below are kept verbatim; only comments were changed.
runner = ParallelRun(maxthreads = 50)
# Loop until another thread sets self._stop (initialization of the flag is
# not visible in this chunk)
while not self._stop:
    task = self._scheduler.next()
    # If there are no tasks in the tasks queue we need to
    # wait until a call to schedule wakes us up
    # If the task timestamp is in the future the thread needs to wait
    # until time elapse or until another task is scheduled
    # NOTE(review): `now` is assigned in an elided line (presumably
    # now = strfnow())
    if now < task.timestamp:
        # Calculate time difference in seconds
        timeout = strfdiff(task.timestamp, now)
        # Re-schedule task with the same timestamp
        self._scheduler.schedule(task)
        # Sleep until timeout or until a new task awakes the condition
        self._cond.wait(timeout)
    # Process tasks in parallel
    runner.put(task.callback)
    # NOTE(review): the two lines below belong to an except handler whose
    # `try:`/`except:` headers are elided from this chunk
    err = traceback.format_exc()
    self._logger.error("Error while processing tasks in the EC: %s" % err)