-from neco.execution.resource import Resource
-from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
- rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING
+from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux import rpmfuncs, debfuncs
+from neco.util import sshfuncs, execfuncs
-import cStringIO
+import collections
import logging
-import os.path
-import subprocess
-
-class LinuxNode(Resource):
- def __init__(self, box, ec):
- super(LinuxNode, self).__init__(box, ec)
- self.ip = None
- self.host = None
- self.user = None
- self.port = None
- self.identity_file = None
- self.enable_x11 = False
- self.forward_agent = True
-
- # packet management system - either yum or apt for now...
- self._pm = None
-
- # Logging
- loglevel = "debug"
- self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
- self.box.guid)
- self._logger.setLevel(getattr(logging, loglevel.upper()))
-
- # For ssh connections we use the ControlMaster option which
- # allows us to decrease the number of open ssh network connections.
- # Subsequent ssh connections will reuse a same master connection.
- # This might pose a problem when using X11 and ssh-agent, since
- # display and agent forwarded will be those of the first connection,
- # which created the master.
- # To avoid reusing a master created by a previous LinuxNode instance,
- # we explicitly erase the ControlPath socket.
- control_path = make_control_path(self.user, self.host, self.port)
- try:
- os.remove(control_path)
- except:
- pass
+import os
+import random
+import re
+import tempfile
+import time
+import threading
+
+# TODO: Verify files and dirs exist already
+# TODO: Blacklist nodes!
+# TODO: Unify delays!!
+# TODO: Validate outcome of uploads!!
+
+# Delay passed to ec.schedule when LinuxNode.deploy has to be retried
+# because a connected interface has not reached the READY state yet
+reschedule_delay = "0.5s"
+
+
+@clsinit
+class LinuxNode(ResourceManager):
+ _rtype = "LinuxNode"
+
+    @classmethod
+    def _register_attributes(cls):
+        """ Builds the attributes exposed by a LinuxNode and registers
+        them on the class, in the order in which they are listed """
+        read_only = Flags.ExecReadOnly
+        credential = Flags.Credential
+
+        attributes = (
+            Attribute("hostname", "Hostname of the machine",
+                flags = read_only),
+            Attribute("username", "Local account username",
+                flags = credential),
+            Attribute("port", "SSH port",
+                flags = read_only),
+            Attribute("home",
+                "Experiment home directory to store all experiment related files",
+                flags = read_only),
+            Attribute("identity", "SSH identity file",
+                flags = credential),
+            Attribute("serverKey", "Server public key",
+                flags = read_only),
+            Attribute("cleanHome",
+                "Remove all files and directories "
+                " from home folder before starting experiment",
+                flags = read_only),
+            Attribute("cleanProcesses",
+                "Kill all running processes before starting experiment",
+                flags = read_only),
+            Attribute("tearDown",
+                "Bash script to be executed before "
+                "releasing the resource",
+                flags = read_only),
+        )
+
+        for attribute in attributes:
+            cls._register_attribute(attribute)
+
+    def __init__(self, ec, guid):
+        """ Initializes the base resource manager with ec and guid, then
+        sets up the per node state: OS cache, logger and lock """
+        super(LinuxNode, self).__init__(ec, guid)
+
+        # filled in by the 'os' property the first time the OS is resolved
+        self._os = None
+
+        self._logger = logging.getLogger("LinuxNode")
+
+        # lock to avoid concurrency issues on methods used by applications
+        self._lock = threading.Lock()
+
+    def log_message(self, msg):
+        """ Returns msg prefixed with the guid and the hostname of this node """
+        guid = self.guid
+        hostname = self.get("hostname")
+        return " guid %d - host %s - %s " % (guid, hostname, msg)
@property
- def pm(self):
- if self._pm:
- return self._pm
+ def home(self):
+ """ Value of the "home" attribute, or an empty string when that
+ value is falsy (e.g. not set) """
+ return self.get("home") or ""
- if (not (self.host or self.ip) or not self.user):
- msg = "Can't resolve package management system. Insufficient data."
- self._logger.error(msg)
- raise RuntimeError(msg)
+    @property
+    def exp_home(self):
+        """ Experiment directory: 'home' joined with the exp_id of the ec """
+        base_dir = self.home
+        return os.path.join(base_dir, self.ec.exp_id)
- out = self.execute("cat /etc/issue")
+    @property
+    def node_home(self):
+        """ Directory of this node inside the experiment directory,
+        named "node-<guid>" """
+        dirname = "node-%d" % self.guid
+        return os.path.join(self.exp_home, dirname)
- if out.find("Fedora") == 0:
- self._pm = "yum"
- elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
- self._pm = "apt-get"
+ @property
+ def os(self):
+ """ Identifier of the operating system of the host: one of
+ "f12", "f14", "debian" or "ubuntu".
+
+ The value is resolved from the output of 'cat /etc/issue' and is
+ cached in self._os, so the command runs only until the first
+ successful detection.
+
+ Raises RuntimeError when the hostname or username attributes are
+ missing, when the command reports an error, or when the output
+ does not start with one of the recognized release names """
+ if self._os:
+ return self._os
+
+ if (not self.get("hostname") or not self.get("username")):
+ msg = "Can't resolve OS, insufficient data "
+ self.error(msg)
+ raise RuntimeError, msg
+
+ (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
+
+ # Only a non-zero exit status together with error output is a failure
+ if err and proc.poll():
+ msg = "Error detecting OS "
+ self.error(msg, out, err)
+ raise RuntimeError, "%s - %s - %s" %( msg, out, err )
+
+ # find(...) == 0 means the output must start with the release name
+ if out.find("Fedora release 12") == 0:
+ self._os = "f12"
+ elif out.find("Fedora release 14") == 0:
+ self._os = "f14"
+ elif out.find("Debian") == 0:
+ self._os = "debian"
+ elif out.find("Ubuntu") ==0:
+ self._os = "ubuntu"
else:
- msg = "Can't resolve package management system. Unknown OS."
- self._logger.error(msg)
- raise RuntimeError(msg)
+ msg = "Unsupported OS"
+ self.error(msg, out)
+ raise RuntimeError, "%s - %s " %( msg, out )
- return self._pm
+ return self._os
@property
- def is_localhost(self):
- return ( self.host or self.ip ) in ['localhost', '127.0.0.7', '::1']
+    def localhost(self):
+        """ True when the "hostname" attribute designates the local machine,
+        in which case commands are run locally instead of through ssh """
+        # '127.0.0.1' was missing from the list (it read '127.0.0.7').
+        # '127.0.0.7' is kept so configurations relying on it still
+        # resolve as local.
+        return self.get("hostname") in ['localhost', '127.0.0.1',
+                '127.0.0.7', '::1']
+
+    def provision(self, filters = None):
+        """ Prepares the host for the experiment.
+
+        Fails with RuntimeError (and marks the resource FAILED) if the host
+        does not answer. Otherwise runs the clean up steps enabled through
+        the "cleanProcesses" and "cleanHome" attributes, creates the node
+        home directory and delegates to the base class.
+
+        filters  not used by this method """
+        if not self.is_alive():
+            self._state = ResourceState.FAILED
+            msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
+            self.error(msg)
+            raise RuntimeError(msg)
+
+        # optional clean up, processes first and home directory second
+        cleanup_steps = (
+            ("cleanProcesses", self.clean_processes),
+            ("cleanHome", self.clean_home),
+        )
+        for attr_name, cleanup in cleanup_steps:
+            if self.get(attr_name):
+                cleanup()
+
+        self.mkdir(self.node_home)
+
+        super(LinuxNode, self).provision()
+
+    def deploy(self):
+        """ Discovers and provisions the node when it is still NEW, then
+        finishes the deployment once every connected LinuxInterface is
+        READY; until then the deploy is re-scheduled on the ec """
+        if self.state == ResourceState.NEW:
+            try:
+                self.discover()
+                self.provision()
+            except:
+                # flag the failure and let the caller see the original error
+                self._state = ResourceState.FAILED
+                raise
+
+        # Node needs to wait until all associated interfaces are
+        # ready before it can finalize deployment
+        from neco.resources.linux.interface import LinuxInterface
+        interfaces = self.get_connected(LinuxInterface.rtype())
+        if any(iface.state < ResourceState.READY for iface in interfaces):
+            self.ec.schedule(reschedule_delay, self.deploy)
+            return
+
+        super(LinuxNode, self).deploy()
+
+    def release(self):
+        """ Runs the "tearDown" script on the host, if one is configured,
+        and then releases the resource through the base class """
+        script = self.get("tearDown")
+        if script:
+            self.execute(script)
+
+        super(LinuxNode, self).release()
+
+    def valid_connection(self, guid):
+        """ Accepts any connection: guid is not inspected yet """
+        # TODO: Validate!
+        accepted = True
+        return accepted
+
+    def clean_processes(self, killer = False):
+        """ Kills processes left running on the host.
+
+        killer  when True also kills python processes, every other process
+                listed by ps and the processes of root; when False only
+                tcpdump and the processes of the configured user are killed
+
+        Every kill command is issued twice and its failure is ignored """
+        self.info("Cleaning up processes")
+
+        if killer:
+            # Hardcore kill
+            kill_tools = "sudo -S killall python tcpdump || /bin/true ; "
+            kill_others = ("sudo -S kill $(ps -N -T -o pid --no-heading "
+                "| grep -v $PPID | sort) || /bin/true ; ")
+            kill_root = "sudo -S killall -u root || /bin/true ; "
+            cmd = kill_tools * 2 + kill_others + kill_root * 2
+        else:
+            # Be gentler...
+            kill_tcpdump = "sudo -S killall tcpdump || /bin/true ; "
+            kill_user = "sudo -S killall -u %s || /bin/true ; "
+            cmd = (kill_tcpdump * 2 +
+                kill_user % self.get("username") +
+                kill_user % self.get("username"))
+
+        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
+
+ def clean_home(self):
+ """ Removes the entries named 'nepi-*' found directly inside the
+ home directory of the host (inside the login directory when the
+ "home" attribute is empty). The outcome of the command is not
+ checked """
+ self.info("Cleaning up home")
+
+ cmd = (
+ # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
+ "find . -maxdepth 1 -name 'nepi-*' " +
+ " -execdir rm -rf {} + "
+ )
+
+ # Run the find from inside the configured home, if there is one
+ if self.home:
+ cmd = "cd %s ; " % self.home + cmd
- def install(self, packages):
- if not isinstance(packages, list):
- packages = [packages]
+ out = err = ""
+ (out, err), proc = self.execute(cmd, with_lock = True)
- for p in packages:
- self.execute("%s -y install %s" % (self.pm, p), sudo = True,
- tty = True)
+    def upload(self, src, dst, text = False):
+        """ Copy content to destination
- def uninstall(self, packages):
- if not isinstance(packages, list):
- packages = [packages]
+        src  content to copy. Can be a local file, directory or a list of files
- for p in packages:
- self.execute("%s -y remove %s" % (self.pm, p), sudo = True,
- tty = True)
+        dst  destination path on the host given by the "hostname" attribute
- def upload(self, src, dst):
- if not os.path.isfile(src):
- src = cStringIO.StringIO(src)
+        text  src is text input, it must be stored into a temp file before uploading
+
+        Returns the result of self.copy. The temporary file created for
+        text input is removed even when writing or copying fails.
+        """
+        # path of the temporary file holding text input, if one is created
+        tmp_path = None
+        try:
+            if text and not os.path.isfile(src):
+                # src is text input that should be uploaded as file
+                # create a temporary file with the content to upload
+                with tempfile.NamedTemporaryFile(delete=False) as tmp:
+                    tmp_path = tmp.name
+                    tmp.write(src)
+                src = tmp_path
- if not self.is_localhost:
+            if not self.localhost:
# Build destination as <user>@<server>:<path>
- dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
- return self.copy(src, dst)
+                dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
+
+            return self.copy(src, dst)
+        finally:
+            # clean up temp file
+            if tmp_path is not None:
+                os.remove(tmp_path)
def download(self, src, dst):
+ """ Copies src from the host to the local path dst. For a remote host
+ src is first rewritten as <username>@<hostname>:<src>.
+ Returns the result of self.copy """
- if not self.is_localhost:
+ if not self.localhost:
# Build destination as <user>@<server>:<path>
- src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
+ src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
return self.copy(src, dst)
+
+    def install_packages(self, packages, home = None):
+        """ Installs packages on the host and waits for the installation
+        to finish.
+
+        packages  passed as is to the rpm or deb command builder
+        home      directory where the pid, stdout and stderr files of the
+                  installation are written (defaults to the node home)
+
+        Returns ((out, err), proc) as given by run_and_wait.
+        Raises RuntimeError if the OS is not one of the supported ones """
+        home = home or self.node_home
+
+        osname = self.os
+        if osname in ["f12", "f14"]:
+            pkgfuncs = rpmfuncs
+        elif osname in ["debian", "ubuntu"]:
+            pkgfuncs = debfuncs
+        else:
+            msg = "Error installing packages ( OS not known ) "
+            self.error(msg, osname)
+            raise RuntimeError(msg)
+
+        cmd = pkgfuncs.install_packages_command(osname, packages)
+
+        (stdout_text, stderr_text), proc = self.run_and_wait(cmd, home,
+            pidfile = "instpkg_pid",
+            stdout = "instpkg_out",
+            stderr = "instpkg_err",
+            raise_on_error = True)
+
+        return (stdout_text, stderr_text), proc
+
+    def remove_packages(self, packages, home = None):
+        """ Removes packages from the host and waits for the removal
+        to finish.
+
+        packages  passed as is to the rpm or deb command builder
+        home      directory where the pid, stdout and stderr files of the
+                  removal are written (defaults to the node home)
+
+        Returns ((out, err), proc) as given by run_and_wait.
+        Raises RuntimeError if the OS is not one of the supported ones """
+        home = home or self.node_home
+
+        osname = self.os
+        if osname in ["f12", "f14"]:
+            pkgfuncs = rpmfuncs
+        elif osname in ["debian", "ubuntu"]:
+            pkgfuncs = debfuncs
+        else:
+            msg = "Error removing packages ( OS not known ) "
+            self.error(msg)
+            raise RuntimeError(msg)
+
+        cmd = pkgfuncs.remove_packages_command(osname, packages)
+
+        (stdout_text, stderr_text), proc = self.run_and_wait(cmd, home,
+            pidfile = "rmpkg_pid",
+            stdout = "rmpkg_out",
+            stderr = "rmpkg_err",
+            raise_on_error = True)
+
+        return (stdout_text, stderr_text), proc
+
+    def mkdir(self, path, clean = False):
+        """ Creates directory path on the host, parents included.
+        With clean set, an existing path is removed first.
+        Returns the result of execute for the mkdir command """
+        if clean:
+            self.rmdir(path)
+
+        command = "mkdir -p %s" % path
+        return self.execute(command, with_lock = True)
+
+    def rmdir(self, path):
+        """ Recursively removes path on the host.
+        Returns the result of execute for the rm command """
+        command = "rm -rf %s" % path
+        return self.execute(command, with_lock = True)
+
+ def run_and_wait(self, command,
+ home = ".",
+ pidfile = "pid",
+ stdin = None,
+ stdout = 'stdout',
+ stderr = 'stderr',
+ sudo = False,
+ tty = False,
+ raise_on_error = False):
+ """ runs a command in background on the remote host, but waits
+ until the command finishes execution.
+ This is more robust than doing a simple synchronized 'execute',
+ since in the remote host the command can continue to run detached
+ even if network disconnections occur
+
+ Once the command ends, the file named by stderr (inside home) is
+ read back: any content, or an error while reading it, is logged as
+ a failure. With raise_on_error set, failures raise RuntimeError.
+
+ Returns ((out, err), proc) of the read of the stderr file
+ """
+ # run command in background in remote host
+ (out, err), proc = self.run(command, home,
+ pidfile = pidfile,
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ sudo = sudo,
+ tty = tty)
+
+ # check no errors occurred
+ if proc.poll() and err:
+ msg = " Failed to run command '%s' " % command
+ self.error(msg, out, err)
+ if raise_on_error:
+ raise RuntimeError, msg
+
+ # Wait for pid file to be generated
+ pid, ppid = self.wait_pid(
+ home = home,
+ pidfile = pidfile,
+ raise_on_error = raise_on_error)
+
+ # wait until command finishes to execute
+ self.wait_run(pid, ppid)
+
+ # check if execution errors occurred
+ # ('out' holds the content of the stderr file written by the command)
+ (out, err), proc = self.check_output(home, stderr)
+
+ if err or out:
+ msg = " Failed to run command '%s' " % command
+ self.error(msg, out, err)
+
+ if raise_on_error:
+ raise RuntimeError, msg
- def is_alive(self, verbose = False):
- if self.is_localhost:
+ return (out, err), proc
+
+    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+        """ Polls the pid file of a command until it is available.
+
+        The file home/pidfile is checked up to 5 times, sleeping between
+        checks with a delay that starts at 1 second and grows by 20% on
+        each failed check.
+
+        Returns (pid, ppid), or (None, None) if the pid file never showed
+        up; in that case RuntimeError is raised when raise_on_error is set """
+        delay = 1.0
+        attempts_left = 5
+
+        while attempts_left > 0:
+            attempts_left -= 1
+            pidtuple = self.checkpid(home = home, pidfile = pidfile)
+
+            if pidtuple:
+                pid, ppid = pidtuple
+                return pid, ppid
+
+            time.sleep(delay)
+            delay = min(30,delay*1.2)
+
+        # every attempt failed
+        msg = " Failed to get pid for pidfile %s/%s " % (
+            home, pidfile )
+        self.error(msg)
+
+        if raise_on_error:
+            raise RuntimeError(msg)
+
+        return None, None
+
+    def wait_run(self, pid, ppid, trial = 0):
+        """ Blocks until the process pid/ppid is reported as FINISHED.
+
+        While the process is RUNNING the status is polled with a randomized
+        delay that grows up to 30 seconds. A status that is neither RUNNING
+        nor FINISHED is retried after a longer pause; the wait is abandoned
+        after more than 12 such answers in a row.
+
+        trial  not used by this method """
+        delay = 1.0
+        unknown_in_a_row = 0
+
+        while True:
+            status = self.status(pid, ppid)
+
+            if status is sshfuncs.FINISHED:
+                return
+
+            if status is sshfuncs.RUNNING:
+                time.sleep(delay*(0.5+random.random()))
+                delay = min(30,delay*1.2)
+                unknown_in_a_row = 0
+                continue
+
+            # neither running nor finished: status could not be determined
+            unknown_in_a_row += 1
+            time.sleep(delay*(5.5+random.random()))
+            if unknown_in_a_row > 12:
+                return
+
+    def check_output(self, home, filename):
+        """ Reads the file home/filename on the host with 'cat'.
+        Returns ((out, err), proc) as given by execute """
+        path = os.path.join(home, filename)
+        (content, err), proc = self.execute("cat %s" % path,
+            retry = 1, with_lock = True)
+        return (content, err), proc
+
+ def is_alive(self):
+ """ Tells whether the host answers to commands.
+ The local host is always considered alive. A remote host is alive
+ when the output of an echo command starts with 'ALIVE'; any
+ exception raised while executing the command, or any other output,
+ is logged and reported as not alive (False) """
+ if self.localhost:
return True
+ out = err = ""
try:
- out = self.execute("echo 'ALIVE'",
- timeout = 60,
- err_on_timeout = False,
- persistent = False)
+ # TODO: FIX NOT ALIVE!!!!
+ (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
+ with_lock = True)
except:
- if verbose:
- self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+ # NOTE(review): bare except, also traps KeyboardInterrupt/SystemExit
+ import traceback
+ trace = traceback.format_exc()
+ msg = "Unresponsive host %s " % err
+ self.error(msg, out, trace)
return False
if out.strip().startswith('ALIVE'):
return True
else:
- if verbose:
- self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+ msg = "Unresponsive host "
+ self.error(msg, out, err)
return False
- def mkdir(self, path, clean = True):
- if clean:
- self.rmdir(path)
-
- return self.execute(
- "mkdir -p %s" % path,
- timeout = 120,
- retry = 3
- )
-
- def rmdir(self, path):
- return self.execute(
- "rm -rf %s" % path,
- timeout = 120,
- retry = 3
- )
-
def copy(self, src, dst):
+        """ Recursively copies src to dst, locally when the node is the
+        local host and through sshfuncs.rcopy (holding the node lock)
+        otherwise.
+
+        Returns ((out, err), proc) as given by the copy function """
- if self.is_localhost:
- command = ["cp", "-R", src, dst]
- p = subprocess.Popen(command, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = p.communicate()
+        if self.localhost:
+            # use the parameters of this method: 'source' and 'dest' were
+            # undefined names and made every local copy fail with NameError
+            (out, err), proc = execfuncs.lcopy(src, dst,
+                    recursive = True,
+                    strict_host_checking = False)
else:
- (out, err), proc = eintr_retry(rcopy)(
- src, dst,
- port = self.port,
- agent = self.agent,
- identity_file = self.identity_file)
-
- if proc.wait():
- msg = "Error uploading to %s got:\n%s%s" %\
- (self.host or self.ip, out, err)
- self._logger.error(msg)
- raise RuntimeError(msg)
+            with self._lock:
+                (out, err), proc = sshfuncs.rcopy(
+                    src, dst,
+                    port = self.get("port"),
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    recursive = True,
+                    strict_host_checking = False)
- return (out, err)
+        return (out, err), proc
def execute(self, command,
sudo = False,
stdin = None,
- tty = False,
env = None,
+            tty = False,
+            forward_x11 = False,
timeout = None,
- retry = 0,
+            retry = 3,
err_on_timeout = True,
connect_timeout = 30,
- persistent = True):
+            strict_host_checking = False,
+            persistent = True,
+            with_lock = False
+            ):
""" Notice that this invocation will block until the
execution finishes. If this is not the desired behavior,
use 'run' instead."""
+        # The command runs through execfuncs.lexec on the local host and
+        # through sshfuncs.rexec otherwise; with_lock makes the remote
+        # execution hold the node lock. Returns ((out, err), proc).
- if self.is_localhost:
- if env:
- export = ''
- for envkey, envval in env.iteritems():
- export += '%s=%s ' % (envkey, envval)
- command = export + command
-
- if sudo:
- command = "sudo " + command
-
- p = subprocess.Popen(command, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = p.communicate()
+        if self.localhost:
+            # 'user' was an undefined name here (NameError on every local
+            # execution); the account comes from the "username" attribute
+            (out, err), proc = execfuncs.lexec(command,
+                    user = self.get("username"),
+                    sudo = sudo,
+                    stdin = stdin,
+                    env = env)
else:
- (out, err), proc = eintr_retry(rexec)(
+            if with_lock:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rexec(
+                        command,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        stdin = stdin,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey"),
+                        env = env,
+                        tty = tty,
+                        forward_x11 = forward_x11,
+                        timeout = timeout,
+                        retry = retry,
+                        err_on_timeout = err_on_timeout,
+                        connect_timeout = connect_timeout,
+                        persistent = persistent,
+                        strict_host_checking = strict_host_checking
+                        )
+            else:
+                (out, err), proc = sshfuncs.rexec(
command,
- self.host or self.ip,
- self.user,
- port = self.port,
- agent = self.forward_agent,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
sudo = sudo,
- stdin = stdin,
- identity_file = self.identity_file,
- tty = tty,
- x11 = self.enable_x11,
+                    stdin = stdin,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
env = env,
+                    tty = tty,
+                    forward_x11 = forward_x11,
timeout = timeout,
retry = retry,
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
- persistent = persistent)
-
- if proc.wait():
- msg = "Failed to execute command %s at node %s: %s %s" % \
- (command, self.host or self.ip, out, err,)
- self._logger.warn(msg)
- raise RuntimeError(msg)
+                    persistent = persistent,
+                    # was only forwarded by the locked branch
+                    strict_host_checking = strict_host_checking
+                    )
- return (out, err)
+        return (out, err), proc
- def run(self, command, home,
+    def run(self, command,
+            home = None,
+            create_home = False,
+            pidfile = "pid",
stdin = None,
stdout = 'stdout',
stderr = 'stderr',
- sudo = False):
- self._logger.info("Running %s", command)
-
- pidfile = './pid',
+            sudo = False,
+            tty = False):
+        """ Starts command in background, without waiting for it to finish.
+
+        The command is spawned with execfuncs.lspawn on the local host and
+        with sshfuncs.rspawn (holding the node lock) otherwise. For remote
+        hosts a missing stdin, stdout or stderr is replaced by /dev/null.
+
+        Returns ((out, err), proc) as given by the spawn function """
- if self.is_localhost:
- if stderr == stdout:
- stderr = '&1'
- else:
- stderr = ' ' + stderr
-
- daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
- 'command' : command,
- 'pidfile' : pidfile,
-
- 'stdout' : stdout,
- 'stderr' : stderr,
- 'stdin' : stdin,
- }
-
- cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
- 'command' : daemon_command,
-
- 'sudo' : 'sudo -S' if sudo else '',
-
- 'pidfile' : pidfile,
- 'gohome' : 'cd %s ; ' % home if home else '',
- 'create' : 'mkdir -p %s ; ' % home if create_home else '',
- }
- p = subprocess.Popen(command, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = p.communicate()
+        self.debug("Running command '%s'" % command)
+
+        if self.localhost:
+            # 'user' was an undefined name here (NameError on every local
+            # run); the account comes from the "username" attribute
+            (out, err), proc = execfuncs.lspawn(command, pidfile,
+                    stdout = stdout,
+                    stderr = stderr,
+                    stdin = stdin,
+                    home = home,
+                    create_home = create_home,
+                    sudo = sudo,
+                    user = self.get("username"))
else:
# Start process in a "daemonized" way, using nohup and heavy
# stdin/out redirection to avoid connection issues
- (out,err), proc = rspawn(
- command,
- pidfile = pidfile,
- home = home,
- stdin = stdin if stdin is not None else '/dev/null',
- stdout = stdout if stdout else '/dev/null',
- stderr = stderr if stderr else '/dev/null',
- sudo = sudo,
- host = self.host,
- user = self.user,
- port = self.port,
- agent = self.forward_agent,
- identity_file = self.identity_file
- )
-
- if proc.wait():
- raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-
- return (out, err)
-
- def checkpid(self, path):
- # Get PID/PPID
- # NOTE: wait a bit for the pidfile to be created
- pidtuple = rcheck_pid(
- os.path.join(path, 'pid'),
- host = self.host,
- user = self.user,
- port = self.port,
- agent = self.forward_agent,
- identity_file = self.identity_file
- )
+            with self._lock:
+                (out,err), proc = sshfuncs.rspawn(
+                    command,
+                    pidfile = pidfile,
+                    home = home,
+                    create_home = create_home,
+                    stdin = stdin if stdin is not None else '/dev/null',
+                    stdout = stdout if stdout else '/dev/null',
+                    stderr = stderr if stderr else '/dev/null',
+                    sudo = sudo,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    tty = tty
+                    )
+
+        return (out, err), proc
+
+ def checkpid(self, home = ".", pidfile = "pid"):
+ """ Looks up the pid file home/pidfile, locally or through ssh
+ (holding the node lock), and returns what the pid check reports.
+ wait_pid treats a truthy result as a (pid, ppid) pair """
+ if self.localhost:
+ pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
+ else:
+ with self._lock:
+ pidtuple = sshfuncs.rcheckpid(
+ os.path.join(home, pidfile),
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
return pidtuple
def status(self, pid, ppid):
+ """ Returns the status of the process pid/ppid, queried locally or
+ through ssh (holding the node lock). Callers compare the result
+ with sshfuncs.RUNNING and sshfuncs.FINISHED """
- status = rstatus(
- pid, ppid,
- host = self.host,
- user = self.user,
- port = self.port,
- agent = self.forward_agent,
- identity_file = self.identity_file
- )
+ if self.localhost:
+ status = execfuncs.lstatus(pid, ppid)
+ else:
+ with self._lock:
+ status = sshfuncs.rstatus(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
return status
def kill(self, pid, ppid, sudo = False):
+ """ Kills the process pid/ppid if its status is RUNNING.
+ Returns ((out, err), proc) of the kill command, or (("", ""), None)
+ when the process was not running and nothing was executed """
+ out = err = ""
+ proc = None
status = self.status(pid, ppid)
- if status == RUNNING:
- # kill by ppid+pid - SIGTERM first, then try SIGKILL
- rkill(
- pid, ppid,
- host = self.host,
- user = self.user,
- port = self.port,
- agent = self.forward_agent,
- sudo = sudo,
- identity_file = self.identity_file
- )
+
+ if status == sshfuncs.RUNNING:
+ if self.localhost:
+ (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
+ else:
+ with self._lock:
+ (out, err), proc = sshfuncs.rkill(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
+ return (out, err), proc
+
+    def check_bad_host(self, out, err):
+        """ Tells whether the output of a command shows that the host is
+        malfunctioning.
+
+        out  standard output of the command (None is treated as empty)
+        err  standard error of the command (None is treated as empty)
+
+        Returns a match object when a known failure message is found in
+        out or err (case insensitive), otherwise None """
+        # The group must not start with '|': that creates an empty
+        # alternative, which matches any string and flags every host as bad.
+        # New failure messages are added as r'|<message>' after the first.
+        badre = re.compile(r'(?:'
+            r'Error: disk I/O error'
+            r')',
+            re.I)
+        return badre.search(out or "") or badre.search(err or "")
+
+    def blacklist(self):
+        """ Placeholder: for now it only logs a warning, the host is not
+        recorded anywhere """
+        # TODO!!!! Append the host to a blacklist, along the lines of:
+        #   import util
+        #   util.appendBlacklist(self.hostname)
+        self.warn(" Blacklisting malfunctioning node ")