+"""
+ NEPI, a framework to manage network experiments
+ Copyright (C) 2013 INRIA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+
+import functools
import logging
import os
import random
"""
self.logger.debug(" ------- DEPLOY START ------ ")
- stop = []
-
- def steps(rm):
- try:
- rm.deploy()
- rm.start_with_conditions()
-
- # Only if the RM has STOP conditions we
- # schedule a stop. Otherwise the RM will stop immediately
- if rm.conditions.get(ResourceAction.STOP):
- rm.stop_with_conditions()
- except:
- import traceback
- err = traceback.format_exc()
-
- self._logger.error("Error occurred while deploying resources: %s" % err)
-
- # stop deployment
- stop.append(None)
-
if not group:
group = self.resources
# Before starting deployment we disorder the group list with the
# purpose of speeding up the whole deployment process.
# It is likely that the user inserted in the 'group' list closely
- # resources resources one after another (e.g. all applications
+ # resources one after another (e.g. all applications
# connected to the same node can likely appear one after another).
# This can originate a slow down in the deployment since the N
# threads the parallel runner uses to processes tasks may all
# be taken up by the same family of resources waiting for the
- # same conditions.
- # If we disorder the group list, this problem can be mitigated
+ # same conditions (e.g. LinuxApplications running on a same
+ # node share a single lock, so they will tend to be serialized).
+ # If we disorder the group list, this problem can be mitigated.
random.shuffle(group)
- threads = []
+ def wait_all_and_start(group):
+ reschedule = False
+ for guid in group:
+ rm = self.get_resource(guid)
+ if rm.state < ResourceState.READY:
+ reschedule = True
+ break
+
+ if reschedule:
+ callback = functools.partial(wait_all_and_start, group)
+ self.schedule("1s", callback)
+ else:
+ # If all resources are read, we schedule the start
+ for guid in group:
+ rm = self.get_resource(guid)
+ self.schedule("0.01s", rm.start_with_conditions)
+
+ if wait_all_ready:
+ # Schedule the function that will check all resources are
+ # READY, and only then it will schedule the start.
+ # This is aimed to reduce the number of tasks looping in the scheduler.
+ # Intead of having N start tasks, we will have only one
+ callback = functools.partial(wait_all_and_start, group)
+ self.schedule("1s", callback)
+
for guid in group:
rm = self.get_resource(guid)
+ self.schedule("0.001s", rm.deploy)
- if wait_all_ready:
- towait = list(group)
- towait.remove(guid)
- self.register_condition(guid, ResourceAction.START,
- towait, ResourceState.READY)
-
- thread = threading.Thread(target = steps, args = (rm,))
- threads.append(thread)
- thread.setDaemon(True)
- thread.start()
-
- while list(threads) and not self.finished and not stop:
- thread = threads[0]
- # Time out after 5 seconds to check EC not terminated
- thread.join(1)
- if not thread.is_alive():
- threads.remove(thread)
-
- if stop:
- # stop the scheduler
- self._stop_scheduler()
+ if not wait_all_ready:
+ self.schedule("1s", rm.start_with_conditions)
- if self._thread.is_alive():
- self._thread.join()
+ if rm.conditions.get(ResourceAction.STOP):
+ # Only if the RM has STOP conditions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ self.schedule("2s", rm.stop_with_conditions)
- raise RuntimeError, "Error occurred, interrupting deployment "
def release(self, group = None):
if not group:
thread.join(5)
if not thread.is_alive():
threads.remove(thread)
-
+
def shutdown(self):
self.release()
self._cond.acquire()
task = self._scheduler.next()
self._cond.release()
-
+
if not task:
# It there are not tasks in the tasks queue we need to
# wait until a call to schedule wakes us up
else:
# Process tasks in parallel
runner.put(self._execute, task)
-
except:
import traceback
err = traceback.format_exc()
self._logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
- finally:
- runner.sync()
# Mark EC state as terminated
if self.ecstate == ECState.RUNNING:
+ # Synchronize to get errors if occurred
+ runner.sync()
self._state = ECState.TERMINATED
def _execute(self, task):