from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
- ResourceState, reschedule_delay
+ ResourceState
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
from nepi.util.sshfuncs import ProcStatus
"""
Supported flavors of Linux OS
"""
- FEDORA_8 = "f8"
- FEDORA_12 = "f12"
- FEDORA_14 = "f14"
- FEDORA = "fedora"
- UBUNTU = "ubuntu"
- DEBIAN = "debian"
+ DEBIAN = 1
+ UBUNTU = 1 << 1
+ FEDORA = 1 << 2
+ FEDORA_8 = 1 << 3 | FEDORA
+ FEDORA_12 = 1 << 4 | FEDORA
+ FEDORA_14 = 1 << 5 | FEDORA
@clsinit_copy
class LinuxNode(ResourceManager):
source compilation, file download, etc)
"""
- _rtype = "LinuxNode"
+ _rtype = "linux::Node"
_help = "Controls Linux host machines ( either localhost or a host " \
"that can be accessed using a SSH key)"
- _backend_type = "linux"
+ _platform = "linux"
@classmethod
def _register_attributes(cls):
hostname = Attribute("hostname", "Hostname of the machine",
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
username = Attribute("username", "Local account username",
flags = Flags.Credential)
- port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
+ port = Attribute("port", "SSH port", flags = Flags.Design)
home = Attribute("home",
"Experiment home directory to store all experiment related files",
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
identity = Attribute("identity", "SSH identity file",
flags = Flags.Credential)
server_key = Attribute("serverKey", "Server public key",
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
" from node home folder before starting experiment",
type = Types.Bool,
default = False,
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
clean_experiment = Attribute("cleanExperiment", "Remove all files and directories "
" from a previous same experiment, before the new experiment starts",
type = Types.Bool,
default = False,
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
clean_processes = Attribute("cleanProcesses",
"Kill all running processes before starting experiment",
type = Types.Bool,
default = False,
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
tear_down = Attribute("tearDown", "Bash script to be executed before " + \
"releasing the resource",
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
gateway_user = Attribute("gatewayUser", "Gateway account username",
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
gateway = Attribute("gateway", "Hostname of the gateway machine",
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
+
+ ip = Attribute("ip", "Linux host public IP address. "
+ "Must not be modified by the user unless hostname is 'localhost'",
+ flags = Flags.Design)
cls._register_attribute(hostname)
cls._register_attribute(username)
cls._register_attribute(tear_down)
cls._register_attribute(gateway_user)
cls._register_attribute(gateway)
+ cls._register_attribute(ip)
def __init__(self, ec, guid):
super(LinuxNode, self).__init__(ec, guid)
home = os.path.join(self._home_dir, home)
return home
+ @property
+ def nepi_home(self):
+ return os.path.join(self.home_dir, ".nepi")
+
@property
def usr_dir(self):
- return os.path.join(self.home_dir, "nepi-usr")
+ return os.path.join(self.nepi_home, "nepi-usr")
@property
def lib_dir(self):
@property
def exp_dir(self):
- return os.path.join(self.home_dir, "nepi-exp")
+ return os.path.join(self.nepi_home, "nepi-exp")
@property
def exp_home(self):
if self._os:
return self._os
- if (not self.get("hostname") or not self.get("username")):
+ if not self.localhost and not self.get("username"):
msg = "Can't resolve OS, insufficient data "
self.error(msg)
raise RuntimeError, msg
out = self.get_os()
- if out.find("Fedora release 8") == 0:
- self._os = OSType.FEDORA_8
- elif out.find("Fedora release 12") == 0:
- self._os = OSType.FEDORA_12
- elif out.find("Fedora release 14") == 0:
- self._os = OSType.FEDORA_14
- elif out.find("Fedora release") == 0:
- self._os = OSType.FEDORA
- elif out.find("Debian") == 0:
+ if out.find("Debian") == 0:
self._os = OSType.DEBIAN
elif out.find("Ubuntu") ==0:
self._os = OSType.UBUNTU
+ elif out.find("Fedora release") == 0:
+ self._os = OSType.FEDORA
+ if out.find("Fedora release 8") == 0:
+ self._os = OSType.FEDORA_8
+ elif out.find("Fedora release 12") == 0:
+ self._os = OSType.FEDORA_12
+ elif out.find("Fedora release 14") == 0:
+ self._os = OSType.FEDORA_14
else:
msg = "Unsupported OS"
self.error(msg, out)
@property
def use_deb(self):
- return self.os in [OSType.DEBIAN, OSType.UBUNTU]
+ return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
@property
def use_rpm(self):
- return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
- OSType.FEDORA]
+ return (self.os & OSType.FEDORA)
@property
def localhost(self):
- return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
+ return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
def do_provision(self):
# check if host is alive
if self.get("cleanExperiment"):
self.clean_experiment()
- # Create shared directory structure
- self.mkdir(self.lib_dir)
- self.mkdir(self.bin_dir)
- self.mkdir(self.src_dir)
- self.mkdir(self.share_dir)
+ # Create shared directory structure and node home directory
+ paths = [self.lib_dir,
+ self.bin_dir,
+ self.src_dir,
+ self.share_dir,
+ self.node_home]
+
+ self.mkdir(paths)
- # Create experiment node home directory
- self.mkdir(self.node_home)
+ # Get Public IP address if possible
+ if not self.get("ip"):
+ try:
+ ip = sshfuncs.gethostbyname(self.get("hostname"))
+ except:
+ msg = "DNS can not resolve hostname %s" % self.get("hostname")
+ self.debug(msg)
+
+ self.set("ip", ip)
super(LinuxNode, self).do_provision()
ifaces = self.get_connected(LinuxInterface.get_rtype())
for iface in ifaces:
if iface.state < ResourceState.READY:
- self.ec.schedule(reschedule_delay, self.deploy)
+ self.ec.schedule(self.reschedule_delay, self.deploy)
return
super(LinuxNode, self).do_deploy()
# Node needs to wait until all associated RMs are released
# before it can be released
if rm.state != ResourceState.RELEASED:
- self.ec.schedule(reschedule_delay, self.release)
+ self.ec.schedule(self.reschedule_delay, self.release)
return
tear_down = self.get("tearDown")
def clean_processes(self):
self.info("Cleaning up processes")
-
+
+ if self.localhost:
+ return
+
if self.get("username") != 'root':
cmd = ("sudo -S killall tcpdump || /bin/true ; " +
- "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
+ "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
"sudo -S killall -u %s || /bin/true ; " % self.get("username"))
else:
if self.state >= ResourceState.READY:
import pickle
- pids = pickle.load(open("save.proc", "rb"))
+ pids = pickle.load(open("/tmp/save.proc", "rb"))
pids_temp = dict()
ps_aux = "ps aux |awk '{print $2,$11}'"
(out, err), proc = self.execute(ps_aux)
- for line in out.strip().split("\n"):
- parts = line.strip().split(" ")
- pids_temp[parts[0]] = parts[1]
- kill_pids = set(pids_temp.items()) - set(pids.items())
- kill_pids = ' '.join(dict(kill_pids).keys())
-
- cmd = ("killall tcpdump || /bin/true ; " +
- "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
- "kill %s || /bin/true ; " % kill_pids)
+ if len(out) != 0:
+ for line in out.strip().split("\n"):
+ parts = line.strip().split(" ")
+ pids_temp[parts[0]] = parts[1]
+ kill_pids = set(pids_temp.items()) - set(pids.items())
+ kill_pids = ' '.join(dict(kill_pids).keys())
+
+ cmd = ("killall tcpdump || /bin/true ; " +
+ "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
+ "kill %s || /bin/true ; " % kill_pids)
+ else:
+ cmd = ("killall tcpdump || /bin/true ; " +
+ "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
else:
cmd = ("killall tcpdump || /bin/true ; " +
- "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
+ "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
- out = err = ""
(out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
"""
self.info("Cleaning up home")
- cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
+ cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
self.home_dir )
return self.execute(cmd, with_lock = True)
def execute(self, command,
sudo = False,
- stdin = None,
env = None,
tty = False,
forward_x11 = False,
- timeout = None,
retry = 3,
- err_on_timeout = True,
connect_timeout = 30,
strict_host_checking = False,
persistent = True,
(out, err), proc = execfuncs.lexec(command,
user = self.get("username"), # still problem with localhost
sudo = sudo,
- stdin = stdin,
env = env)
else:
if with_lock:
+ # If the execute command is blocking, we don't want to keep
+ # the node lock. This lock is used to avoid race conditions
+ # when creating the ControlMaster sockets. A more elegant
+ # solution is needed.
with self._node_lock:
(out, err), proc = sshfuncs.rexec(
command,
gw = self.get("gateway"),
agent = True,
sudo = sudo,
- stdin = stdin,
identity = self.get("identity"),
server_key = self.get("serverKey"),
env = env,
tty = tty,
forward_x11 = forward_x11,
- timeout = timeout,
retry = retry,
- err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
persistent = persistent,
blocking = blocking,
gw = self.get("gateway"),
agent = True,
sudo = sudo,
- stdin = stdin,
identity = self.get("identity"),
server_key = self.get("serverKey"),
env = env,
tty = tty,
forward_x11 = forward_x11,
- timeout = timeout,
retry = retry,
- err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
persistent = persistent,
blocking = blocking,
stdout = 'stdout',
stderr = 'stderr',
sudo = False,
- tty = False):
+ tty = False,
+ strict_host_checking = False):
self.debug("Running command '%s'" % command)
if self.localhost:
- (out, err), proc = execfuncs.lspawn(command, pidfile,
- stdout = stdout,
- stderr = stderr,
- stdin = stdin,
+ (out, err), proc = execfuncs.lspawn(command, pidfile,
home = home,
create_home = create_home,
- sudo = sudo,
- user = user)
+ stdin = stdin or '/dev/null',
+ stdout = stdout or '/dev/null',
+ stderr = stderr or '/dev/null',
+ sudo = sudo)
else:
with self._node_lock:
(out, err), proc = sshfuncs.rspawn(
pidfile = pidfile,
home = home,
create_home = create_home,
- stdin = stdin if stdin is not None else '/dev/null',
- stdout = stdout if stdout else '/dev/null',
- stderr = stderr if stderr else '/dev/null',
+ stdin = stdin or '/dev/null',
+ stdout = stdout or '/dev/null',
+ stderr = stderr or '/dev/null',
sudo = sudo,
host = self.get("hostname"),
user = self.get("username"),
agent = True,
identity = self.get("identity"),
server_key = self.get("serverKey"),
- tty = tty
+ tty = tty,
+ strict_host_checking = strict_host_checking
)
return (out, err), proc
gw = self.get("gateway"),
agent = True,
identity = self.get("identity"),
- server_key = self.get("serverKey")
+ server_key = self.get("serverKey"),
+ strict_host_checking = False
)
return pidtuple
gw = self.get("gateway"),
agent = True,
identity = self.get("identity"),
- server_key = self.get("serverKey")
+ server_key = self.get("serverKey"),
+ strict_host_checking = False
)
return status
agent = True,
sudo = sudo,
identity = self.get("identity"),
- server_key = self.get("serverKey")
+ server_key = self.get("serverKey"),
+ strict_host_checking = False
)
return (out, err), proc
def copy(self, src, dst):
if self.localhost:
- (out, err), proc = execfuncs.lcopy(source, dest,
- recursive = True,
- strict_host_checking = False)
+ (out, err), proc = execfuncs.lcopy(src, dst,
+ recursive = True)
else:
with self._node_lock:
(out, err), proc = sshfuncs.rcopy(
return (out, err), proc
- def upload(self, src, dst, text = False, overwrite = True):
+ def upload(self, src, dst, text = False, overwrite = True,
+ raise_on_error = True):
""" Copy content to destination
- src content to copy. Can be a local file, directory or a list of files
+ src string with the content to copy. Can be:
+ - plain text
+ - a string with the path to a local file
+ - a string with a semi-colon separeted list of local files
+ - a string with a local directory
- dst destination path on the remote host (remote is always self.host)
+ dst string with destination path on the remote host (remote is
+ always self.host)
- text src is text input, it must be stored into a temp file before uploading
+ text src is text input, it must be stored into a temp file before
+ uploading
"""
# If source is a string input
f = None
src = f.name
# If dst files should not be overwritten, check that the files do not
- # exits already
+ # exits already
+ if isinstance(src, str):
+ src = map(str.strip, src.split(";"))
+
if overwrite == False:
src = self.filter_existing_files(src, dst)
if not src:
- return ("", ""), None
+ return ("", ""), None
if not self.localhost:
# Build destination as <user>@<server>:<path>
dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
- result = self.copy(src, dst)
+ ((out, err), proc) = self.copy(src, dst)
# clean up temp file
if f:
os.remove(f.name)
- return result
+ if err:
+ msg = " Failed to upload files - src: %s dst: %s" % (";".join(src), dst)
+ self.error(msg, out, err)
+
+ msg = "%s out: %s err: %s" % (msg, out, err)
+ if raise_on_error:
+ raise RuntimeError, msg
+
+ return ((out, err), proc)
- def download(self, src, dst):
+ def download(self, src, dst, raise_on_error = True):
if not self.localhost:
# Build destination as <user>@<server>:<path>
src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
- return self.copy(src, dst)
+
+ ((out, err), proc) = self.copy(src, dst)
+
+ if err:
+ msg = " Failed to download files - src: %s dst: %s" % (";".join(src), dst)
+ self.error(msg, out, err)
+
+ if raise_on_error:
+ raise RuntimeError, msg
+
+ return ((out, err), proc)
def install_packages_command(self, packages):
command = ""
return command
- def install_packages(self, packages, home, run_home = None):
+ def install_packages(self, packages, home, run_home = None,
+ raise_on_error = True):
""" Install packages in the Linux host.
'home' is the directory to upload the package installation script.
stdout = "instpkg_stdout",
stderr = "instpkg_stderr",
overwrite = False,
- raise_on_error = True)
+ raise_on_error = raise_on_error)
return (out, err), proc
- def remove_packages(self, packages, home, run_home = None):
+ def remove_packages(self, packages, home, run_home = None,
+ raise_on_error = True):
""" Uninstall packages from the Linux host.
'home' is the directory to upload the package un-installation script.
stdout = "rmpkg_stdout",
stderr = "rmpkg_stderr",
overwrite = False,
- raise_on_error = True)
+ raise_on_error = raise_on_error)
return (out, err), proc
- def mkdir(self, path, clean = False):
+ def mkdir(self, paths, clean = False):
+ """ Paths is either a single remote directory path to create,
+ or a list of directories to create.
+ """
if clean:
- self.rmdir(path)
+ self.rmdir(paths)
+
+ if isinstance(paths, str):
+ paths = [paths]
+
+ cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
+
+ return self.execute(cmd, with_lock = True)
- return self.execute("mkdir -p %s" % path, with_lock = True)
+ def rmdir(self, paths):
+ """ Paths is either a single remote directory path to delete,
+ or a list of directories to delete.
+ """
+
+ if isinstance(paths, str):
+ paths = [paths]
- def rmdir(self, path):
- return self.execute("rm -rf %s" % path, with_lock = True)
+ cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
+
+ return self.execute(cmd, with_lock = True)
def run_and_wait(self, command, home,
- shfile = "cmd.sh",
- env = None,
- overwrite = True,
- pidfile = "pidfile",
- ecodefile = "exitcode",
- stdin = None,
- stdout = "stdout",
- stderr = "stderr",
- sudo = False,
- tty = False,
- raise_on_error = False):
+ shfile="cmd.sh",
+ env=None,
+ overwrite=True,
+ wait_run=True,
+ pidfile="pidfile",
+ ecodefile="exitcode",
+ stdin=None,
+ stdout="stdout",
+ stderr="stderr",
+ sudo=False,
+ tty=False,
+ raise_on_error=True):
"""
Uploads the 'command' to a bash script in the host.
Then runs the script detached in background in the host, and
pidfile = pidfile,
raise_on_error = raise_on_error)
- # wait until command finishes to execute
- self.wait_run(pid, ppid)
-
- (eout, err), proc = self.check_errors(home,
- ecodefile = ecodefile,
- stderr = stderr)
+ if wait_run:
+ # wait until command finishes to execute
+ self.wait_run(pid, ppid)
+
+ (eout, err), proc = self.check_errors(home,
+ ecodefile = ecodefile,
+ stderr = stderr)
- # Out is what was written in the stderr file
- if err:
- msg = " Failed to run command '%s' " % command
- self.error(msg, eout, err)
+ # Out is what was written in the stderr file
+ if err:
+ msg = " Failed to run command '%s' " % command
+ self.error(msg, eout, err)
- if raise_on_error:
- raise RuntimeError, msg
+ if raise_on_error:
+ raise RuntimeError, msg
(out, oerr), proc = self.check_output(home, stdout)
return (out, err), proc
-
+
def exitcode(self, home, ecodefile = "exitcode"):
"""
Get the exit code of an application.
return ExitCode.ERROR
def upload_command(self, command,
- shfile = "cmd.sh",
- ecodefile = "exitcode",
- overwrite = True,
- env = None):
+ shfile="cmd.sh",
+ ecodefile="exitcode",
+ overwrite=True,
+ env=None):
""" Saves the command as a bash script file in the remote host, and
forces to save the exit code of the command execution to the ecodefile
"""
# Add environ to command
command = environ + command
- return self.upload(command, shfile, text = True, overwrite = overwrite)
+ return self.upload(command, shfile, text=True, overwrite=overwrite)
- def format_environment(self, env, inline = False):
+ def format_environment(self, env, inline=False):
""" Formats the environment variables for a command to be executed
either as an inline command
(i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
""" Removes files that already exist in the Linux host from src list
"""
# construct a dictionary with { dst: src }
- dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ),
- src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
+ dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
+ if len(src) > 1 else dict({dst: src[0]})
command = []
for d in dests.keys():
del dests[d]
if not dests:
- return ""
+ return []
- return " ".join(dests.values())
+ return dests.values()