import threading
# TODO: Verify files and dirs exists already
+# TODO: Blacklist node!
+
+DELAY ="1s"
@clsinit
class LinuxNode(ResourceManager):
@classmethod
def _register_attributes(cls):
- hostname = Attribute("hostname", "Hostname of the machine")
+ hostname = Attribute("hostname", "Hostname of the machine",
+ flags = Flags.ExecReadOnly)
username = Attribute("username", "Local account username",
flags = Flags.Credential)
- port = Attribute("port", "SSH port", flags = Flags.Credential)
+ port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
- home = Attribute("home",
- "Experiment home directory to store all experiment related files")
+ home = Attribute("home",
+ "Experiment home directory to store all experiment related files",
+ flags = Flags.ExecReadOnly)
identity = Attribute("identity", "SSH identity file",
flags = Flags.Credential)
server_key = Attribute("serverKey", "Server public key",
- flags = Flags.Credential)
+ flags = Flags.ExecReadOnly)
clean_home = Attribute("cleanHome", "Remove all files and directories " + \
" from home folder before starting experiment",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
clean_processes = Attribute("cleanProcesses",
- "Kill all running processes before starting experiment",
- flags = Flags.ReadOnly)
+ "Kill all running processes before starting experiment",
+ flags = Flags.ExecReadOnly)
tear_down = Attribute("tearDown", "Bash script to be executed before " + \
- "releasing the resource", flags = Flags.ReadOnly)
+ "releasing the resource",
+ flags = Flags.ExecReadOnly)
cls._register_attribute(hostname)
cls._register_attribute(username)
def __init__(self, ec, guid):
super(LinuxNode, self).__init__(ec, guid)
self._os = None
- self._home = "nepi-exp-%s" % os.urandom(8).encode('hex')
# lock to avoid concurrency issues on methods used by applications
self._lock = threading.Lock()
@property
def home(self):
- home = self.get("home")
- if home and not home.startswith("nepi-"):
- home = "nepi-" + home
- return home or self._home
+ return self.get("home") or "/tmp"
+
+ @property
+ def exp_dir(self):
+ exp_dir = os.path.join(self.home, self.ec.exp_id)
+ return exp_dir if exp_dir.startswith('/') else "${HOME}/"
+
+ @property
+ def node_dir(self):
+ node_dir = "node-%d" % self.guid
+ return os.path.join(self.exp_dir, node_dir)
@property
def os(self):
self.logger.error("Deploy failed. Unresponsive node")
return
- def deploy(self):
- self.provision()
-
if self.get("cleanProcesses"):
self.clean_processes()
if self.get("cleanHome"):
- # self.clean_home() -> this is dangerous
- pass
+ self.clean_home()
+
+ self.mkdir(self.node_dir)
+
+ super(LinuxNode, self).provision()
- self.mkdir(self.home)
+ def deploy(self):
+ if self.state == ResourceState.NEW:
+ self.discover()
+ self.provision()
+
+ # Node needs to wait until all associated interfaces are
+ # ready before it can finalize deployment
+ from neco.resources.linux.interface import LinuxInterface
+ ifaces = self.get_connected(LinuxInterface.rtype())
+ for iface in ifaces:
+ if iface.state < ResourceState.READY:
+ self.ec.schedule(DELAY, self.deploy)
+ return
super(LinuxNode, self).deploy()
super(LinuxNode, self).release()
- def validate_connection(self, guid):
+ def valid_connection(self, guid):
# TODO: Validate!
return True
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "cppid",
- stdout = "cplog",
- stderr = "cperr",
- raise_on_error = True)
-
- return (out, err)
+ (out, err), proc = self.execute(cmd)
def clean_home(self):
self.logger.info("Cleaning up home")
- cmd = "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
+ cmd = ("cd %s ; " % self.home +
+ "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
+ " -execdir rm -rf {} + ")
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "chpid",
- stdout = "chlog",
- stderr = "cherr",
- raise_on_error = True)
-
- return (out, err)
+ (out, err), proc = self.execute(cmd)
- def upload(self, src, dst):
+ def upload(self, src, dst, text = False):
""" Copy content to destination
- src content to copy. Can be a local file, directory or text input
+ src content to copy. Can be a local file, directory or a list of files
dst destination path on the remote host (remote is always self.host)
+
+ text src is text input, it must be stored into a temp file before uploading
"""
# If source is a string input
- if not os.path.isfile(src):
+ f = None
+ if text and not os.path.isfile(src):
# src is text input that should be uploaded as file
# create a temporal file with the content to upload
f = tempfile.NamedTemporaryFile(delete=False)
# Build destination as <user>@<server>:<path>
dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
- return self.copy(src, dst)
+ result = self.copy(src, dst)
+
+ # clean up temp file
+ if f:
+ os.remove(f.name)
+
+ return result
def download(self, src, dst):
if not self.localhost:
src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
return self.copy(src, dst)
- def install_packages(self, packages):
+ def install_packages(self, packages, home = None):
+ home = home or self.node_dir
+
cmd = ""
if self.os in ["f12", "f14"]:
cmd = rpmfuncs.install_packages_command(self.os, packages)
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "instpkgpid",
- stdout = "instpkglog",
- stderr = "instpkgerr",
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "instpkg_pid",
+ stdout = "instpkg_log",
+ stderr = "instpkg_err",
raise_on_error = True)
return (out, err), proc
- def remove_packages(self, packages):
+ def remove_packages(self, packages, home = None):
+ home = home or self.node_dir
+
cmd = ""
if self.os in ["f12", "f14"]:
cmd = rpmfuncs.remove_packages_command(self.os, packages)
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "rmpkgpid",
- stdout = "rmpkglog",
- stderr = "rmpkgerr",
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "rmpkg_pid",
+ stdout = "rmpkg_log",
+ stderr = "rmpkg_err",
raise_on_error = True)
return (out, err), proc
stderr = 'stderr',
sudo = False,
raise_on_error = False):
-
+ """ runs a command in background on the remote host, but waits
+ until the command finishes execution.
+ This is more robust than doing a simple synchronized 'execute',
+ since in the remote host the command can continue to run detached
+ even if network disconnections occur
+ """
+ # run command in background in remote host
(out, err), proc = self.run(command, home,
pidfile = pidfile,
stdin = stdin,
stderr = stderr,
sudo = sudo)
+ # check no errors occurred
if proc.poll() and err:
msg = " Failed to run command %s on host %s" % (
command, self.get("hostname"))
self.logger.error(msg)
if raise_on_error:
raise RuntimeError, msg
-
+
+ # Wait for pid file to be generated
pid, ppid = self.wait_pid(
home = home,
pidfile = pidfile,
raise_on_error = raise_on_error)
+ # wait until command finishes to execute
self.wait_run(pid, ppid)
-
- (out, err), proc = self.check_run_error(home, stderr)
+
+ # check if execution errors occurred
+ (out, err), proc = self.check_output(home, stderr)
if err or out:
msg = "Error while running command %s on host %s. error output: %s" % (
return (out, err), proc
def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+ """ Waits until the pid file for the command is generated,
+ and returns the pid and ppid of the process """
pid = ppid = None
delay = 1.0
for i in xrange(5):
return pid, ppid
def wait_run(self, pid, ppid, trial = 0):
+ """ wait for a remote process to finish execution """
delay = 1.0
first = True
bustspin = 0
delay = min(30,delay*1.2)
bustspin = 0
- def check_run_error(self, home, stderr = 'stderr'):
- (out, err), proc = self.execute("cat %s" %
- os.path.join(home, stderr))
- return (out, err), proc
-
- def check_run_output(self, home, stdout = 'stdout'):
+ def check_output(self, home, filename):
+ """ checks file content """
(out, err), proc = self.execute("cat %s" %
- os.path.join(home, stdout))
+ os.path.join(home, filename))
return (out, err), proc
def is_alive(self):
self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
break
except RuntimeError, e:
- if x >= 3:
+ if i >= 3:
self.logger.error("%s. error: %s", fail_msg, e.args)
return (out, err), proc