NoFlags = 0x00
# Attribute is not modifiable by the user
ReadOnly = 0x01
+ # Attribute is not modifiable by the user during runtime
+ ExecReadOnly = 0x02
# Attribute is an access credential
- Credential = 0x02
+ Credential = 0x04
class Attribute(object):
def __init__(self, name, help, type = Types.String,
- flags = Flags.NoFlags, default = None):
+ flags = Flags.NoFlags, default = None, allowed = None,
+ set_hook = None):
self._name = name
self._help = help
self._type = type
self._flags = flags
+ self._allowed = allowed
self._default = self._value = default
+ # callback to be invoked upon changing the
+ # attribute value
+ self.set_hook = set_hook
@property
def name(self):
def flags(self):
return self._flags
+ @property
+ def allowed(self):
+ return self._allowed
+
def has_flag(self, flag):
return (self._flags & flag) == flag
return self._value
def set_value(self, value):
- if self.is_valid_value(value):
+ valid = True
+
+ if self.type == Types.Enum:
+ valid = value in self._allowed
+
+ valid = valid and self.is_valid_value(value)
+
+ if valid:
+ if self.set_hook:
+ # Hook receives old value, new value
+ value = self.set_hook(self._value, value)
+
self._value = value
else:
raise ValueError("Invalid value %s for attribute %s" %
import threading
from neco.util import guid
+from neco.util.parallel import ParallelRun
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
from neco.execution.resource import ResourceFactory, ResourceAction, \
ResourceState
from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
-from neco.util.parallel import ParallelRun
+from neco.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
class ExperimentController(object):
- def __init__(self, root_dir = "/tmp"):
+ def __init__(self, exp_id = None, root_dir = "/tmp"):
super(ExperimentController, self).__init__()
# root directory to store files
self._root_dir = root_dir
+ # experiment identifier given by the user
+ self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
def logger(self):
return self._logger
+ @property
+ def exp_id(self):
+ exp_id = self._exp_id
+ if not exp_id.startswith("nepi-"):
+ exp_id = "nepi-" + exp_id
+ return exp_id
def get_task(self, tid):
return self._tasks.get(tid)
rm = self.get_resource(guid1)
rm.register_condition(action, group2, state, time)
+ def register_trace(self, guid, name):
+ """ Enable trace
+
+ :param name: Name of the trace
+ :type name: str
+ """
+ rm = self.get_resource(guid)
+ rm.register_trace(name)
+
+ def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ """ Get information on collected trace
+
+ :param name: Name of the trace
+ :type name: str
+
+ :param attr: Can be one of:
+ - TraceAttr.ALL (complete trace content),
+ - TraceAttr.STREAM (block in bytes to read starting at offset),
+ - TraceAttr.PATH (full path to the trace file),
+ - TraceAttr.SIZE (size of trace file).
+ :type attr: str
+
+ :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :rtype: str
+ """
+ rm = self.get_resource(guid)
+ return rm.trace(name, attr, block, offset)
+
def discover(self, guid, filters):
rm = self.get_resource(guid)
return rm.discover(filters)
rm = self.get_resource(guid)
return rm.start_with_condition()
- def deploy(self, group = None, wait_all_deployed = True):
+ def deploy(self, group = None, wait_all_ready = True):
""" Deploy all resource manager in group
:param group: List of guids of RMs to deploy
:type group: list
- :param wait_all_deployed: Wait until all RMs are deployed in
+ :param wait_all_ready: Wait until all RMs are ready in
order to start the RMs
:type guid: int
for guid in group:
rm = self.get_resource(guid)
- if wait_all_deployed:
+ if wait_all_ready:
towait = list(group)
towait.remove(guid)
self.register_condition(guid, ResourceAction.START,
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+from neco.execution.trace import TraceAttr
import copy
import functools
READY = 3
STARTED = 4
STOPPED = 5
- FAILED = 6
- RELEASED = 7
+ FINISHED = 6
+ FAILED = 7
+ RELEASED = 8
def clsinit(cls):
cls._clsinit()
_rtype = "Resource"
_filters = None
_attributes = None
+ _traces = None
@classmethod
def _register_filter(cls, attr):
cls._attributes[attr.name] = attr
@classmethod
- def _register_filters(cls):
+ def _register_trace(cls, trace):
""" Resource subclasses will invoke this method to add a
- filter attribute
+ resource trace
+
+ """
+ cls._traces[trace.name] = trace
+
+
+ @classmethod
+ def _register_filters(cls):
+ """ Resource subclasses will invoke this method to register
+ resource filters
"""
pass
@classmethod
def _register_attributes(cls):
- """ Resource subclasses will invoke this method to add a
- resource attribute
+ """ Resource subclasses will invoke this method to register
+ resource attributes
+
+ """
+ pass
+
+ @classmethod
+ def _register_traces(cls):
+ """ Resource subclasses will invoke this method to register
+ resource traces
"""
pass
cls._attributes = dict()
cls._register_attributes()
+ # static template for resource traces
+ cls._traces = dict()
+ cls._register_traces()
+
@classmethod
def rtype(cls):
return cls._rtype
"""
return copy.deepcopy(cls._attributes.values())
+ @classmethod
+ def get_traces(cls):
+ """ Returns a copy of the traces
+
+ """
+ return copy.deepcopy(cls._traces.values())
+
def __init__(self, ec, guid):
self._guid = guid
self._ec = weakref.ref(ec)
self._conditions = dict()
# the resource instance gets a copy of all attributes
- # that can modify
self._attrs = copy.deepcopy(self._attributes)
+ # the resource instance gets a copy of all traces
+ self._trcs = copy.deepcopy(self._traces)
+
self._state = ResourceState.NEW
self._start_time = None
:type name: str
:param name: Value of the attribute
:type name: str
- :rtype: Boolean
"""
attr = self._attrs[name]
attr.value = value
attr = self._attrs[name]
return attr.value
+ def register_trace(self, name):
+ """ Enable trace
+
+ :param name: Name of the trace
+ :type name: str
+ """
+ trace = self._trcs[name]
+ trace.enabled = True
+
+ def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ """ Get information on collected trace
+
+ :param name: Name of the trace
+ :type name: str
+
+ :param attr: Can be one of:
+ - TraceAttr.ALL (complete trace content),
+ - TraceAttr.STREAM (block in bytes to read starting at offset),
+ - TraceAttr.PATH (full path to the trace file),
+ - TraceAttr.SIZE (size of trace file).
+ :type attr: str
+
+ :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :rtype: str
+ """
+ pass
+
def register_condition(self, action, group, state,
time = None):
""" Registers a condition on the resource manager to allow execution
conditions.append((group, state, time))
+ def get_connected(self, rtype):
+ connected = []
+ for guid in self.connections:
+ rm = self.ec.get_resource(guid)
+ if rm.rtype() == rtype:
+ connected.append(rm)
+ return connected
+
def _needs_reschedule(self, group, state, time):
""" Internal method that verify if 'time' has elapsed since
all elements in 'group' have reached state 'state'.
# only can start when RM is either STOPPED or READY
if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
reschedule = True
+ self.logger.debug("---- RESCHEDULING START ---- state %s " % self.state )
else:
self.logger.debug("---- START CONDITIONS ---- %s" %
self.conditions.get(ResourceAction.START))
--- /dev/null
+class TraceAttr:
+ ALL = 'all'
+ STREAM = 'stream'
+ PATH = 'path'
+ SIZE = 'size'
+
+class Trace(object):
+ def __init__(self, name, help):
+ self._name = name
+ self._help = help
+ self.enabled = False
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def help(self):
+ return self._help
+
-from neco.execution.attribute import Attribute, Flags
+from neco.execution.attribute import Attribute, Flags, Types
+from neco.execution.trace import Trace, TraceAttr
from neco.execution.resource import ResourceManager, clsinit, ResourceState
-from neco.resources.linux.ssh_api import SSHApiFactory
+from neco.resources.linux.node import LinuxNode
+from neco.util import sshfuncs
import logging
+import os
+
+DELAY ="1s"
+
+# TODO: Resolve wildcards in commands!!
@clsinit
class LinuxApplication(ResourceManager):
@classmethod
def _register_attributes(cls):
command = Attribute("command", "Command to execute",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
env = Attribute("env", "Environment variables string for command execution",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
sudo = Attribute("sudo", "Run with root privileges",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
depends = Attribute("depends",
"Space-separated list of packages required to run the application",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
sources = Attribute("sources",
"Space-separated list of regular files to be deployed in the working "
"path prior to building. Archives won't be expanded automatically.",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
+ code = Attribute("code",
+ "Plain text source code to be uploaded to the server. It will be stored "
+ "under ${SOURCES}/code",
+ flags = Flags.ExecReadOnly)
build = Attribute("build",
"Build commands to execute after deploying the sources. "
"Sources will be in the ${SOURCES} folder. "
"make and other tools to install, be sure to provide them as "
"actual dependencies instead.",
flags = Flags.ReadOnly)
- stdin = Attribute("stdin", "Standard input", flags = Flags.ReadOnly)
- stdout = Attribute("stdout", "Standard output", flags = Flags.ReadOnly)
- stderr = Attribute("stderr", "Standard error", flags = Flags.ReadOnly)
+ stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
+ stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
+ stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
+ update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
+ "re-upload before starting experiment. If not keep the same directory",
+ default = True,
+ type = Types.Bool,
+ flags = Flags.ExecReadOnly)
- tear_down = Attribute("tearDown", "Bash script to be executed before
- releasing the resource", flags = Flags.ReadOnly)
+ tear_down = Attribute("tearDown", "Bash script to be executed before "
+ "releasing the resource",
+ flags = Flags.ReadOnly)
cls._register_attribute(command)
cls._register_attribute(forward_x11)
cls._register_attribute(sudo)
cls._register_attribute(depends)
cls._register_attribute(sources)
+ cls._register_attribute(code)
cls._register_attribute(build)
cls._register_attribute(install)
cls._register_attribute(stdin)
cls._register_attribute(stdout)
cls._register_attribute(stderr)
+ cls._register_attribute(update_home)
cls._register_attribute(tear_down)
+ @classmethod
+ def _register_traces(cls):
+ stdout = Trace("stdout", "Standard output stream")
+ stderr = Trace("stderr", "Standard error stream")
+ buildlog = Trace("buildlog", "Output of the build process")
+
+ cls._register_trace(stdout)
+ cls._register_trace(stderr)
+ cls._register_trace(buildlog)
+
def __init__(self, ec, guid):
super(LinuxApplication, self).__init__(ec, guid)
self._pid = None
self._ppid = None
- self._home = "app-%s" % self.box.guid
- self._node = None
+ self._home = "app-%s" % self.guid
self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
@property
def node(self):
- self._node
+ node = self.get_connected(LinuxNode.rtype())
+ if node: return node[0]
+ return None
@property
def home(self):
- return self._home # + node home
+ return os.path.join(self.node.exp_dir, self._home)
+
+ @property
+ def src_dir(self):
+ return os.path.join(self.home, 'src')
+
+ @property
+ def build_dir(self):
+ return os.path.join(self.home, 'build')
@property
def pid(self):
def ppid(self):
return self._ppid
+ def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ path = os.path.join(self.home, name)
+
+ cmd = "(test -f %s && echo 'success') || echo 'error'" % path
+ (out, err), proc = self.node.execute(cmd)
+
+ if (err and proc.poll()) or out.find("error") != -1:
+ err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
+ name, self.node.get("hostname"), err)
+ self.logger.error(err_msg)
+ return None
+
+ if attr == TraceAttr.PATH:
+ return path
+
+ if attr == TraceAttr.ALL:
+ (out, err), proc = self.node.check_output(self.home, name)
+
+ if err and proc.poll():
+ err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
+ name, self.node.get("hostname"), err)
+ self.logger.error(err_msg)
+ return None
+
+ return out
+
+ if attr == TraceAttr.STREAM:
+ cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
+ elif attr == TraceAttr.SIZE:
+ cmd = "stat -c%%s %s " % path
+
+ (out, err), proc = self.node.execute(cmd)
+
+ if err and proc.poll():
+ err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
+ name, self.node.get("hostname"), err)
+ self.logger.error(err_msg)
+ return None
+
+ if attr == TraceAttr.SIZE:
+ out = int(out.strip())
+
+ return out
+
def provision(self, filters = None):
- # verify home hash or clean home
+ # TODO: verify home hash or clean home
+
+ # create home dir for application
+ self.node.mkdir(self.home)
+
# upload sources
- # build
- # Install stuff!!
- # upload app command
- pass
+ self.upload_sources()
+
+ # upload code
+ self.upload_code()
+
+ # install dependencies
+ self.install_dependencies()
+
+ # build
+ self.build()
+
+ # Install
+ self.install()
+
+ super(LinuxApplication, self).provision()
+
+ def upload_sources(self):
+ # check if sources need to be uploaded and upload them
+ sources = self.get("sources")
+ if sources:
+ self.logger.debug(" Uploading sources %s" % sources)
+
+ # create dir for sources
+ self.node.mkdir(self.src_dir)
+
+ sources = self.sources.split(' ')
+
+ http_sources = list()
+ for source in list(sources):
+ if source.startswith("http") or source.startswith("https"):
+ http_sources.append(source)
+ sources.remove(source)
+
+ # Download http sources
+ for source in http_sources:
+ dst = os.path.join(self.src_dir, source.split("/")[-1])
+ command = "wget -o %s %s" % (dst, source)
+ self.node.execute(command)
+
+ self.node.upload(sources, self.src_dir)
+
+ def upload_code(self):
+ code = self.get("code")
+ if code:
+ # create dir for sources
+ self.node.mkdir(self.src_dir)
+
+ self.logger.debug(" Uploading code '%s'" % code)
+
+ dst = os.path.join(self.src_dir, "code")
+ self.node.upload(sources, dst, text = True)
+
+ def install_dependencies(self):
+ depends = self.get("depends")
+ if depends:
+ self.logger.debug(" Installing dependencies %s" % depends)
+ self.node.install_packages(depends, home = self.home)
+
+ def build(self):
+ build = self.get("build")
+ if build:
+ self.logger.debug(" Building sources '%s'" % build)
+
+ # create dir for build
+ self.node.mkdir(self.build_dir)
+
+ cmd = self.replace_paths(build)
+
+ (out, err), proc = self.run_and_wait(cmd, self.home,
+ pidfile = "build_pid",
+ stdout = "build_log",
+ stderr = "build_err",
+ raise_on_error = True)
+
+ def install(self):
+ install = self.get("install")
+ if install:
+ self.logger.debug(" Installing sources '%s'" % install)
+
+ cmd = self.replace_paths(install)
+
+ (out, err), proc = self.run_and_wait(cmd, self.home,
+ pidfile = "install_pid",
+ stdout = "install_log",
+ stderr = "install_err",
+ raise_on_error = True)
def deploy(self):
# Wait until node is associated and deployed
- self.provision()
- pass
+ node = self.node
+ if not node or node.state < ResourceState.READY:
+ self.ec.schedule(DELAY, self.deploy)
+ else:
+ self.discover()
+ self.provision()
+
+ super(LinuxApplication, self).deploy()
def start(self):
- dst = os.path.join(self.home, "app.sh")
-
- # Create shell script with the command
- # This way, complex commands and scripts can be ran seamlessly
- # sync files
- cmd = ""
+ command = self.replace_paths(self.get("command"))
env = self.get("env")
- if env:
- for envkey, envvals in env.iteritems():
- for envval in envvals:
- cmd += 'export %s=%s\n' % (envkey, envval)
+ stdin = 'stdin' if self.get("stdin") else None
+ sudo = self.get('sudo') or False
+ x11 = self.get("forwardX11") or False
+ err_msg = "Failed to run command %s on host %s" % (
+ command, self.node.get("hostname"))
+ failed = False
- cmd += self.get("command")
- self.api.upload(cmd, dst)
+ super(LinuxApplication, self).start()
- command = 'bash ./app.sh'
- stdin = 'stdin' if self.get("stdin") else None
- self.api.run(command, self.home, stdin = stdin)
- self._pid, self._ppid = self.api.checkpid(self.app_home)
+ if x11:
+ (out, err), proc = self.node.execute(command,
+ sudo = sudo,
+ stdin = stdin,
+ stdout = 'stdout',
+ stderr = 'stderr',
+ env = env,
+ forward_x11 = x11)
+
+ if proc.poll() and err:
+ failed = True
+ else:
+ (out, err), proc = self.node.run(command, self.home,
+ stdin = stdin,
+ sudo = sudo)
+
+ if proc.poll() and err:
+ failed = True
+
+ if not failed:
+ pid, ppid = self.node.wait_pid(home = self.home)
+ if pid: self._pid = int(pid)
+ if ppid: self._ppid = int(ppid)
+
+ if not self.pid or not self.ppid:
+ failed = True
+
+ (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
+
+ if failed or out or chkerr:
+ # check if execution errors occurred
+ if err:
+ err_msg = "%s. Proc error: %s" % (err_msg, err)
+
+ err_msg = "%s. Run error: %s " % (err_msg, out)
+
+ if chkerr:
+ err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
+
+ self.logger.error(err_msg)
+ self.state = ResourceState.FAILED
def stop(self):
- # Kill
- self._state = ResourceState.STOPPED
+ state = self.state
+ if state == ResourceState.STARTED:
+ (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+ if out or err:
+ # check if execution errors occurred
+ err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
+ self.get("command"), self.node.get("hostname"), err, out)
+ self.logger.error(err_msg)
+ self._state = ResourceState.FAILED
+ stopped = False
+ else:
+ super(LinuxApplication, self).stop()
def release(self):
tear_down = self.get("tearDown")
if tear_down:
- self.api.execute(tear_down)
+ self.node.execute(tear_down)
- return self.api.kill(self.pid, self.ppid)
+ self.stop()
+ if self.state == ResourceState.STOPPED:
+ super(LinuxApplication, self).release()
+
+ @property
+ def state(self):
+ if self._state == ResourceState.STARTED:
+ (out, err), proc = self.node.check_output(self.home, 'stderr')
- def status(self):
- return self.api.status(self.pid, self.ppid)
+ if out or err:
+ # check if execution errors occurred
+ err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
+ self.get("command"), self.node.get("hostname"), err, out)
+ self.logger.error(err_msg)
+ self._state = ResourceState.FAILED
- def make_app_home(self):
- self.api.mkdir(self.home)
+ elif self.pid and self.ppid:
+ status = self.node.status(self.pid, self.ppid)
- stdin = self.get("stdin")
- if stdin:
- self.api.upload(stdin, os.path.join(self.home, 'stdin'))
+ if status == sshfuncs.FINISHED:
+ self._state = ResourceState.FINISHED
- def _validate_connection(self, guid):
+ return self._state
+
+ def valid_connection(self, guid):
# TODO: Validate!
return True
# XXX: What if it is connected to more than one node?
self._node = resources[0] if len(resources) == 1 else None
return self._node
+ def hash_app(self):
+ """ Generates a hash representing univokely the application.
+ Is used to determine whether the home directory should be cleaned
+ or not.
+
+ """
+ command = self.get("command")
+ forwards_x11 = self.get("forwardX11")
+ env = self.get("env")
+ sudo = self.get("sudo")
+ depends = self.get("depends")
+ sources = self.get("sources")
+ cls._register_attribute(sources)
+ cls._register_attribute(build)
+ cls._register_attribute(install)
+ cls._register_attribute(stdin)
+ cls._register_attribute(stdout)
+ cls._register_attribute(stderr)
+ cls._register_attribute(tear_down)
+ skey = "".join(map(str, args))
+ return hashlib.md5(skey).hexdigest()
+
+ def replace_paths(self, command):
+ """
+ Replace all special path tags with shell-escaped actual paths.
+ """
+ return ( command
+ .replace("${SOURCES}", self.src_dir)
+ .replace("${BUILD}", self.build_dir)
+ .replace("${APPHOME}", self.home)
+ .replace("${NODEHOME}", self.node.home) )
+
+
--- /dev/null
+from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux.node import LinuxNode
+
+import collections
+import logging
+import os
+import random
+import re
+import tempfile
+import time
+import threading
+
+@clsinit
+class LinuxChannel(ResourceManager):
+ _rtype = "LinuxChannel"
+
+ def __init__(self, ec, guid):
+ super(LinuxChannel, self).__init__(ec, guid)
+ self._logger = logging.getLogger("neco.linux.Channel.%d " % self.guid)
+
+ def valid_connection(self, guid):
+ # TODO: Validate!
+ return True
--- /dev/null
+# TODO: Investigate using http://nixos.org/nix/
+
+def install_packages_command(os, packages):
+ if not isinstance(packages, list):
+ packages = [packages]
+
+ cmd = ""
+ for p in packages:
+ cmd += " ( dpkg -s %(package)s || sudo apt-get -y install %(package)s ) ; " % {
+ 'package': p}
+
+ #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...)
+ return cmd
+
+def remove_packages_command(os, packages):
+ if not isinstance(packages, list):
+ packages = [packages]
+
+ cmd = ""
+ for p in packages:
+ cmd += " ( dpkg -s %(package)s && sudo apt-get -y purge %(package)s ) ; " % {
+ 'package': p}
+
+ #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...)
+ return cmd
+
import threading
# TODO: Verify files and dirs exists already
+# TODO: Blacklist node!
+
+DELAY ="1s"
@clsinit
class LinuxNode(ResourceManager):
@classmethod
def _register_attributes(cls):
- hostname = Attribute("hostname", "Hostname of the machine")
+ hostname = Attribute("hostname", "Hostname of the machine",
+ flags = Flags.ExecReadOnly)
username = Attribute("username", "Local account username",
flags = Flags.Credential)
- port = Attribute("port", "SSH port", flags = Flags.Credential)
+ port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
- home = Attribute("home",
- "Experiment home directory to store all experiment related files")
+ home = Attribute("home",
+ "Experiment home directory to store all experiment related files",
+ flags = Flags.ExecReadOnly)
identity = Attribute("identity", "SSH identity file",
flags = Flags.Credential)
server_key = Attribute("serverKey", "Server public key",
- flags = Flags.Credential)
+ flags = Flags.ExecReadOnly)
clean_home = Attribute("cleanHome", "Remove all files and directories " + \
" from home folder before starting experiment",
- flags = Flags.ReadOnly)
+ flags = Flags.ExecReadOnly)
clean_processes = Attribute("cleanProcesses",
- "Kill all running processes before starting experiment",
- flags = Flags.ReadOnly)
+ "Kill all running processes before starting experiment",
+ flags = Flags.ExecReadOnly)
tear_down = Attribute("tearDown", "Bash script to be executed before " + \
- "releasing the resource", flags = Flags.ReadOnly)
+ "releasing the resource",
+ flags = Flags.ExecReadOnly)
cls._register_attribute(hostname)
cls._register_attribute(username)
def __init__(self, ec, guid):
super(LinuxNode, self).__init__(ec, guid)
self._os = None
- self._home = "nepi-exp-%s" % os.urandom(8).encode('hex')
# lock to avoid concurrency issues on methods used by applications
self._lock = threading.Lock()
@property
def home(self):
- home = self.get("home")
- if home and not home.startswith("nepi-"):
- home = "nepi-" + home
- return home or self._home
+ return self.get("home") or "/tmp"
+
+ @property
+ def exp_dir(self):
+ exp_dir = os.path.join(self.home, self.ec.exp_id)
+ return exp_dir if exp_dir.startswith('/') else "${HOME}/"
+
+ @property
+ def node_dir(self):
+ node_dir = "node-%d" % self.guid
+ return os.path.join(self.exp_dir, node_dir)
@property
def os(self):
self.logger.error("Deploy failed. Unresponsive node")
return
- def deploy(self):
- self.provision()
-
if self.get("cleanProcesses"):
self.clean_processes()
if self.get("cleanHome"):
- # self.clean_home() -> this is dangerous
- pass
+ self.clean_home()
+
+ self.mkdir(self.node_dir)
- self.mkdir(self.home)
+ super(LinuxNode, self).provision()
+
+ def deploy(self):
+ if self.state == ResourceState.NEW:
+ self.discover()
+ self.provision()
+
+ # Node needs to wait until all associated interfaces are
+ # ready before it can finalize deployment
+ from neco.resources.linux.interface import LinuxInterface
+ ifaces = self.get_connected(LinuxInterface.rtype())
+ for iface in ifaces:
+ if iface.state < ResourceState.READY:
+ self.ec.schedule(DELAY, self.deploy)
+ return
super(LinuxNode, self).deploy()
super(LinuxNode, self).release()
- def validate_connection(self, guid):
+ def valid_connection(self, guid):
# TODO: Validate!
return True
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "cppid",
- stdout = "cplog",
- stderr = "cperr",
- raise_on_error = True)
-
- return (out, err)
+ (out, err), proc = self.execute(cmd)
def clean_home(self):
self.logger.info("Cleaning up home")
- cmd = "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
+ cmd = ("cd %s ; " % self.home +
+ "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
+ " -execdir rm -rf {} + ")
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "chpid",
- stdout = "chlog",
- stderr = "cherr",
- raise_on_error = True)
-
- return (out, err)
+ (out, err), proc = self.execute(cmd)
- def upload(self, src, dst):
+ def upload(self, src, dst, text = False):
""" Copy content to destination
- src content to copy. Can be a local file, directory or text input
+ src content to copy. Can be a local file, directory or a list of files
dst destination path on the remote host (remote is always self.host)
+
+ text src is text input, it must be stored into a temp file before uploading
"""
# If source is a string input
- if not os.path.isfile(src):
+ f = None
+ if text and not os.path.isfile(src):
# src is text input that should be uploaded as file
# create a temporal file with the content to upload
f = tempfile.NamedTemporaryFile(delete=False)
# Build destination as <user>@<server>:<path>
dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
- return self.copy(src, dst)
+ result = self.copy(src, dst)
+
+ # clean up temp file
+ if f:
+ os.remove(f.name)
+
+ return result
def download(self, src, dst):
if not self.localhost:
src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
return self.copy(src, dst)
- def install_packages(self, packages):
+ def install_packages(self, packages, home = None):
+ home = home or self.node_dir
+
cmd = ""
if self.os in ["f12", "f14"]:
cmd = rpmfuncs.install_packages_command(self.os, packages)
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "instpkgpid",
- stdout = "instpkglog",
- stderr = "instpkgerr",
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "instpkg_pid",
+ stdout = "instpkg_log",
+ stderr = "instpkg_err",
raise_on_error = True)
return (out, err), proc
- def remove_packages(self, packages):
+ def remove_packages(self, packages, home = None):
+ home = home or self.node_dir
+
cmd = ""
if self.os in ["f12", "f14"]:
cmd = rpmfuncs.remove_packages_command(self.os, packages)
out = err = ""
with self._lock:
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "rmpkgpid",
- stdout = "rmpkglog",
- stderr = "rmpkgerr",
+ (out, err), proc = self.run_and_wait(cmd, home,
+ pidfile = "rmpkg_pid",
+ stdout = "rmpkg_log",
+ stderr = "rmpkg_err",
raise_on_error = True)
return (out, err), proc
stderr = 'stderr',
sudo = False,
raise_on_error = False):
-
+ """ runs a command in background on the remote host, but waits
+ until the command finishes execution.
+ This is more robust than doing a simple synchronized 'execute',
+ since in the remote host the command can continue to run detached
+ even if network disconnections occur
+ """
+ # run command in background in remote host
(out, err), proc = self.run(command, home,
pidfile = pidfile,
stdin = stdin,
stderr = stderr,
sudo = sudo)
+ # check no errors occurred
if proc.poll() and err:
msg = " Failed to run command %s on host %s" % (
command, self.get("hostname"))
self.logger.error(msg)
if raise_on_error:
raise RuntimeError, msg
-
+
+ # Wait for pid file to be generated
pid, ppid = self.wait_pid(
home = home,
pidfile = pidfile,
raise_on_error = raise_on_error)
+ # wait until command finishes to execute
self.wait_run(pid, ppid)
-
- (out, err), proc = self.check_run_error(home, stderr)
+
+ # check if execution errors occurred
+ (out, err), proc = self.check_output(home, stderr)
if err or out:
msg = "Error while running command %s on host %s. error output: %s" % (
return (out, err), proc
def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+ """ Waits until the pid file for the command is generated,
+ and returns the pid and ppid of the process """
pid = ppid = None
delay = 1.0
for i in xrange(5):
return pid, ppid
def wait_run(self, pid, ppid, trial = 0):
+ """ wait for a remote process to finish execution """
delay = 1.0
first = True
bustspin = 0
delay = min(30,delay*1.2)
bustspin = 0
- def check_run_error(self, home, stderr = 'stderr'):
+ def check_output(self, home, filename):
+ """ checks file content """
(out, err), proc = self.execute("cat %s" %
- os.path.join(home, stderr))
+ os.path.join(home, filename))
return (out, err), proc
- def check_run_output(self, home, stdout = 'stdout'):
- (out, err), proc = self.execute("cat %s" %
- os.path.join(home, stdout))
- return (out, err), proc
-
-
def is_alive(self):
if self.localhost:
return True
self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
break
except RuntimeError, e:
- if x >= 3:
+ if i >= 3:
self.logger.error("%s. error: %s", fail_msg, e.args)
return (out, err), proc
--- /dev/null
+RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
+RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
+
+# TODO: Investigate using http://nixos.org/nix/
+
+def install_packages_command(os, packages):
+ if not isinstance(packages, list):
+ packages = [packages]
+
+ cmd = "( %s )" % install_rpmfusion_command(os)
+ for p in packages:
+ cmd += " ; ( rpm -q %(package)s || sudo yum -y install %(package)s ) " % {
+ 'package': p}
+
+ #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
+ return " ( %s )" % cmd
+
+def remove_packages_command(os, packages):
+ if not isinstance(packages, list):
+ packages = [packages]
+
+ cmd = ""
+ for p in packages:
+ cmd += " ( rpm -q %(package)s && sudo yum -y remove %(package)s ) ; " % {
+ 'package': p}
+
+ #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
+ return cmd
+
+def install_rpmfusion_command(os):
+ cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s"
+
+ if os == "f12":
+ cmd = cmd % {'package': RPM_FUSION_URL_F12}
+ elif os == "f14":
+ # This one works for f13+
+ cmd = cmd % {'package': RPM_FUSION_URL}
+ else:
+ cmd = ""
+
+ return cmd
+
--- /dev/null
+from neco.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT
+
+import subprocess
+
+def lexec(command,
+ user = None,
+ sudo = False,
+ stdin = None,
+ env = None):
+ """
+ Executes a local command, returns ((stdout,stderr),process)
+ """
+ if env:
+ export = ''
+ for envkey, envval in env.iteritems():
+ export += '%s=%s ' % (envkey, envval)
+ command = "%s %s" % (export, command)
+
+ if sudo:
+ command = "sudo %s" % command
+ elif user:
+ command = "su %s ; %s " % (user, command)
+
+ p = subprocess.Popen(command,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE,
+ stdin = stdin)
+
+ out, err = p.communicate()
+ return ((out, err), proc)
+
+def lcopy(source, dest, recursive = False):
+ """
+ Copies from/to localy.
+ """
+
+ if TRACE:
+ print "scp", source, dest
+
+ command = ["cp"]
+ if recursive:
+ command.append("-R")
+
+ command.append(src)
+ command.append(dst)
+
+ p = subprocess.Popen(command,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ out, err = p.communicate()
+ return ((out, err), proc)
+
+def lspawn(command, pidfile,
+ stdout = '/dev/null',
+ stderr = STDOUT,
+ stdin = '/dev/null',
+ home = None,
+ create_home = False,
+ sudo = False,
+ user = None):
+ """
+ Spawn a local command such that it will continue working asynchronously.
+
+ Parameters:
+ command: the command to run - it should be a single line.
+
+ pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
+
+ stdout: path of a file to redirect standard output to - must be a string.
+ Defaults to /dev/null
+ stderr: path of a file to redirect standard error to - string or the special STDOUT value
+ to redirect to the same file stdout was redirected to. Defaults to STDOUT.
+ stdin: path of a file with input to be piped into the command's standard input
+
+ home: path of a folder to use as working directory - should exist, unless you specify create_home
+
+ create_home: if True, the home folder will be created first with mkdir -p
+
+ sudo: whether the command needs to be executed as root
+
+ Returns:
+ (stdout, stderr), process
+
+ Of the spawning process, which only captures errors at spawning time.
+ Usually only useful for diagnostics.
+ """
+ # Start process in a "daemonized" way, using nohup and heavy
+ # stdin/out redirection to avoid connection issues
+ if stderr is STDOUT:
+ stderr = '&1'
+ else:
+ stderr = ' ' + stderr
+
+ daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+ 'command' : command,
+ 'pidfile' : shell_escape(pidfile),
+ 'stdout' : stdout,
+ 'stderr' : stderr,
+ 'stdin' : stdin,
+ }
+
+ cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
+ 'command' : shell_escape(daemon_command),
+ 'sudo' : 'sudo -S' if sudo else '',
+ 'pidfile' : shell_escape(pidfile),
+ 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
+ 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home else '',
+ }
+
+ (out,err),proc = lexec(cmd)
+
+ if proc.wait():
+ raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
+
+ return (out,err),proc
+
+def lcheckpid(pidfile):
+ """
+ Check the pidfile of a process spawned with remote_spawn.
+
+ Parameters:
+ pidfile: the pidfile passed to remote_span
+
+ Returns:
+
+ A (pid, ppid) tuple useful for calling remote_status and remote_kill,
+ or None if the pidfile isn't valid yet (maybe the process is still starting).
+ """
+
+ (out,err),proc = lexec("cat %s" % pidfile )
+
+ if proc.wait():
+ return None
+
+ if out:
+ try:
+ return map(int,out.strip().split(' ',1))
+ except:
+ # Ignore, many ways to fail that don't matter that much
+ return None
+
+def lstatus(pid, ppid):
+ """
+ Check the status of a process spawned with remote_spawn.
+
+ Parameters:
+ pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+
+ Returns:
+
+ One of NOT_STARTED, RUNNING, FINISHED
+ """
+
+ (out,err),proc = lexec(
+ # Check only by pid. pid+ppid does not always work (especially with sudo)
+ " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
+ 'ppid' : ppid,
+ 'pid' : pid,
+ })
+
+ if proc.wait():
+ return NOT_STARTED
+
+ status = False
+ if out:
+ status = (out.strip() == 'wait')
+ else:
+ return NOT_STARTED
+ return RUNNING if status else FINISHED
+
+
+def lkill(pid, ppid, sudo = False):
+ """
+ Kill a process spawned with lspawn.
+
+ First tries a SIGTERM, and if the process does not end in 10 seconds,
+ it sends a SIGKILL.
+
+ Parameters:
+ pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+
+ sudo: whether the command was run with sudo - careful killing like this.
+
+ Returns:
+
+ Nothing, should have killed the process
+ """
+
+ subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
+ cmd = """
+SUBKILL="%(subkill)s" ;
+%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
+%(sudo)s kill %(pid)d $SUBKILL || /bin/true
+for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
+ sleep 0.2
+ if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
+ break
+ else
+ %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
+ %(sudo)s kill %(pid)d $SUBKILL || /bin/true
+ fi
+ sleep 1.8
+done
+if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
+ %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
+ %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
+fi
+"""
+ if nowait:
+ cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
+
+ (out,err),proc = lexec(
+ cmd % {
+ 'ppid' : ppid,
+ 'pid' : pid,
+ 'sudo' : 'sudo -S' if sudo else '',
+ 'subkill' : subkill,
+ })
+
+
self.assertEquals(len(ResourceFactory.resource_types()), 2)
-def get_connected(connections, rtype, ec):
- connected = []
- for guid in connections:
- rm = ec.get_resource(guid)
- if rm.rtype() == rtype:
- connected.append(rm)
- return connected
-
class Channel(ResourceManager):
_rtype = "Channel"
super(Interface, self).__init__(ec, guid)
def deploy(self):
- node = get_connected(self.connections, Node.rtype(), self.ec)[0]
- chan = get_connected(self.connections, Channel.rtype(), self.ec)[0]
+ node = self.get_connected(Node.rtype())[0]
+ chan = self.get_connected(Channel.rtype())[0]
if node.state < ResourceState.PROVISIONED:
self.ec.schedule("0.5s", self.deploy)
self.logger.debug(" -------- PROVISIONED ------- ")
self.ec.schedule("3s", self.deploy)
elif self.state == ResourceState.PROVISIONED:
- ifaces = get_connected(self.connections, Interface.rtype(), self.ec)
+ ifaces = self.get_connected(Interface.rtype())
for rm in ifaces:
if rm.state < ResourceState.READY:
self.ec.schedule("0.5s", self.deploy)
super(Application, self).__init__(ec, guid)
def deploy(self):
- node = get_connected(self.connections, Node.rtype(), self.ec)[0]
+ node = self.get_connected(Node.rtype())[0]
if node.state < ResourceState.READY:
self.ec.schedule("0.5s", self.deploy)
else:
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController
+from neco.execution.resource import ResourceState
+from neco.execution.trace import TraceAttr
+from neco.resources.linux.node import LinuxNode
+from neco.resources.linux.application import LinuxApplication
+
+from test_utils import skipIfNotAlive
+
+import os
+import time
+import tempfile
+import unittest
+
+class LinuxApplicationTestCase(unittest.TestCase):
+ def setUp(self):
+ self.fedora_host = 'nepi2.pl.sophia.inria.fr'
+ self.fedora_user = 'inria_nepi'
+
+ self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
+ self.ubuntu_user = 'alina'
+
+ self.target = 'nepi5.pl.sophia.inria.fr'
+
+ @skipIfNotAlive
+ def t_ping(self, host, user):
+ from neco.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ app = ec.register_resource("LinuxApplication")
+ cmd = "ping -c5 %s" % self.target
+ ec.set(app, "command", cmd)
+
+ ec.register_connection(app, node)
+
+ try:
+ ec.deploy()
+
+ while not ec.state(app) == ResourceState.FINISHED:
+ time.sleep(0.5)
+
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+ stdout = ec.trace(app, 'stdout')
+ size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+ self.assertEquals(len(stdout), size)
+
+ block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+ self.assertEquals(block, stdout[5:10])
+
+ path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+ rm = ec.get_resource(app)
+ p = os.path.join(rm.home, 'stdout')
+ self.assertEquals(path, p)
+
+ finally:
+ ec.shutdown()
+
+ def test_deploy_fedora(self):
+ self.t_ping(self.fedora_host, self.fedora_user)
+
+ def test_deploy_ubuntu(self):
+ self.t_ping(self.ubuntu_host, self.ubuntu_user)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController
+from neco.execution.resource import ResourceState
+from neco.resources.linux.node import LinuxNode
+from neco.resources.linux.interface import LinuxInterface
+from neco.resources.linux.channel import LinuxChannel
+from neco.util.sshfuncs import RUNNING, FINISHED
+
+from test_utils import skipIfNotAlive
+
+import os
+import time
+import tempfile
+import unittest
+
+class LinuxInterfaceTestCase(unittest.TestCase):
+ def setUp(self):
+ self.fedora_host = 'nepi2.pl.sophia.inria.fr'
+ self.fedora_user = 'inria_nepi'
+
+ self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
+ self.ubuntu_user = 'alina'
+
+ @skipIfNotAlive
+ def t_deploy(self, host, user):
+ from neco.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxInterface)
+ ResourceFactory.register_type(LinuxChannel)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+
+ iface = ec.register_resource("LinuxInterface")
+ chan = ec.register_resource("LinuxChannel")
+
+ ec.register_connection(iface, node)
+ ec.register_connection(iface, chan)
+
+ try:
+ ec.deploy()
+
+ while not all([ ec.state(guid) == ResourceState.STARTED \
+ for guid in [node, iface]]):
+ time.sleep(0.5)
+
+ self.assertTrue(ec.state(node) == ResourceState.STARTED)
+ self.assertTrue(ec.state(iface) == ResourceState.STARTED)
+ self.assertTrue(ec.get(iface, "deviceName") == "eth0")
+
+ finally:
+ ec.shutdown()
+
+ def test_deploy_fedora(self):
+ self.t_deploy(self.fedora_host, self.fedora_user)
+
+ def test_deploy_ubuntu(self):
+ self.t_deploy(self.ubuntu_host, self.ubuntu_user)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
from neco.resources.linux.node import LinuxNode
from neco.util.sshfuncs import RUNNING, FINISHED
+from test_utils import skipIfNotAlive, skipInteractive, create_node
+
import os
import time
import tempfile
import unittest
-def skipIfNotAlive(func):
- name = func.__name__
- def wrapped(*args, **kwargs):
- node = args[1]
-
- if not node.is_alive():
- print "*** WARNING: Skipping test %s: Node %s is not alive\n" % (
- name, node.get("hostname"))
- return
-
- return func(*args, **kwargs)
-
- return wrapped
-
-def skipInteractive(func):
- name = func.__name__
- def wrapped(*args, **kwargs):
- mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES']
- if not mode:
- print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
- return
-
- return func(*args, **kwargs)
-
- return wrapped
-
-class DummyEC(object):
- pass
-
class LinuxNodeTestCase(unittest.TestCase):
def setUp(self):
- host = 'nepi2.pl.sophia.inria.fr'
- user = 'inria_nepi'
- self.node_fedora = self.create_node(host, user)
+ self.fedora_host = 'nepi2.pl.sophia.inria.fr'
+ self.fedora_user = 'inria_nepi'
- host = 'roseval.pl.sophia.inria.fr'
- user = 'alina'
- self.node_ubuntu = self.create_node(host, user)
+ self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
+ self.ubuntu_user = 'alina'
self.target = 'nepi5.pl.sophia.inria.fr'
- self.home = '/tmp/nepi-home/test-app'
-
- def create_node(self, host, user):
- ec = DummyEC()
-
- node = LinuxNode(ec, 1)
- node.set("hostname", host)
- node.set("username", user)
-
- return node
@skipIfNotAlive
- def t_xterm(self, node):
+ def t_xterm(self, host, user):
+ node, ec = create_node(host, user)
+
node.install_packages('xterm')
(out, err), proc = node.execute('xterm', forward_x11 = True)
self.assertEquals(out, "")
@skipIfNotAlive
- def t_execute(self, node):
+ def t_execute(self, host, user):
+ node, ec = create_node(host, user)
+
command = "ping -qc3 %s" % self.target
(out, err), proc = node.execute(command)
self.assertTrue(out.find(expected) > 0)
@skipIfNotAlive
- def t_run(self, node):
- node.mkdir(self.home, clean = True)
+ def t_run(self, host, user):
+ node, ec = create_node(host, user)
- command = "ping %s" % self.target
- dst = os.path.join(self.home, "app.sh")
- node.upload(command, dst)
+ app_home = os.path.join(node.exp_dir, "my-app")
+ node.mkdir(app_home, clean = True)
- cmd = "bash ./app.sh"
- node.run(cmd, self.home)
- pid, ppid = node.checkpid(self.home)
+ command = "ping %s" % self.target
+ node.run(command, app_home)
+ pid, ppid = node.checkpid(app_home)
status = node.status(pid, ppid)
self.assertTrue(status, RUNNING)
status = node.status(pid, ppid)
self.assertTrue(status, FINISHED)
- (out, err), proc = node.check_run_output(self.home)
+ (out, err), proc = node.check_output(app_home, "stdout")
expected = """64 bytes from"""
self.assertTrue(out.find(expected) > 0)
- node.rmdir(self.home)
+ node.rmdir(app_home)
@skipIfNotAlive
- def t_install(self, node):
- node.mkdir(self.home, clean = True)
+ def t_install(self, host, user):
+ node, ec = create_node(host, user)
+
+ app_home = os.path.join(node.exp_dir, "my-app")
+ node.mkdir(app_home, clean = True)
prog = """#include <stdio.h>
}
"""
# upload the test program
- dst = os.path.join(self.home, "hello.c")
- node.upload(prog, dst)
+ dst = os.path.join(app_home, "hello.c")
+ node.upload(prog, dst, text = True)
# install gcc
node.install_packages('gcc')
# compile the program using gcc
- command = "cd %s; gcc -Wall hello.c -o hello" % self.home
+ command = "cd %s; gcc -Wall hello.c -o hello" % app_home
(out, err), proc = node.execute(command)
# execute the program and get the output from stdout
- command = "%s/hello" % self.home
+ command = "%s/hello" % app_home
(out, err), proc = node.execute(command)
self.assertEquals(out, "Hello, world!\n")
# execute the program and get the output from a file
- command = "%(home)s/hello > %(home)s/hello.out" % {'home':self.home}
+ command = "%(home)s/hello > %(home)s/hello.out" % {
+ 'home': app_home}
(out, err), proc = node.execute(command)
# retrieve the output file
- src = os.path.join(self.home, "hello.out")
+ src = os.path.join(app_home, "hello.out")
f = tempfile.NamedTemporaryFile(delete=False)
dst = f.name
node.download(src, dst)
f.close()
node.remove_packages('gcc')
- node.rmdir(self.home)
+ node.rmdir(app_home)
f = open(dst, "r")
out = f.read()
self.assertEquals(out, "Hello, world!\n")
def test_execute_fedora(self):
- self.t_execute(self.node_fedora)
+ self.t_execute(self.fedora_host, self.fedora_user)
def test_execute_ubuntu(self):
- self.t_execute(self.node_ubuntu)
+ self.t_execute(self.ubuntu_host, self.ubuntu_user)
def test_run_fedora(self):
- self.t_run(self.node_fedora)
+ self.t_run(self.fedora_host, self.fedora_user)
def test_run_ubuntu(self):
- self.t_run(self.node_ubuntu)
+ self.t_run(self.ubuntu_host, self.ubuntu_user)
def test_intall_fedora(self):
- self.t_install(self.node_fedora)
+ self.t_install(self.fedora_host, self.fedora_user)
def test_install_ubuntu(self):
- self.t_install(self.node_ubuntu)
+ self.t_install(self.ubuntu_host, self.ubuntu_user)
@skipInteractive
def test_xterm_ubuntu(self):
""" Interactive test. Should not run automatically """
- self.t_xterm(self.node_ubuntu)
+ self.t_xterm(self.ubuntu_host, self.ubuntu_user)
if __name__ == '__main__':
--- /dev/null
+from neco.resources.linux.node import LinuxNode
+
+import os
+
+class DummyEC(object):
+ @property
+ def exp_id(self):
+ return "nepi-1"
+
+def create_node(hostname, username):
+ ec = DummyEC()
+ node = LinuxNode(ec, 1)
+ node.set("hostname", hostname)
+ node.set("username", username)
+
+ # If we don't return the reference to the EC
+ # it will be released by the garbage collector since
+ # the resources only save a weak refernce to it.
+ return node, ec
+
+def skipIfNotAlive(func):
+ name = func.__name__
+ def wrapped(*args, **kwargs):
+ node, ec = create_node(args[1], args[2])
+
+ if not node.is_alive():
+ print "*** WARNING: Skipping test %s: Node %s is not alive\n" % (
+ name, node.get("hostname"))
+ return
+
+ return func(*args, **kwargs)
+
+ return wrapped
+
+def skipInteractive(func):
+ name = func.__name__
+ def wrapped(*args, **kwargs):
+ mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES']
+ if not mode:
+ print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
+ return
+
+ return func(*args, **kwargs)
+
+ return wrapped
+
+