-from neco.execution.attribute import Attribute, Flags
-from neco.execution.resource import ResourceManager, clsinit, ResourceState
-from neco.resources.linux import rpmfuncs, debfuncs
-from neco.util import sshfuncs, execfuncs
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags
+from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.resources.linux import rpmfuncs, debfuncs
+from nepi.util import sshfuncs, execfuncs
+from nepi.util.sshfuncs import ProcStatus
import collections
-import logging
import os
import random
import re
reschedule_delay = "0.5s"
+class ExitCode:
+ """
+ Error codes that the rexitcode function can return if unable to
+ check the exit code of a spawned process
+ """
+ # The exit code file could not be read: 'cat' returned 1
+ # ("No such file or directory")
+ FILENOTFOUND = -1
+ # The exit code file was read, but its content is not an integer
+ CORRUPTFILE = -2
+ # 'cat' failed with any return code other than 1
+ ERROR = -3
+ # Conventional success status; not referenced by the code visible
+ # in this change -- presumably a process that exited cleanly
+ OK = 0
+
+class OSType:
+ """
+ Supported flavors of Linux OS
+ """
+ # Selected when the OS detection output starts with "Fedora release 12"
+ FEDORA_12 = "f12"
+ # Selected when the OS detection output starts with "Fedora release 14"
+ FEDORA_14 = "f14"
+ # Generic Fedora: accepted as an rpm based OS by install_packages and
+ # remove_packages, but never assigned by the detection code visible here
+ FEDORA = "fedora"
+ # Selected when the OS detection output starts with "Ubuntu";
+ # packages are handled through debfuncs
+ UBUNTU = "ubuntu"
+ # Selected when the OS detection output starts with "Debian";
+ # packages are handled through debfuncs
+ DEBIAN = "debian"
@clsinit
class LinuxNode(ResourceManager):
# lock to avoid concurrency issues on methods used by applications
self._lock = threading.Lock()
-
- self._logger = logging.getLogger("LinuxNode")
def log_message(self, msg):
return " guid %d - host %s - %s " % (self.guid,
raise RuntimeError, "%s - %s - %s" %( msg, out, err )
if out.find("Fedora release 12") == 0:
- self._os = "f12"
+ self._os = OSType.FEDORA_12
elif out.find("Fedora release 14") == 0:
- self._os = "f14"
+ self._os = OSType.FEDORA_14
elif out.find("Debian") == 0:
- self._os = "debian"
+ self._os = OSType.DEBIAN
elif out.find("Ubuntu") ==0:
- self._os = "ubuntu"
+ self._os = OSType.UBUNTU
else:
msg = "Unsupported OS"
self.error(msg, out)
def localhost(self):
- return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
+ """ True when the "hostname" attribute designates the local machine,
+ either by name or by its IPv4 / IPv6 loopback address """
+ # '127.0.0.1' is the standard IPv4 loopback address
+ return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
- def provision(self, filters = None):
+ def provision(self):
if not self.is_alive():
self._state = ResourceState.FAILED
msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
def deploy(self):
if self.state == ResourceState.NEW:
try:
- self.discover()
- self.provision()
+ self.discover()
+ self.provision()
except:
self._state = ResourceState.FAILED
raise
# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
- from neco.resources.linux.interface import LinuxInterface
+ from nepi.resources.linux.interface import LinuxInterface
ifaces = self.get_connected(LinuxInterface.rtype())
for iface in ifaces:
if iface.state < ResourceState.READY:
src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
return self.copy(src, dst)
- def install_packages(self, packages, home = None):
- home = home or self.node_home
-
- cmd = ""
- if self.os in ["f12", "f14"]:
- cmd = rpmfuncs.install_packages_command(self.os, packages)
- elif self.os in ["debian", "ubuntu"]:
- cmd = debfuncs.install_packages_command(self.os, packages)
+ def install_packages(self, packages, home):
+ """
+ Installs packages on the node.
+ The install command is built by rpmfuncs for the Fedora flavors
+ and by debfuncs for Debian and Ubuntu, and is then executed through
+ run_and_wait with raise_on_error = True, using the instpkg.sh
+ script and the instpkg_* pid, exit code, stdout and stderr files.
+ Raises RuntimeError if the OS of the node is not one of the
+ known flavors.
+ Returns ((out, err), proc) as returned by run_and_wait.
+ """
+ command = ""
+ if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+ command = rpmfuncs.install_packages_command(self.os, packages)
+ elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+ command = debfuncs.install_packages_command(self.os, packages)
else:
msg = "Error installing packages ( OS not known ) "
self.error(msg, self.os)
raise RuntimeError, msg
out = err = ""
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "instpkg_pid",
- stdout = "instpkg_out",
- stderr = "instpkg_err",
+ (out, err), proc = self.run_and_wait(command, home,
+ shfile = "instpkg.sh",
+ pidfile = "instpkg_pidfile",
+ ecodefile = "instpkg_exitcode",
+ stdout = "instpkg_stdout",
+ stderr = "instpkg_stderr",
raise_on_error = True)
return (out, err), proc
- def remove_packages(self, packages, home = None):
- home = home or self.node_home
-
- cmd = ""
- if self.os in ["f12", "f14"]:
- cmd = rpmfuncs.remove_packages_command(self.os, packages)
- elif self.os in ["debian", "ubuntu"]:
- cmd = debfuncs.remove_packages_command(self.os, packages)
+ def remove_packages(self, packages, home):
+ """
+ Removes packages from the node.
+ The removal command is built by rpmfuncs for the Fedora flavors
+ and by debfuncs for Debian and Ubuntu, and is then executed through
+ run_and_wait with raise_on_error = True, using the rmpkg.sh
+ script and the rmpkg_* pid, exit code, stdout and stderr files.
+ Raises RuntimeError if the OS of the node is not one of the
+ known flavors.
+ Returns ((out, err), proc) as returned by run_and_wait.
+ """
+ command = ""
+ if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
+ command = rpmfuncs.remove_packages_command(self.os, packages)
+ elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
+ command = debfuncs.remove_packages_command(self.os, packages)
else:
msg = "Error removing packages ( OS not known ) "
self.error(msg)
raise RuntimeError, msg
out = err = ""
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "rmpkg_pid",
- stdout = "rmpkg_out",
- stderr = "rmpkg_err",
+ (out, err), proc = self.run_and_wait(command, home,
+ shfile = "rmpkg.sh",
+ pidfile = "rmpkg_pidfile",
+ ecodefile = "rmpkg_exitcode",
+ stdout = "rmpkg_stdout",
+ stderr = "rmpkg_stderr",
raise_on_error = True)
return (out, err), proc
def rmdir(self, path):
+ """ Recursively and forcibly removes path by running 'rm -rf'
+ through execute (with_lock = True), and returns what execute returns.
+ NOTE: path is interpolated into the command line without quoting """
return self.execute("rm -rf %s" % path, with_lock = True)
-
- def run_and_wait(self, command,
- home = ".",
- pidfile = "pid",
+
+ def run_and_wait(self, command, home,
+ shfile = "cmd.sh",
+ env = None,
+ pidfile = "pidfile",
+ ecodefile = "exitcode",
stdin = None,
- stdout = 'stdout',
- stderr = 'stderr',
+ stdout = "stdout",
+ stderr = "stderr",
sudo = False,
tty = False,
raise_on_error = False):
- """ runs a command in background on the remote host, but waits
- until the command finishes execution.
- This is more robust than doing a simple synchronized 'execute',
- since in the remote host the command can continue to run detached
- even if network disconnections occur
+ """
+ runs a command in background on the remote host, busy-waiting
+ until the command finishes execution.
+ This is more robust than doing a simple synchronized 'execute',
+ since in the remote host the command can continue to run detached
+ even if network disconnections occur
"""
+ self.upload_command(command, home,
+ shfile = shfile,
+ ecodefile = ecodefile,
+ env = env)
+
+ command = "bash ./%s" % shfile
# run command in background in remote host
(out, err), proc = self.run(command, home,
pidfile = pidfile,
# wait until command finishes to execute
self.wait_run(pid, ppid)
-
- # check if execution errors occurred
- (out, err), proc = self.check_output(home, stderr)
+
+ (out, err), proc = self.check_errors(home, ecodefile, stderr)
- if err or out:
+ # Out is what was written in the stderr file
+ if out or err:
msg = " Failed to run command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
return (out, err), proc
+
+ def exitcode(self, home, ecodefile = "exitcode"):
+ """
+ Get the exit code of an application.
+ Reads the file ecodefile under home (using check_output) and
+ returns its content as an integer value with the exit code.
+ If the exit code can not be retrieved, one of the negative
+ ExitCode values is returned instead:
+ ExitCode.FILENOTFOUND if 'cat' returned 1,
+ ExitCode.CORRUPTFILE if the file content is not an integer,
+ ExitCode.ERROR for any other failure of 'cat'.
+ """
+ (out, err), proc = self.check_output(home, ecodefile)
+
+ # Succeeded to open file, return exit code in the file
+ if proc.wait() == 0:
+ try:
+ return int(out.strip())
+ except (ValueError, AttributeError):
+ # Error in the content of the file!
+ # (not an integer, or no output at all). Only conversion
+ # errors are caught, so interrupts are not swallowed
+ return ExitCode.CORRUPTFILE
+
+ # No such file or directory
+ if proc.returncode == 1:
+ return ExitCode.FILENOTFOUND
+
+ # Other error from 'cat'
+ return ExitCode.ERROR
+
+ def upload_command(self, command, home,
+ shfile = "cmd.sh",
+ ecodefile = "exitcode",
+ env = None):
+ """ Saves the command as a bash script file in the remote host, and
+ forces to save the exit code of the command execution to the ecodefile
+
+ env, if given, is a whitespace separated string; each of its tokens
+ is written as an 'export <token>' line at the top of the script.
+ The script is uploaded to home/shfile, and the result of upload
+ is returned.
+ """
+
+ # The exit code of the command will be stored in ecodefile
+ command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
+ 'command': command,
+ 'ecodefile': ecodefile,
+ }
+
+ # Export environment
+ # split() without arguments drops empty tokens, so repeated or
+ # surrounding blanks in env do not generate bare 'export' lines
+ environ = "\n".join(map(lambda e: "export %s" % e, env.split())) + "\n" \
+ if env else ""
+
+ # Add environ to command
+ command = environ + command
+
+ dst = os.path.join(home, shfile)
+ return self.upload(command, dst, text = True)
+
+ def check_errors(self, home,
+ ecodefile = "exitcode",
+ stderr = "stderr"):
+ """
+ Checks whether errors occurred while running a command.
+ It first checks the exit code for the command, and only if the
+ exit code is an error one it returns the error output.
+
+ Returns ((out, err), proc). When the stderr file is read, out holds
+ its content (it is the output of 'cat') and proc is the process
+ that read it; otherwise proc is None. out and err are both empty
+ when no error was detected.
+ """
+ out = err = ""
+ proc = None
+
+ # get Exit code
+ ecode = self.exitcode(home, ecodefile)
+
+ # The exit code itself could not be read: report that as the error
+ if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
+ err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
+ elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
+ # The process returned an error code or didn't exist.
+ # Check standard error.
+ (out, err), proc = self.check_output(home, stderr)
+
+ # If the stderr file was not found, assume nothing happened.
+ # We just ignore the error.
+ # (cat returns 1 for error "No such file or directory")
+ if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
+ out = err = ""
+
+ return (out, err), proc
- def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+ def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
""" Waits until the pid file for the command is generated,
and returns the pid and ppid of the process """
pid = ppid = None
delay = 1.0
- for i in xrange(5):
- pidtuple = self.checkpid(home = home, pidfile = pidfile)
+
+ for i in xrange(4):
+ pidtuple = self.getpid(home = home, pidfile = pidfile)
if pidtuple:
pid, ppid = pidtuple
break
else:
time.sleep(delay)
- delay = min(30,delay*1.2)
+ delay = delay * 1.5
else:
msg = " Failed to get pid for pidfile %s/%s " % (
home, pidfile )
def wait_run(self, pid, ppid, trial = 0):
""" wait for a remote process to finish execution """
- delay = 1.0
- first = True
- bustspin = 0
+ # Back-off delay, used while the process is reported as neither
+ # RUNNING nor FINISHED. The loop below reads and updates it under
+ # the name 'delay', so it has to be initialized under that name
+ delay = 1.0
while True:
status = self.status(pid, ppid)
- if status is sshfuncs.FINISHED:
+ if status is ProcStatus.FINISHED:
break
- elif status is not sshfuncs.RUNNING:
- bustspin += 1
- time.sleep(delay*(5.5+random.random()))
- if bustspin > 12:
+ elif status is not ProcStatus.RUNNING:
+ delay = delay * 1.5
+ time.sleep(delay)
+ # If the back-off delay grows beyond 20 seconds and the
+ # process still did not start, assume something went wrong
+ if delay > 20:
break
else:
- if first:
- first = False
-
- time.sleep(delay*(0.5+random.random()))
- delay = min(30,delay*1.2)
- bustspin = 0
+ # The app is running, just wait...
+ time.sleep(0.5)
def check_output(self, home, filename):
- """ checks file content """
+ """ Retrieves the content of the file home/filename by running 'cat'
+ through execute (retry = 1, with_lock = True).
+ Returns ((out, err), proc) as returned by execute """
(out, err), proc = self.execute("cat %s" %
os.path.join(home, filename), retry = 1, with_lock = True)
return (out, err), proc
def copy(self, src, dst):
if self.localhost:
- (out, err), proc = execfuncs.lcopy(source, dest,
+ (out, err), proc = execfuncs.lcopy(source, dest,
recursive = True,
strict_host_checking = False)
else:
connect_timeout = 30,
strict_host_checking = False,
persistent = True,
+ blocking = True,
with_lock = False
):
""" Notice that this invocation will block until the
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
persistent = persistent,
+ blocking = blocking,
strict_host_checking = strict_host_checking
)
else:
retry = retry,
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
- persistent = persistent
+ persistent = persistent,
+ blocking = blocking,
+ strict_host_checking = strict_host_checking
)
return (out, err), proc
- def run(self, command,
- home = None,
+ def run(self, command, home,
create_home = False,
- pidfile = "pid",
+ pidfile = 'pidfile',
stdin = None,
stdout = 'stdout',
stderr = 'stderr',
sudo = False,
tty = False):
-
+
self.debug("Running command '%s'" % command)
if self.localhost:
sudo = sudo,
user = user)
else:
- # Start process in a "daemonized" way, using nohup and heavy
- # stdin/out redirection to avoid connection issues
with self._lock:
- (out,err), proc = sshfuncs.rspawn(
+ (out, err), proc = sshfuncs.rspawn(
command,
pidfile = pidfile,
home = home,
return (out, err), proc
- def checkpid(self, home = ".", pidfile = "pid"):
+ def getpid(self, home, pidfile = "pidfile"):
if self.localhost:
- pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
+ pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
else:
with self._lock:
- pidtuple = sshfuncs.rcheckpid(
+ pidtuple = sshfuncs.rgetpid(
os.path.join(home, pidfile),
host = self.get("hostname"),
user = self.get("username"),
)
return pidtuple
-
+
def status(self, pid, ppid):
if self.localhost:
status = execfuncs.lstatus(pid, ppid)
proc = None
status = self.status(pid, ppid)
- if status == sshfuncs.RUNNING:
+ if status == sshfuncs.ProcStatus.RUNNING:
if self.localhost:
(out, err), proc = execfuncs.lkill(pid, ppid, sudo)
else:
identity = self.get("identity"),
server_key = self.get("serverKey")
)
- return (out, err), proc
- def check_bad_host(self, out, err):
- badre = re.compile(r'(?:'
- r'|Error: disk I/O error'
- r')',
- re.I)
- return badre.search(out) or badre.search(err)
-
- def blacklist(self):
- # TODO!!!!
- self.warn(" Blacklisting malfunctioning node ")
- #import util
- #util.appendBlacklist(self.hostname)
+ return (out, err), proc