import threading
# TODO: Verify files and dirs exist already
-# TODO: Blacklist node!
+# TODO: Blacklist nodes!
+# TODO: Unify delays!!
+# TODO: Validate outcome of uploads!!
+
+reschedule_delay = "0.5s"
-DELAY ="1s"
@clsinit
class LinuxNode(ResourceManager):
# lock to avoid concurrency issues on methods used by applications
self._lock = threading.Lock()
- self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
+ self._logger = logging.getLogger("LinuxNode")
+
+ def log_message(self, msg):
+ return " guid %d - host %s - %s " % (self.guid,
+ self.get("hostname"), msg)
@property
def home(self):
- return self.get("home") or "/tmp"
+ return self.get("home") or ""
@property
- def exp_dir(self):
- exp_dir = os.path.join(self.home, self.ec.exp_id)
- return exp_dir if exp_dir.startswith('/') else "${HOME}/"
+ def exp_home(self):
+ return os.path.join(self.home, self.ec.exp_id)
@property
- def node_dir(self):
- node_dir = "node-%d" % self.guid
- return os.path.join(self.exp_dir, node_dir)
+ def node_home(self):
+ node_home = "node-%d" % self.guid
+ return os.path.join(self.exp_home, node_home)
@property
def os(self):
return self._os
if (not self.get("hostname") or not self.get("username")):
- msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
- self.logger.error(msg)
+ msg = "Can't resolve OS, insufficient data "
+ self.error(msg)
raise RuntimeError, msg
- (out, err), proc = self.execute("cat /etc/issue")
+ (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
if err and proc.poll():
- msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
- self.logger.error(msg)
- raise RuntimeError, msg
+ msg = "Error detecting OS "
+ self.error(msg, out, err)
+ raise RuntimeError, "%s - %s - %s" %( msg, out, err )
if out.find("Fedora release 12") == 0:
self._os = "f12"
elif out.find("Ubuntu") ==0:
self._os = "ubuntu"
else:
- msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
- self.logger.error(msg)
- raise RuntimeError, msg
+ msg = "Unsupported OS"
+ self.error(msg, out)
+ raise RuntimeError, "%s - %s " %( msg, out )
return self._os
def provision(self, filters = None):
if not self.is_alive():
self._state = ResourceState.FAILED
- self.logger.error("Deploy failed. Unresponsive node")
- return
+ msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
+ self.error(msg)
+ raise RuntimeError, msg
if self.get("cleanProcesses"):
self.clean_processes()
if self.get("cleanHome"):
self.clean_home()
- self.mkdir(self.node_dir)
+ self.mkdir(self.node_home)
super(LinuxNode, self).provision()
def deploy(self):
if self.state == ResourceState.NEW:
- self.discover()
- self.provision()
+ try:
+ self.discover()
+ self.provision()
+ except:
+ self._state = ResourceState.FAILED
+ raise
# Node needs to wait until all associated interfaces are
# ready before it can finalize deployment
ifaces = self.get_connected(LinuxInterface.rtype())
for iface in ifaces:
if iface.state < ResourceState.READY:
- self.ec.schedule(DELAY, self.deploy)
+ self.ec.schedule(reschedule_delay, self.deploy)
return
super(LinuxNode, self).deploy()
# TODO: Validate!
return True
- def clean_processes(self):
- self.logger.info("Cleaning up processes")
+ def clean_processes(self, killer = False):
+ self.info("Cleaning up processes")
- cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
- "sudo -S killall python tcpdump || /bin/true ; " +
- "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
- "sudo -S killall -u root || /bin/true ; " +
- "sudo -S killall -u root || /bin/true ; ")
+ if killer:
+ # Hardcore kill
+ cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
+ "sudo -S killall python tcpdump || /bin/true ; " +
+ "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
+ "sudo -S killall -u root || /bin/true ; " +
+ "sudo -S killall -u root || /bin/true ; ")
+ else:
+ # Be gentler...
+ cmd = ("sudo -S killall tcpdump || /bin/true ; " +
+ "sudo -S killall tcpdump || /bin/true ; " +
+ "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
+ "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
out = err = ""
- with self._lock:
- (out, err), proc = self.execute(cmd)
+ (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def clean_home(self):
- self.logger.info("Cleaning up home")
-
- cmd = ("cd %s ; " % self.home +
- "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
- " -execdir rm -rf {} + ")
+ self.info("Cleaning up home")
+
+ cmd = (
+ # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
+ "find . -maxdepth 1 -name 'nepi-*' " +
+ " -execdir rm -rf {} + "
+ )
+
+ if self.home:
+ cmd = "cd %s ; " % self.home + cmd
out = err = ""
- with self._lock:
- (out, err), proc = self.execute(cmd)
+ (out, err), proc = self.execute(cmd, with_lock = True)
def upload(self, src, dst, text = False):
""" Copy content to destination
return self.copy(src, dst)
def install_packages(self, packages, home = None):
- home = home or self.node_dir
+ home = home or self.node_home
cmd = ""
if self.os in ["f12", "f14"]:
elif self.os in ["debian", "ubuntu"]:
cmd = debfuncs.install_packages_command(self.os, packages)
else:
- msg = "Error installing packages. OS not known for host %s " % (
- self.get("hostname"))
- self.logger.error(msg)
+ msg = "Error installing packages ( OS not known ) "
+ self.error(msg, self.os)
raise RuntimeError, msg
out = err = ""
- with self._lock:
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "instpkg_pid",
- stdout = "instpkg_log",
- stderr = "instpkg_err",
- raise_on_error = True)
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "instpkg_pid",
+ stdout = "instpkg_out",
+ stderr = "instpkg_err",
+ raise_on_error = True)
return (out, err), proc
def remove_packages(self, packages, home = None):
- home = home or self.node_dir
+ home = home or self.node_home
cmd = ""
if self.os in ["f12", "f14"]:
elif self.os in ["debian", "ubuntu"]:
cmd = debfuncs.remove_packages_command(self.os, packages)
else:
- msg = "Error removing packages. OS not known for host %s " % (
- self.get("hostname"))
- self.logger.error(msg)
+ msg = "Error removing packages ( OS not known ) "
+ self.error(msg)
raise RuntimeError, msg
out = err = ""
- with self._lock:
- (out, err), proc = self.run_and_wait(cmd, home,
- pidfile = "rmpkg_pid",
- stdout = "rmpkg_log",
- stderr = "rmpkg_err",
- raise_on_error = True)
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "rmpkg_pid",
+ stdout = "rmpkg_out",
+ stderr = "rmpkg_err",
+ raise_on_error = True)
return (out, err), proc
if clean:
self.rmdir(path)
- return self.execute("mkdir -p %s" % path)
+ return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
- return self.execute("rm -rf %s" % path)
+ return self.execute("rm -rf %s" % path, with_lock = True)
def run_and_wait(self, command,
home = ".",
stdout = 'stdout',
stderr = 'stderr',
sudo = False,
+ tty = False,
raise_on_error = False):
""" Runs a command in the background on the remote host, but waits
until the command finishes execution.
stdin = stdin,
stdout = stdout,
stderr = stderr,
- sudo = sudo)
+ sudo = sudo,
+ tty = tty)
# check no errors occurred
if proc.poll() and err:
- msg = " Failed to run command %s on host %s" % (
- command, self.get("hostname"))
- self.logger.error(msg)
+ msg = " Failed to run command '%s' " % command
+ self.error(msg, out, err)
if raise_on_error:
raise RuntimeError, msg
(out, err), proc = self.check_output(home, stderr)
if err or out:
- msg = "Error while running command %s on host %s. error output: %s" % (
- command, self.get("hostname"), out)
- if err:
- msg += " . err: %s" % err
+ msg = " Failed to run command '%s' " % command
+ self.error(msg, out, err)
- self.logger.error(msg)
if raise_on_error:
raise RuntimeError, msg
time.sleep(delay)
delay = min(30,delay*1.2)
else:
- msg = " Failed to get pid for pidfile %s/%s on host %s" % (
- home, pidfile, self.get("hostname"))
- self.logger.error(msg)
+ msg = " Failed to get pid for pidfile %s/%s " % (
+ home, pidfile )
+ self.error(msg)
+
if raise_on_error:
raise RuntimeError, msg
def check_output(self, home, filename):
""" checks file content """
(out, err), proc = self.execute("cat %s" %
- os.path.join(home, filename))
+ os.path.join(home, filename), retry = 1, with_lock = True)
return (out, err), proc
def is_alive(self):
out = err = ""
try:
- (out, err), proc = self.execute("echo 'ALIVE'")
+ # TODO: Fix the NOTALIVE branch! It only runs if the first echo itself fails.
+ (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
+ with_lock = True)
except:
import traceback
trace = traceback.format_exc()
- self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s",
- self.get("hostname"), out, err, trace)
+ msg = "Unresponsive host %s " % err
+ self.error(msg, out, trace)
return False
if out.strip().startswith('ALIVE'):
return True
else:
- self.logger.warn("Unresponsive host %s. got:\n%s%s",
- self.get("hostname"), out, err)
+ msg = "Unresponsive host "
+ self.error(msg, out, err)
return False
- # TODO!
- #if self.check_bad_host(out,err):
- # self.blacklist()
-
def copy(self, src, dst):
if self.localhost:
(out, err), proc = execfuncs.lcopy(source, dest,
- recursive = True)
+ recursive = True,
+ strict_host_checking = False)
else:
- (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
- src, dst,
- port = self.get("port"),
- identity = self.get("identity"),
- server_key = self.get("serverKey"),
- recursive = True)
+ with self._lock:
+ (out, err), proc = sshfuncs.rcopy(
+ src, dst,
+ port = self.get("port"),
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ recursive = True,
+ strict_host_checking = False)
return (out, err), proc
tty = False,
forward_x11 = False,
timeout = None,
- retry = 0,
+ retry = 3,
err_on_timeout = True,
connect_timeout = 30,
- persistent = True
+ strict_host_checking = False,
+ persistent = True,
+ with_lock = False
):
""" Notice that this invocation will block until the
execution finishes. If this is not the desired behavior,
stdin = stdin,
env = env)
else:
- (out, err), proc = self.safe_retry(sshfuncs.rexec)(
+ if with_lock:
+ with self._lock:
+ (out, err), proc = sshfuncs.rexec(
+ command,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ stdin = stdin,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ env = env,
+ tty = tty,
+ forward_x11 = forward_x11,
+ timeout = timeout,
+ retry = retry,
+ err_on_timeout = err_on_timeout,
+ connect_timeout = connect_timeout,
+ persistent = persistent,
+ strict_host_checking = strict_host_checking
+ )
+ else:
+ (out, err), proc = sshfuncs.rexec(
command,
host = self.get("hostname"),
user = self.get("username"),
def run(self, command,
home = None,
- create_home = True,
+ create_home = False,
pidfile = "pid",
stdin = None,
stdout = 'stdout',
stderr = 'stderr',
- sudo = False):
+ sudo = False,
+ tty = False):
- self.logger.info("Running %s", command)
+ self.debug("Running command '%s'" % command)
if self.localhost:
(out, err), proc = execfuncs.lspawn(command, pidfile,
else:
# Start process in a "daemonized" way, using nohup and heavy
# stdin/out redirection to avoid connection issues
- (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
- command,
- pidfile = pidfile,
- home = home,
- create_home = create_home,
- stdin = stdin if stdin is not None else '/dev/null',
- stdout = stdout if stdout else '/dev/null',
- stderr = stderr if stderr else '/dev/null',
- sudo = sudo,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
+ with self._lock:
+ (out,err), proc = sshfuncs.rspawn(
+ command,
+ pidfile = pidfile,
+ home = home,
+ create_home = create_home,
+ stdin = stdin if stdin is not None else '/dev/null',
+ stdout = stdout if stdout else '/dev/null',
+ stderr = stderr if stderr else '/dev/null',
+ sudo = sudo,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey"),
+ tty = tty
+ )
return (out, err), proc
if self.localhost:
pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
else:
- pidtuple = sshfuncs.rcheckpid(
- os.path.join(home, pidfile),
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
-
- return pidtuple
-
- def status(self, pid, ppid):
- if self.localhost:
- status = execfuncs.lstatus(pid, ppid)
- else:
- status = sshfuncs.rstatus(
- pid, ppid,
+ with self._lock:
+ pidtuple = sshfuncs.rcheckpid(
+ os.path.join(home, pidfile),
host = self.get("hostname"),
user = self.get("username"),
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey")
)
+
+ return pidtuple
+
+ def status(self, pid, ppid):
+ if self.localhost:
+ status = execfuncs.lstatus(pid, ppid)
+ else:
+ with self._lock:
+ status = sshfuncs.rstatus(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
return status
if self.localhost:
(out, err), proc = execfuncs.lkill(pid, ppid, sudo)
else:
- (out, err), proc = self.safe_retry(sshfuncs.rkill)(
- pid, ppid,
- host = self.get("hostname"),
- user = self.get("username"),
- port = self.get("port"),
- agent = True,
- sudo = sudo,
- identity = self.get("identity"),
- server_key = self.get("serverKey")
- )
+ with self._lock:
+ (out, err), proc = sshfuncs.rkill(
+ pid, ppid,
+ host = self.get("hostname"),
+ user = self.get("username"),
+ port = self.get("port"),
+ agent = True,
+ sudo = sudo,
+ identity = self.get("identity"),
+ server_key = self.get("serverKey")
+ )
return (out, err), proc
def check_bad_host(self, out, err):
def blacklist(self):
# TODO!!!!
- self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
+ self.warn(" Blacklisting malfunctioning node ")
#import util
#util.appendBlacklist(self.hostname)
- def safe_retry(self, func):
- """Retries a function invocation using a lock"""
- import functools
- @functools.wraps(func)
- def rv(*p, **kw):
- fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
- func.__name__, p, kw, self.get("hostname"))
- retry = kw.pop("_retry", False)
- wlock = kw.pop("_with_lock", False)
-
- out = err = ""
- proc = None
- for i in xrange(0 if retry else 4):
- try:
- if wlock:
- with self._lock:
- (out, err), proc = func(*p, **kw)
- else:
- (out, err), proc = func(*p, **kw)
-
- if proc.poll():
- if retry:
- time.sleep(i*15)
- continue
- else:
- self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
- break
- except RuntimeError, e:
- if i >= 3:
- self.logger.error("%s. error: %s", fail_msg, e.args)
- return (out, err), proc
-
- return rv
-