def __init__(self, path):
self.path = path
+def create_experiment_suite(xml, access_config, repetitions = None,
+ duration = None, wait_guids = None):
+ mode = None
+ if access_config :
+ (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root) \
+ = get_access_config_params(access_config)
+
+ if not mode or mode == DC.MODE_SINGLE_PROCESS:
+ from nepi.core.execute import ExperimentSuite
+ if not root_dir:
+ root_dir = TempDir()
+ else:
+ root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
+
+ exp_suite = ExperimentSuite(xml, access_config, repetitions, duration,
+ wait_guids)
+
+ # inject reference to temporary dir, so that it gets cleaned
+ # up at destruction time.
+ exp_suite._tempdir = root_dir
+ return exp_suite
+ elif mode == DC.MODE_DAEMON:
+ return ExperimentSuiteProxy(root_dir, log_level,
+ experiment_xml = xml,
+ repetitions = repetitions,
+ duration = duration,
+ wait_guids = wait_guids,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
+ raise RuntimeError("Unsupported access configuration '%s'" % mode)
+
def create_experiment_controller(xml, access_config = None):
mode = None
launch = True
log_reply(self, reply)
return reply
+class ExperimentSuiteServer(BaseServer):
+ def __init__(self, root_dir, log_level,
+ xml, repetitions, duration, wait_guids,
+ communication = DC.ACCESS_LOCAL,
+ mode = None
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
+ super(ExperimentSuiteServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup, clean_root = clean_root)
+ access_config = AccessConfiguration()
+ access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
+ access_config.set_attribute_value(DC.LOG_LEVEL, log_level)
+ access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, environment_setup)
+ if mode:
+ access_config.set_attribute_value(DC.DEPLOYMENT_MODE, mode)
+ if user:
+ access_config.set_attribute_value(DC.DEPLOYMENT_USER, user)
+ if host:
+ access_config.set_attribute_value(DC.DEPLOYMENT_HOST, host)
+ if port:
+ access_config.get_attribute_value(DC.DEPLOYMENT_PORT, port)
+ if agent:
+ access_config.get_attribute_value(DC.USE_AGENT, agent)
+ if sudo:
+ acess_config.get_attribute_value(DC.USE_SUDO, sudo)
+ if ident_key:
+ access_config.get_attribute_value(DC.DEPLOYMENT_KEY, ident_key)
+ if communication:
+ access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION, communication)
+ if clean_root:
+ access_config.get_attribute_value(DC.CLEAN_ROOT, clean_root)
+ self._experiment_xml = xml
+ self._duration = duration
+ self._repetitions = repetitions
+ self._wait_pids = wait_pids
+ self._access_config = access_config
+ self._experitment_suite = None
+
+ def post_daemonize(self):
+ from nepi.core.execute import ExperimentSuite
+ self._experiment_suite = ExperimentSuite(
+ self._experiment_xml, self._access_config,
+ self._repetitions, self._duration, self._wait_guids)
+
+ def post_daemonize(self):
+ pass
+
+ @Marshalling.handles(GUIDS)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def guids(self):
+ return self._testbed.guids
+
+ @Marshalling.handles(TESTBED_ID)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def testbed_id(self):
+ return str(self._testbed.testbed_id)
+
+
class TestbedControllerServer(BaseServer):
def __init__(self, root_dir, log_level, testbed_id, testbed_version,
environment_setup, clean_root):
setattr(template_class, dmethname, dmeth)
return rv
-
+
+class ExperimentSuiteProxy(BaseProxy):
+
+ _ServerClass = ExperimentSuiteServer
+
+ def __init__(self, root_dir, log_level,
+ xml, repetitions, duration, wait_guids,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
+ super(TestbedControllerProxy,self).__init__(
+ ctor_args = (root_dir, log_level,
+ experiment_xml,
+ repetitions = repetitions,
+ duration = duration,
+ wait_guids = wait_guids,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = environment_setup,
+ clean_root = clean_root),
+ root_dir = root_dir,
+ True, #launch
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = environment_setup)
+
+ locals().update( BaseProxy._make_stubs(
+ server_class = ExperimentSuiteServer,
+ template_class = nepi.core.execute.ExperimentSuite,
+ ) )
+
+ # Shutdown stops the serverside...
+ def shutdown(self, _stub = shutdown):
+ rv = _stub(self)
+ self._client.send_stop()
+ self._client.read_reply() # wait for it
+ return rv
+
class TestbedControllerProxy(BaseProxy):
_ServerClass = TestbedControllerServer
template_class = nepi.core.execute.ExperimentController,
) )
-
# Shutdown stops the serverside...
def shutdown(self, _stub = shutdown):
rv = _stub(self)