GET_TESTBED_VERSION = 40
TRACES_INFO = 41
EXEC_XML = 42
+TESTBED_STATUS = 43
+STARTED_TIME = 44
+STOPPED_TIME = 45
instruction_text = dict({
OK: "OK",
TESTBED_ID: "TESTBED_ID",
TESTBED_VERSION: "TESTBED_VERSION",
TRACES_INFO: "TRACES_INFO",
+ STARTED_TIME: "STARTED_TIME",
+ STOPPED_TIME: "STOPPED_TIME",
+
})
def log_msg(server, params):
def to_server_log_level(log_level):
return (
- server.DEBUG_LEVEL
+ DC.DEBUG_LEVEL
if log_level == DC.DEBUG_LEVEL
- else server.ERROR_LEVEL
+ else DC.ERROR_LEVEL
)
def get_access_config_params(access_config):
+ mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
+ launch = not access_config.get_attribute_value(DC.RECOVER)
root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
log_level = to_server_log_level(log_level)
- user = host = port = agent = key = None
communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
environment_setup = (
access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
else ""
)
- if communication == DC.ACCESS_SSH:
- user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
- host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
- port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
- agent = access_config.get_attribute_value(DC.USE_AGENT)
- key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
- return (root_dir, log_level, user, host, port, key, agent, environment_setup)
+ user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
+ host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
+ port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
+ agent = access_config.get_attribute_value(DC.USE_AGENT)
+ sudo = access_config.get_attribute_value(DC.USE_SUDO)
+ key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
+ communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+ clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
+ return (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root)
class AccessConfiguration(AttributesMap):
def __init__(self, params = None):
self.path = path
def create_experiment_controller(xml, access_config = None):
- mode = None if not access_config \
- else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
- launch = True if not access_config \
- else not access_config.get_attribute_value(DC.RECOVER)
+ mode = None
+ launch = True
+ log_level = DC.ERROR_LEVEL
+ if access_config:
+ (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root) \
+ = get_access_config_params(access_config)
+
+ os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
if not mode or mode == DC.MODE_SINGLE_PROCESS:
from nepi.core.execute import ExperimentController
return controller
elif mode == DC.MODE_DAEMON:
- (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
- get_access_config_params(access_config)
try:
return ExperimentControllerProxy(root_dir, log_level,
- experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
- agent = agent, launch = launch,
- environment_setup = environment_setup)
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
except:
if not launch:
# Maybe controller died, recover from persisted testbed information if possible
controller = ExperimentControllerProxy(root_dir, log_level,
- experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
- agent = agent, launch = True,
- environment_setup = environment_setup)
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = True,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
controller.recover()
return controller
else:
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_controller(testbed_id, testbed_version, access_config):
- mode = None if not access_config \
- else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
- launch = True if not access_config \
- else not access_config.get_attribute_value(DC.RECOVER)
+ mode = None
+ launch = True
+ log_level = DC.ERROR_LEVEL
+ if access_config:
+ (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root) \
+ = get_access_config_params(access_config)
+
+ os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
if not mode or mode == DC.MODE_SINGLE_PROCESS:
if not launch:
- raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+ raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
return _build_testbed_controller(testbed_id, testbed_version)
elif mode == DC.MODE_DAEMON:
- (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
- get_access_config_params(access_config)
- return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
- testbed_version = testbed_version, host = host, port = port, ident_key = key,
- user = user, agent = agent, launch = launch,
- environment_setup = environment_setup)
+ return TestbedControllerProxy(root_dir, log_level,
+ testbed_id = testbed_id,
+ testbed_version = testbed_version,
+ communication = communication,
+ host = host,
+ port = port,
+ ident_key = key,
+ user = user,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_controller(testbed_id, testbed_version):
@staticmethod
def nullint(sdata):
return None if sdata == "None" else int(sdata)
-
+
@staticmethod
def bool(sdata):
return sdata == 'True'
return reply
class TestbedControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, testbed_id, testbed_version, environment_setup):
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version,
+ environment_setup, clean_root):
super(TestbedControllerServer, self).__init__(root_dir, log_level,
- environment_setup = environment_setup )
+ environment_setup = environment_setup, clean_root = clean_root)
self._testbed_id = testbed_id
self._testbed_version = testbed_version
self._testbed = None
def status(self, guid):
return self._testbed.status(guid)
+ @Marshalling.handles(TESTBED_STATUS)
+ @Marshalling.args()
+ @Marshalling.retval(int)
+ def testbed_status(self):
+ return self._testbed.testbed_status()
+
@Marshalling.handles(GET_ATTRIBUTE_LIST)
@Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
@Marshalling.retval( Marshalling.pickled_data )
class ExperimentControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, experiment_xml, environment_setup):
+ def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
+ clean_root):
super(ExperimentControllerServer, self).__init__(root_dir, log_level,
- environment_setup = environment_setup )
+ environment_setup = environment_setup, clean_root = clean_root)
self._experiment_xml = experiment_xml
self._experiment = None
def guids(self):
return self._experiment.guids
+ @Marshalling.handles(STARTED_TIME)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def started_time(self):
+ return self._experiment.started_time
+
+ @Marshalling.handles(STOPPED_TIME)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def stopped_time(self):
+ return self._experiment.stopped_time
+
@Marshalling.handles(XML)
@Marshalling.args()
@Marshalling.retval()
_ServerClass = None
_ServerClassModule = "nepi.util.proxy"
- def __init__(self,
- ctor_args, root_dir,
- launch = True, host = None,
- port = None, user = None, ident_key = None, agent = None,
- environment_setup = ""):
+ def __init__(self, ctor_args, root_dir,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
if launch:
- # ssh
- if host != None:
- python_code = (
+ python_code = (
"from %(classmodule)s import %(classname)s;"
"s = %(classname)s%(ctor_args)r;"
"s.run()"
classmodule = self._ServerClassModule,
ctor_args = ctor_args
) )
- proc = server.popen_ssh_subprocess(python_code, host = host,
- port = port, user = user, agent = agent,
- ident_key = ident_key,
- environment_setup = environment_setup,
- waitcommand = True)
- if proc.poll():
- err = proc.stderr.read()
- raise RuntimeError, "Server could not be executed: %s" % (err,)
+ proc = server.popen_python(python_code,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ ident_key = ident_key,
+ sudo = sudo,
+ environment_setup = environment_setup)
+ # Wait for the server to be ready, otherwise nobody
+ # will be able to connect to it
+ err = []
+ helo = "nope"
+ while helo:
+ helo = proc.stderr.readline()
+ if helo == 'SERVER_READY.\n':
+ break
+ err.append(helo)
else:
- # launch daemon
- s = self._ServerClass(*ctor_args)
- s.run()
-
+ raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
# connect client to server
- self._client = server.Client(root_dir, host = host, port = port,
- user = user, agent = agent,
+ self._client = server.Client(root_dir,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ sudo = sudo,
environment_setup = environment_setup)
@staticmethod
_ServerClass = TestbedControllerServer
- def __init__(self, root_dir, log_level, testbed_id = None,
- testbed_version = None, launch = True, host = None,
- port = None, user = None, ident_key = None, agent = None,
- environment_setup = ""):
+ def __init__(self, root_dir, log_level,
+ testbed_id = None,
+ testbed_version = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
if launch and (testbed_id == None or testbed_version == None):
raise RuntimeError("To launch a TesbedControllerServer a "
"testbed_id and testbed_version are required")
super(TestbedControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
+ ctor_args = (root_dir, log_level, testbed_id, testbed_version,
+ environment_setup, clean_root),
root_dir = root_dir,
- launch = launch, host = host, port = port, user = user,
- ident_key = ident_key, agent = agent,
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
environment_setup = environment_setup)
locals().update( BaseProxy._make_stubs(
class ExperimentControllerProxy(BaseProxy):
_ServerClass = ExperimentControllerServer
- def __init__(self, root_dir, log_level, experiment_xml = None,
- launch = True, host = None, port = None, user = None,
- ident_key = None, agent = None, environment_setup = ""):
+ def __init__(self, root_dir, log_level,
+ experiment_xml = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
super(ExperimentControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
+ ctor_args = (root_dir, log_level, experiment_xml, environment_setup,
+ clean_root),
root_dir = root_dir,
- launch = launch, host = host, port = port, user = user,
- ident_key = ident_key, agent = agent,
- environment_setup = environment_setup)
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
locals().update( BaseProxy._make_stubs(
server_class = ExperimentControllerServer,