import base64
import nepi.core.execute
+import nepi.util.environ
from nepi.core.attributes import AttributesMap, Attribute
from nepi.util import server, validation
from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
import tempfile
import shutil
import functools
+import os
# PROTOCOL REPLIES
OK = 0
GET_TESTBED_ID = 39
GET_TESTBED_VERSION = 40
TRACES_INFO = 41
+EXEC_XML = 42
+TESTBED_STATUS = 43
+STARTED_TIME = 44
+STOPPED_TIME = 45
instruction_text = dict({
OK: "OK",
ERROR: "ERROR",
XML: "XML",
+ EXEC_XML: "EXEC_XML",
TRACE: "TRACE",
FINISHED: "FINISHED",
START: "START",
TESTBED_ID: "TESTBED_ID",
TESTBED_VERSION: "TESTBED_VERSION",
TRACES_INFO: "TRACES_INFO",
+ STARTED_TIME: "STARTED_TIME",
+ STOPPED_TIME: "STOPPED_TIME",
+
})
def log_msg(server, params):
def to_server_log_level(log_level):
return (
- server.DEBUG_LEVEL
+ DC.DEBUG_LEVEL
if log_level == DC.DEBUG_LEVEL
- else server.ERROR_LEVEL
+ else DC.ERROR_LEVEL
)
def get_access_config_params(access_config):
+ mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
+ launch = not access_config.get_attribute_value(DC.RECOVER)
root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
log_level = to_server_log_level(log_level)
- user = host = port = agent = key = None
communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
environment_setup = (
access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
- else None
+ else ""
)
- if communication == DC.ACCESS_SSH:
- user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
- host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
- port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
- agent = access_config.get_attribute_value(DC.USE_AGENT)
- key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
- return (root_dir, log_level, user, host, port, key, agent, environment_setup)
+ user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
+ host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
+ port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
+ agent = access_config.get_attribute_value(DC.USE_AGENT)
+ sudo = access_config.get_attribute_value(DC.USE_SUDO)
+ key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
+ communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+ clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
+ return (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root)
class AccessConfiguration(AttributesMap):
def __init__(self, params = None):
from nepi.core.metadata import Metadata
- for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems():
+ for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
self.add_attribute(**attr_info)
-
+
if params:
for attr_name, attr_value in params.iteritems():
parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
self.path = path
def create_experiment_controller(xml, access_config = None):
- mode = None if not access_config \
- else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
- launch = True if not access_config \
- else not access_config.get_attribute_value(DC.RECOVER)
+ mode = None
+ launch = True
+ log_level = DC.ERROR_LEVEL
+ if access_config:
+ (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root) \
+ = get_access_config_params(access_config)
+
+ os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
if not mode or mode == DC.MODE_SINGLE_PROCESS:
- if not launch:
- raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
-
from nepi.core.execute import ExperimentController
if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
# up at destruction time.
controller._tempdir = root_dir
+ if not launch:
+ # try to recover
+ controller.recover()
+
return controller
elif mode == DC.MODE_DAEMON:
- (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
- get_access_config_params(access_config)
- return ExperimentControllerProxy(root_dir, log_level,
- experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
- agent = agent, launch = launch,
- environment_setup = environment_setup)
+ try:
+ return ExperimentControllerProxy(root_dir, log_level,
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
+ except:
+ if not launch:
+ # Maybe controller died, recover from persisted testbed information if possible
+ controller = ExperimentControllerProxy(root_dir, log_level,
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = True,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
+ controller.recover()
+ return controller
+ else:
+ raise
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_controller(testbed_id, testbed_version, access_config):
- mode = None if not access_config \
- else access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
- launch = True if not access_config \
- else not access_config.get_attribute_value(DC.RECOVER)
+ mode = None
+ launch = True
+ log_level = DC.ERROR_LEVEL
+ if access_config:
+ (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root) \
+ = get_access_config_params(access_config)
+
+ os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
if not mode or mode == DC.MODE_SINGLE_PROCESS:
if not launch:
- raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+ raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
return _build_testbed_controller(testbed_id, testbed_version)
elif mode == DC.MODE_DAEMON:
- (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
- get_access_config_params(access_config)
- return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
- testbed_version = testbed_version, host = host, port = port, ident_key = key,
- user = user, agent = agent, launch = launch,
- environment_setup = environment_setup)
+ return TestbedControllerProxy(root_dir, log_level,
+ testbed_id = testbed_id,
+ testbed_version = testbed_version,
+ communication = communication,
+ host = host,
+ port = port,
+ ident_key = key,
+ user = user,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_controller(testbed_id, testbed_version):
- mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+ mod_name = nepi.util.environ.find_testbed(testbed_id)
+
if not mod_name in sys.modules:
- __import__(mod_name)
+ try:
+ __import__(mod_name)
+ except ImportError:
+ raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
+
module = sys.modules[mod_name]
- return module.TestbedController(testbed_version)
+ tc = module.TestbedController()
+ if tc.testbed_version != testbed_version:
+ raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
+ (testbed_id, testbed_version, tc.testbed_version))
+ return tc
# Just a namespace class
class Marshalling:
@staticmethod
def nullint(sdata):
return None if sdata == "None" else int(sdata)
-
+
@staticmethod
def bool(sdata):
return sdata == 'True'
return reply
class TestbedControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, testbed_id, testbed_version):
- super(TestbedControllerServer, self).__init__(root_dir, log_level)
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version,
+ environment_setup, clean_root):
+ super(TestbedControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup, clean_root = clean_root)
self._testbed_id = testbed_id
self._testbed_version = testbed_version
self._testbed = None
self._testbed.defer_add_trace(guid, trace_id)
@Marshalling.handles(ADD_ADDRESS)
- @Marshalling.args(int, str, int, str)
+ @Marshalling.args(int, str, int, Marshalling.pickled_data)
@Marshalling.retvoid
def defer_add_address(self, guid, address, netprefix, broadcast):
self._testbed.defer_add_address(guid, address, netprefix,
broadcast)
@Marshalling.handles(ADD_ROUTE)
- @Marshalling.args(int, str, int, str)
+ @Marshalling.args(int, str, int, str, int)
@Marshalling.retvoid
- def defer_add_route(self, guid, destination, netprefix, nexthop):
- self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
+ def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
+ self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
@Marshalling.handles(DO_SETUP)
@Marshalling.args()
def status(self, guid):
return self._testbed.status(guid)
+ @Marshalling.handles(TESTBED_STATUS)
+ @Marshalling.args()
+ @Marshalling.retval(int)
+ def testbed_status(self):
+ return self._testbed.testbed_status()
+
@Marshalling.handles(GET_ATTRIBUTE_LIST)
- @Marshalling.args(int, int)
+ @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
@Marshalling.retval( Marshalling.pickled_data )
- def get_attribute_list(self, guid, filter_flags = None):
- return self._testbed.get_attribute_list(guid, filter_flags)
+ def get_attribute_list(self, guid, filter_flags = None, exclude = False):
+ return self._testbed.get_attribute_list(guid, filter_flags, exclude)
@Marshalling.handles(GET_FACTORY_ID)
@Marshalling.args(int)
def get_factory_id(self, guid):
return self._testbed.get_factory_id(guid)
+ @Marshalling.handles(RECOVER)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def recover(self):
+ self._testbed.recover()
+
+
class ExperimentControllerServer(BaseServer):
- def __init__(self, root_dir, log_level, experiment_xml):
- super(ExperimentControllerServer, self).__init__(root_dir, log_level)
+ def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
+ clean_root):
+ super(ExperimentControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup, clean_root = clean_root)
self._experiment_xml = experiment_xml
self._experiment = None
def guids(self):
return self._experiment.guids
+ @Marshalling.handles(STARTED_TIME)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def started_time(self):
+ return self._experiment.started_time
+
+ @Marshalling.handles(STOPPED_TIME)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def stopped_time(self):
+ return self._experiment.stopped_time
+
@Marshalling.handles(XML)
@Marshalling.args()
@Marshalling.retval()
- def experiment_xml(self):
- return self._experiment.experiment_xml
+ def experiment_design_xml(self):
+ return self._experiment.experiment_design_xml
+
+ @Marshalling.handles(EXEC_XML)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def experiment_execute_xml(self):
+ return self._experiment.experiment_execute_xml
@Marshalling.handles(TRACE)
@Marshalling.args(int, str, Marshalling.base64_data)
def is_finished(self, guid):
return self._experiment.is_finished(guid)
+ @Marshalling.handles(STATUS)
+ @Marshalling.args(int)
+ @Marshalling.retval(int)
+ def status(self, guid):
+ return self._experiment.is_finished(guid)
+
@Marshalling.handles(GET)
@Marshalling.args(int, Marshalling.base64_data, str)
@Marshalling.retval( Marshalling.pickled_data )
_ServerClass = None
_ServerClassModule = "nepi.util.proxy"
- def __init__(self,
- ctor_args, root_dir,
- launch = True, host = None,
- port = None, user = None, ident_key = None, agent = None,
- environment_setup = ""):
+ def __init__(self, ctor_args, root_dir,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
if launch:
- # ssh
- if host != None:
- python_code = (
+ python_code = (
"from %(classmodule)s import %(classname)s;"
"s = %(classname)s%(ctor_args)r;"
"s.run()"
classmodule = self._ServerClassModule,
ctor_args = ctor_args
) )
- proc = server.popen_ssh_subprocess(python_code, host = host,
- port = port, user = user, agent = agent,
- ident_key = ident_key,
- environment_setup = environment_setup,
- waitcommand = True)
- if proc.poll():
- err = proc.stderr.read()
- raise RuntimeError, "Server could not be executed: %s" % (err,)
+ proc = server.popen_python(python_code,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ ident_key = ident_key,
+ sudo = sudo,
+ environment_setup = environment_setup)
+ # Wait for the server to be ready, otherwise nobody
+ # will be able to connect to it
+ err = []
+ helo = "nope"
+ while helo:
+ helo = proc.stderr.readline()
+ if helo == 'SERVER_READY.\n':
+ break
+ err.append(helo)
else:
- # launch daemon
- s = self._ServerClass(*ctor_args)
- s.run()
-
+ raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
# connect client to server
- self._client = server.Client(root_dir, host = host, port = port,
- user = user, agent = agent,
+ self._client = server.Client(root_dir,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ sudo = sudo,
environment_setup = environment_setup)
@staticmethod
_ServerClass = TestbedControllerServer
- def __init__(self, root_dir, log_level, testbed_id = None,
- testbed_version = None, launch = True, host = None,
- port = None, user = None, ident_key = None, agent = None,
- environment_setup = ""):
+ def __init__(self, root_dir, log_level,
+ testbed_id = None,
+ testbed_version = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
if launch and (testbed_id == None or testbed_version == None):
raise RuntimeError("To launch a TesbedControllerServer a "
"testbed_id and testbed_version are required")
super(TestbedControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, testbed_id, testbed_version),
+ ctor_args = (root_dir, log_level, testbed_id, testbed_version,
+ environment_setup, clean_root),
root_dir = root_dir,
- launch = launch, host = host, port = port, user = user,
- ident_key = ident_key, agent = agent,
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
environment_setup = environment_setup)
locals().update( BaseProxy._make_stubs(
class ExperimentControllerProxy(BaseProxy):
_ServerClass = ExperimentControllerServer
- def __init__(self, root_dir, log_level, experiment_xml = None,
- launch = True, host = None, port = None, user = None,
- ident_key = None, agent = None, environment_setup = ""):
- if launch and experiment_xml is None:
- raise RuntimeError("To launch a ExperimentControllerServer a \
- xml description of the experiment is required")
+ def __init__(self, root_dir, log_level,
+ experiment_xml = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
super(ExperimentControllerProxy,self).__init__(
- ctor_args = (root_dir, log_level, experiment_xml),
+ ctor_args = (root_dir, log_level, experiment_xml, environment_setup,
+ clean_root),
root_dir = root_dir,
- launch = launch, host = host, port = port, user = user,
- ident_key = ident_key, agent = agent,
- environment_setup = environment_setup)
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
locals().update( BaseProxy._make_stubs(
server_class = ExperimentControllerServer,