from neco.util import guid
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
-from neco.execution.resource import ResourceFactory
+from neco.execution.resource import ResourceFactory, ResourceAction, \
+ ResourceState
from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
from neco.util.parallel import ParallelRun
+# TODO: use multiprocessing instead of threading
+
class ExperimentController(object):
def __init__(self, root_dir = "/tmp", loglevel = 'error'):
super(ExperimentController, self).__init__()
rm1.connect(guid2)
rm2.connect(guid1)
- def discover_resource(self, guid, filters):
- rm = self.get_resource(guid)
- return rm.discover(filters)
+ def register_condition(self, group1, action, group2, state,
+ time = None):
+ """ Registers an action START or STOP for all RM on group1 to occur
+ time 'time' after all elements in group2 reached state 'state'.
- def provision_resource(self, guid, filters):
- rm = self.get_resource(guid)
- return rm.provision(filters)
+ :param group1: List of guids of RMs subjected to action
+ :type group1: list
- def register_start(self, group1, time, after_status, group2):
- if isinstance(group1, int):
- group1 = list[group1]
- if isinstance(group2, int):
- group2 = list[group2]
+ :param action: Action to register (either START or STOP)
+ :type action: ResourceAction
- for guid1 in group1:
- for guid2 in group2:
- rm = self.get_resource(guid)
- rm.start_after(time, after_status, guid2)
+ :param group2: List of guids of RMs to we waited for
+ :type group2: list
- def register_stop(self, group1, time, after_status, group2):
- if isinstance(group1, int):
- group1 = list[group1]
- if isinstance(group2, int):
- group2 = list[group2]
+ :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+ :type state: ResourceState
- for guid1 in group1:
- for guid2 in group2:
- rm = self.get_resource(guid)
- rm.stop_after(time, after_status, guid2)
+ :param time: Time to wait after group2 has reached status
+ :type time: string
- def register_set(self, name, value, group1, time, after_status, group2):
+ """
if isinstance(group1, int):
group1 = list[group1]
if isinstance(group2, int):
group2 = list[group2]
for guid1 in group1:
- for guid2 in group2:
- rm = self.get_resource(guid)
- rm.set_after(name, value, time, after_status, guid2)
+ rm = self.get_resource(guid)
+ rm.register_condition(action, group2, state, time)
+
+ def discover(self, guid, filters):
+ rm = self.get_resource(guid)
+ return rm.discover(filters)
+
+ def provision(self, guid, filters):
+ rm = self.get_resource(guid)
+ return rm.provision(filters)
def get(self, guid, name):
rm = self.get_resource(guid)
rm = self.get_resource(guid)
return rm.set(name, value)
- def status(self, guid):
+ def state(self, guid):
rm = self.get_resource(guid)
- return rm.status()
+ return rm.state
def stop(self, guid):
rm = self.get_resource(guid)
return rm.stop()
- def deploy(self, group = None, start_when_all_ready = True):
+ def start(self, guid):
+ rm = self.get_resource(guid)
+ return rm.start()
+
+ def set_with_conditions(self, name, value, group1, group2, state,
+ time = None):
+ """ Set value 'value' on attribute with name 'name' on all RMs of
+ group1 when 'time' has elapsed since all elements in group2
+ have reached state 'state'.
+
+ :param name: Name of attribute to set in RM
+ :type name: string
+
+ :param value: Value of attribute to set in RM
+ :type name: string
+
+ :param group1: List of guids of RMs subjected to action
+ :type group1: list
+
+ :param action: Action to register (either START or STOP)
+ :type action: ResourceAction
+
+ :param group2: List of guids of RMs to we waited for
+ :type group2: list
+
+ :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+ :type state: ResourceState
+
+ :param time: Time to wait after group2 has reached status
+ :type time: string
+
+ """
+ if isinstance(group1, int):
+ group1 = list[group1]
+ if isinstance(group2, int):
+ group2 = list[group2]
+
+ for guid1 in group1:
+ rm = self.get_resource(guid)
+ rm.set_with_conditions(name, value, group2, state, time)
+
+ def stop_with_conditions(self, guid):
+ rm = self.get_resource(guid)
+ return rm.stop_with_conditions()
+
+ def start_with_conditions(self, guid):
+ rm = self.get_resource(guid)
+ return rm.start_with_condition()
+
+ def deploy(self, group = None, wait_all_ready = True):
+ """ Deploy all resource manager in group
+
+ :param group: List of guids of RMs to deploy
+ :type group: list
+
+ :param wait_all_ready: Wait until all RMs are deployed in
+ order to start the RMs
+ :type guid: int
+
+ """
+ def steps(rm):
+ rm.deploy()
+ rm.start_with_conditions()
+
+ # Only if the RM has STOP consitions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ if rm.conditions.get(ResourceAction.STOP):
+ rm.stop_with_conditions()
+
if not group:
group = self.resources
for guid in group:
rm = self.get_resource(guid)
- kwargs = {'target': rm.deploy}
- if start_when_all_ready:
+ if wait_all_ready:
towait = list(group)
towait.remove(guid)
- kwargs['args'] = towait
+ self.register_condition(guid, ResourceAction.START,
+ towait, ResourceState.DEPLOYED)
- thread = threading.Thread(kwargs)
+ thread = threading.Thread(target = steps, args = (rm))
threads.append(thread)
thread.start()