+ def deploy(self, group = None, start_when_all_ready = True):
+ if not group:
+ group = self.resources
+
+ threads = []
+ for guid in group:
+ rm = self._resources(guid1)
+
+ kwargs = {'target': rm.deploy}
+ if start_when_all_ready:
+ towait = list(group)
+ towait.remove(guid)
+ kwargs['args'] = towait
+
+ thread = threading.Thread(kwargs)
+ threads.append(thread)
+ thread.start()
+
+ for thread in threads:
+ thread.join()
+
+ def release(self, group = None):
+ if not group:
+ group = self.resources
+
+ threads = []
+ for guid in group:
+ rm = self._resources(guid1)
+ thread = threading.Thread(target=rm.release)
+ threads.append(thread)
+ thread.start()
+
+ for thread in threads:
+ thread.join()
+
+ def shutdown(self):
+ self._stop = False
+ self.release()
+
+ def schedule(self, date, callback):
+ """
+ date string containing execution time for the task.
+ It can be expressed as an absolute time, using
+ timestamp format, or as a relative time matching
+ ^\d+.\d+(h|m|s|ms|us)$
+
+ callback code to be executed for the task. Must be a
+ Python function, and receives args and kwargs
+ as arguments.
+ """
+ timestamp = strfvalid(date)
+
+ task = Task(timestamp, callback)
+ task = self._scheduler.schedule(task)
+
+ # Notify condition to wake up the processing thread
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+ return task
+
+ def _process(self):
+ runner = ParallelRun(maxthreads = 50)
+ runner.start()
+
+ try:
+ while not self._stop:
+ self._cond.acquire()
+ task = self._scheduler.next()
+ self._cond.release()
+
+ if not task:
+ # It there are not tasks in the tasks queue we need to
+ # wait until a call to schedule wakes us up
+ self._cond.acquire()
+ self._cond.wait()
+ self._cond.release()
+ else:
+ # If the task timestamp is in the future the thread needs to wait
+ # until time elapse or until another task is scheduled
+ now = strfnow()
+ if now < task.timestamp:
+ # Calculate time difference in seconds
+ timeout = strfdiff(task.timestamp, now)
+ # Re-schedule task with the same timestamp
+ self._scheduler.schedule(task)
+ # Sleep until timeout or until a new task awakes the condition
+ self._cond.acquire()
+ self._cond.wait(timeout)
+ self._cond.release()
+ else:
+ # Process tasks in parallel
+ runner.put(task.callback)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self._logger.error("Error while processing tasks in the EC: %s" % err)
+