import logging
import os
+import random
import sys
import time
import threading
from neco.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
-# TODO: Improve speed. Too slow... !!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class ECState(object):
RUNNING = 1
def wait_finished(self, guids):
while not all([self.state(guid) == ResourceState.FINISHED \
for guid in guids]) and not self.finished:
- time.sleep(1)
+ # We keep the sleep as long as possible to
+ # decrease the number of RM state requests, at the
+ # cost of up to 2 extra seconds of latency in
+ # detecting that all the RMs have finished
+ time.sleep(2)
def get_task(self, tid):
return self._tasks.get(tid)
"""
self.logger.debug(" ------- DEPLOY START ------ ")
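+ # Note: a mutable list is used as a shared abort flag, since the
+ # nested 'steps' function run by the worker threads cannot rebind
+ # a plain boolean from the enclosing scope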
+ stop = []
+
def steps(rm):
- rm.deploy()
- rm.start_with_conditions()
+ try:
+ rm.deploy()
+ rm.start_with_conditions()
+
+ # Schedule a stop only if the RM has STOP conditions;
+ # otherwise the RM would stop immediately
+ if rm.conditions.get(ResourceAction.STOP):
+ rm.stop_with_conditions()
+ except:
+ import traceback
+ err = traceback.format_exc()
+
+ self._logger.error("Error occurred while deploying resources: %s" % err)
- # Only if the RM has STOP consitions we
- # schedule a stop. Otherwise the RM will stop immediately
- if rm.conditions.get(ResourceAction.STOP):
- rm.stop_with_conditions()
+ # signal the main loop below to abort the deployment:
+ # it polls this list and stops as soon as it is non-empty
+ stop.append(None)
if not group:
group = self.resources
+ # Before starting the deployment, shuffle the group list to
+ # speed up the whole process.
+ # The user will likely have inserted related resources into the
+ # 'group' list one after another (e.g. all applications connected
+ # to the same node will tend to appear together). This can slow
+ # down the deployment, since the N threads the parallel runner
+ # uses to process tasks may all be taken up by the same family
+ # of resources, waiting for the same conditions.
+ # Shuffling the group list mitigates this problem
+ random.shuffle(group)
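+ # (note that random.shuffle reorders 'group' in place, so the
+ # caller's list is modified as well)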
+
threads = []
for guid in group:
rm = self.get_resource(guid)
thread.setDaemon(True)
thread.start()
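+ # Wait for all deployment threads to finish, bailing out early
+ # if the EC terminates or one of the threads reports a failure
+ # through 'stop'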
- while list(threads) and not self.finished:
+ while list(threads) and not self.finished and not stop:
thread = threads[0]
- # Time out after 5 seconds to check EC not terminated
- thread.join(5)
+ # Time out after 1 second to check that the EC has not
+ # terminated and the deployment has not been aborted
+ thread.join(1)
if not thread.is_alive():
threads.remove(thread)
+ if stop:
+ # stop the scheduler
+ self._stop_scheduler()
+
+ if self._thread.is_alive():
+ self._thread.join()
+
+ raise RuntimeError("Error occurred, interrupting deployment")
+
def release(self, group = None):
if not group:
group = self.resources
thread.join(5)
if not thread.is_alive():
threads.remove(thread)
-
- self._state = ECState.TERMINATED
def shutdown(self):
self.release()
-
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
+ self._stop_scheduler()
+
if self._thread.is_alive():
self._thread.join()
self._logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
- return
+ finally:
+ runner.sync()
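+ # sync() is expected to wait until the parallel runner has
+ # finished processing its pending tasks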
# Mark EC state as terminated
if self.ecstate == ECState.RUNNING:
self._logger.error("Error occurred while executing task: %s" % err)
- # Mark the EC as failed
- self._state = ECState.FAILED
-
- # Wake up the EC in case it was sleeping
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
+ self._stop_scheduler()
# Propagate error to the ParallelRunner
raise
+ def _stop_scheduler(self):
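+ # The processing thread re-checks the EC state once it wakes up,
+ # so marking the state and notifying the condition below makes it
+ # exit its scheduling loop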
+ # Mark the EC as failed
+ self._state = ECState.FAILED
+
+ # Wake up the EC in case it was sleeping
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+
+