#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
-# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
+import functools
+import logging
+import os
+import random
+import sys
+import time
+import threading
+
class ECState(object):
""" State of the Experiment Controller
def wait_finished(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reached the state FINISHED
+ reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
:param guids: List of guids
:type guids: list
def wait_started(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reached the state STARTED
+ reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, states = [ResourceState.STARTED,
- ResourceState.STOPPED,
- ResourceState.FAILED,
- ResourceState.FINISHED])
+ return self.wait(guids, state = ResourceState.STARTED)
def wait_released(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reached the state RELEASED
+ reached the state RELEASED (or FAILED)
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ # TODO: solve state concurrency BUG and !!!!
+ # correct waited release state to state = ResourceState.FAILED)
+ return self.wait(guids, state = ResourceState.FINISHED)
+
+ def wait_deployed(self, guids):
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reached the state READY (or any higher state)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, states = [ResourceState.RELEASED,
- ResourceState.STOPPED,
- ResourceState.FAILED,
- ResourceState.FINISHED])
+ return self.wait(guids, state = ResourceState.READY)
- def wait(self, guids, states = [ResourceState.FINISHED,
- ResourceState.FAILED,
- ResourceState.STOPPED]):
+ def wait(self, guids, state = ResourceState.STOPPED):
""" Blocking method that waits until all the RM from the 'guid' list
reached state 'state' or until a failure occurs
# If a guid reached one of the target states, remove it from list
guid = guids[0]
- state = self.state(guid)
+ rstate = self.state(guid)
- if state in states:
+ if rstate >= state:
guids.remove(guid)
else:
# Debug...
- self.logger.debug(" WAITING FOR %g - state %s " % (guid,
- self.state(guid, hr = True)))
+ self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
+ self.state(guid, hr = True), state))
# Take the opportunity to 'refresh' the states of the RMs.
# Query only the first up to N guids (not to overwhelm
# If the guid is not in one of the target states, wait and
# continue quering. We keep the sleep big to decrease the
# number of RM state queries
- time.sleep(2)
+ time.sleep(4)
def get_task(self, tid):
""" Get a specific task
if isinstance(guids, int):
guids = [guids]
- # Create deployment group
+ # Create deployment group
+ new_group = False
if not group:
+ new_group = True
group = self._group_id_generator.next(guid)
if group not in self._groups:
rm = self.get_resource(guid)
self.schedule("0s", rm.start_with_conditions)
- if wait_all_ready:
+ if wait_all_ready and new_group:
# Schedule a function to check that all resources are
# READY, and only then schedule the start.
# This aimes at reducing the number of tasks looping in the