Added Linux Application
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 5 May 2013 18:49:13 +0000 (20:49 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 5 May 2013 18:49:13 +0000 (20:49 +0200)
15 files changed:
src/neco/execution/attribute.py
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/execution/trace.py [new file with mode: 0644]
src/neco/resources/linux/application.py
src/neco/resources/linux/channel.py [new file with mode: 0644]
src/neco/resources/linux/debfuncs.py [new file with mode: 0644]
src/neco/resources/linux/node.py
src/neco/resources/linux/rpmfuncs.py [new file with mode: 0644]
src/neco/util/execfuncs.py [new file with mode: 0644]
test/execution/resource.py
test/resources/linux/application.py [new file with mode: 0644]
test/resources/linux/interface.py [new file with mode: 0644]
test/resources/linux/node.py
test/resources/linux/test_utils.py [new file with mode: 0644]

index 48d7848..1282dfe 100644 (file)
@@ -13,17 +13,24 @@ class Flags:
     NoFlags         = 0x00
     # Attribute is not modifiable by the user
     ReadOnly        = 0x01
+    # Attribute is not modifiable by the user during runtime
+    ExecReadOnly        = 0x02
     # Attribute is an access credential
-    Credential      = 0x02
+    Credential      = 0x04
 
 class Attribute(object):
     def __init__(self, name, help, type = Types.String,
-            flags = Flags.NoFlags, default = None):
+            flags = Flags.NoFlags, default = None, allowed = None,
+            set_hook = None):
         self._name = name
         self._help = help
         self._type = type
         self._flags = flags
+        self._allowed = allowed
         self._default = self._value = default
+        # callback to be invoked upon changing the 
+        # attribute value
+        self.set_hook = set_hook
 
     @property
     def name(self):
@@ -45,6 +52,10 @@ class Attribute(object):
     def flags(self):
         return self._flags
 
+    @property
+    def allowed(self):
+        return self._allowed
+
     def has_flag(self, flag):
         return (self._flags & flag) == flag
 
@@ -52,7 +63,18 @@ class Attribute(object):
         return self._value
 
     def set_value(self, value):
-        if self.is_valid_value(value):
+        valid = True
+
+        if self.type == Types.Enum:
+            valid = value in self._allowed
+        
+        valid = valid and self.is_valid_value(value)
+
+        if valid: 
+            if self.set_hook:
+                # Hook receives old value, new value
+                value = self.set_hook(self._value, value)
+
             self._value = value
         else:
             raise ValueError("Invalid value %s for attribute %s" %
index 1e55cc0..07a4ee6 100644 (file)
@@ -5,20 +5,24 @@ import time
 import threading
 
 from neco.util import guid
+from neco.util.parallel import ParallelRun
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
 from neco.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState
 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
-from neco.util.parallel import ParallelRun
+from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 
 class ExperimentController(object):
-    def __init__(self, root_dir = "/tmp"): 
+    def __init__(self, exp_id = None, root_dir = "/tmp"): 
         super(ExperimentController, self).__init__()
         # root directory to store files
         self._root_dir = root_dir
 
+        # experiment identifier given by the user
+        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -47,6 +51,12 @@ class ExperimentController(object):
     def logger(self):
         return self._logger
 
+    @property
+    def exp_id(self):
+        exp_id = self._exp_id
+        if not exp_id.startswith("nepi-"):
+            exp_id = "nepi-" + exp_id
+        return exp_id
 
     def get_task(self, tid):
         return self._tasks.get(tid)
@@ -125,6 +135,39 @@ class ExperimentController(object):
             rm = self.get_resource(guid1)
             rm.register_condition(action, group2, state, time)
 
+    def register_trace(self, guid, name):
+        """ Enable trace
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        rm = self.get_resource(guid)
+        rm.register_trace(name)
+
+    def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        """ Get information on collected trace
+
+        :param name: Name of the trace
+        :type name: str
+
+        :param attr: Can be one of:
+                         - TraceAttr.ALL (complete trace content), 
+                         - TraceAttr.STREAM (block in bytes to read starting at offset), 
+                         - TraceAttr.PATH (full path to the trace file),
+                         - TraceAttr.SIZE (size of trace file). 
+        :type attr: str
+
+        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :rtype: str
+        """
+        rm = self.get_resource(guid)
+        return rm.trace(name, attr, block, offset)
+
     def discover(self, guid, filters):
         rm = self.get_resource(guid)
         return rm.discover(filters)
@@ -198,13 +241,13 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start_with_condition()
 
-    def deploy(self, group = None, wait_all_deployed = True):
+    def deploy(self, group = None, wait_all_ready = True):
         """ Deploy all resource manager in group
 
         :param group: List of guids of RMs to deploy
         :type group: list
 
-        :param wait_all_deployed: Wait until all RMs are deployed in
+        :param wait_all_ready: Wait until all RMs are ready in
             order to start the RMs
         :type guid: int
 
@@ -227,7 +270,7 @@ class ExperimentController(object):
         for guid in group:
             rm = self.get_resource(guid)
 
-            if wait_all_deployed:
+            if wait_all_ready:
                 towait = list(group)
                 towait.remove(guid)
                 self.register_condition(guid, ResourceAction.START, 
index f7010b0..7669804 100644 (file)
@@ -1,4 +1,5 @@
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
+from neco.execution.trace import TraceAttr
 
 import copy
 import functools
@@ -19,8 +20,9 @@ class ResourceState:
     READY = 3
     STARTED = 4
     STOPPED = 5
-    FAILED = 6
-    RELEASED = 7
+    FINISHED = 6
+    FAILED = 7
+    RELEASED = 8
 
 def clsinit(cls):
     cls._clsinit()
@@ -32,6 +34,7 @@ class ResourceManager(object):
     _rtype = "Resource"
     _filters = None
     _attributes = None
+    _traces = None
 
     @classmethod
     def _register_filter(cls, attr):
@@ -50,17 +53,34 @@ class ResourceManager(object):
         cls._attributes[attr.name] = attr
 
     @classmethod
-    def _register_filters(cls):
+    def _register_trace(cls, trace):
         """ Resource subclasses will invoke this method to add a 
-        filter attribute
+        resource trace
+
+        """
+        cls._traces[trace.name] = trace
+
+
+    @classmethod
+    def _register_filters(cls):
+        """ Resource subclasses will invoke this method to register 
+        resource filters
 
         """
         pass
 
     @classmethod
     def _register_attributes(cls):
-        """ Resource subclasses will invoke this method to add a 
-        resource attribute
+        """ Resource subclasses will invoke this method to register
+        resource attributes
+
+        """
+        pass
+
+    @classmethod
+    def _register_traces(cls):
+        """ Resource subclasses will invoke this method to register
+        resource traces
 
         """
         pass
@@ -81,6 +101,10 @@ class ResourceManager(object):
         cls._attributes = dict()
         cls._register_attributes()
 
+        # static template for resource traces
+        cls._traces = dict()
+        cls._register_traces()
+
     @classmethod
     def rtype(cls):
         return cls._rtype
@@ -99,6 +123,13 @@ class ResourceManager(object):
         """
         return copy.deepcopy(cls._attributes.values())
 
+    @classmethod
+    def get_traces(cls):
+        """ Returns a copy of the traces
+
+        """
+        return copy.deepcopy(cls._traces.values())
+
     def __init__(self, ec, guid):
         self._guid = guid
         self._ec = weakref.ref(ec)
@@ -106,9 +137,11 @@ class ResourceManager(object):
         self._conditions = dict() 
 
         # the resource instance gets a copy of all attributes
-        # that can modify
         self._attrs = copy.deepcopy(self._attributes)
 
+        # the resource instance gets a copy of all traces
+        self._trcs = copy.deepcopy(self._traces)
+
         self._state = ResourceState.NEW
 
         self._start_time = None
@@ -216,7 +249,6 @@ class ResourceManager(object):
         :type name: str
         :param name: Value of the attribute
         :type name: str
-        :rtype:  Boolean
         """
         attr = self._attrs[name]
         attr.value = value
@@ -231,6 +263,38 @@ class ResourceManager(object):
         attr = self._attrs[name]
         return attr.value
 
+    def register_trace(self, name):
+        """ Enable trace
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        trace = self._trcs[name]
+        trace.enabled = True
+
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        """ Get information on collected trace
+
+        :param name: Name of the trace
+        :type name: str
+
+        :param attr: Can be one of:
+                         - TraceAttr.ALL (complete trace content), 
+                         - TraceAttr.STREAM (block in bytes to read starting at offset), 
+                         - TraceAttr.PATH (full path to the trace file),
+                         - TraceAttr.SIZE (size of trace file). 
+        :type attr: str
+
+        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :rtype: str
+        """
+        pass
+
     def register_condition(self, action, group, state, 
             time = None):
         """ Registers a condition on the resource manager to allow execution 
@@ -259,6 +323,14 @@ class ResourceManager(object):
 
         conditions.append((group, state, time))
 
+    def get_connected(self, rtype):
+        connected = []
+        for guid in self.connections:
+            rm = self.ec.get_resource(guid)
+            if rm.rtype() == rtype:
+                connected.append(rm)
+        return connected
+
     def _needs_reschedule(self, group, state, time):
         """ Internal method that verify if 'time' has elapsed since 
         all elements in 'group' have reached state 'state'.
@@ -361,6 +433,7 @@ class ResourceManager(object):
         # only can start when RM is either STOPPED or READY
         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
             reschedule = True
+            self.logger.debug("---- RESCHEDULING START ---- state %s " % self.state )
         else:
             self.logger.debug("---- START CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.START))
diff --git a/src/neco/execution/trace.py b/src/neco/execution/trace.py
new file mode 100644 (file)
index 0000000..382a6bb
--- /dev/null
@@ -0,0 +1,20 @@
+class TraceAttr:
+    ALL = 'all'
+    STREAM = 'stream'
+    PATH = 'path'
+    SIZE = 'size'
+
+class Trace(object):
+    def __init__(self, name, help):
+        self._name = name
+        self._help = help
+        self.enabled = False
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def help(self):
+        return self._help
+
index befb22d..321c696 100644 (file)
@@ -1,8 +1,15 @@
-from neco.execution.attribute import Attribute, Flags
+from neco.execution.attribute import Attribute, Flags, Types
+from neco.execution.trace import Trace, TraceAttr
 from neco.execution.resource import ResourceManager, clsinit, ResourceState
-from neco.resources.linux.ssh_api import SSHApiFactory
+from neco.resources.linux.node import LinuxNode
+from neco.util import sshfuncs 
 
 import logging
+import os
+
+DELAY ="1s"
+
+# TODO: Resolve wildcards in commands!! 
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -11,20 +18,24 @@ class LinuxApplication(ResourceManager):
     @classmethod
     def _register_attributes(cls):
         command = Attribute("command", "Command to execute", 
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         env = Attribute("env", "Environment variables string for command execution",
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         sudo = Attribute("sudo", "Run with root privileges", 
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         depends = Attribute("depends", 
                 "Space-separated list of packages required to run the application",
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         sources = Attribute("sources", 
                 "Space-separated list of regular files to be deployed in the working "
                 "path prior to building. Archives won't be expanded automatically.",
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
+        code = Attribute("code", 
+                "Plain text source code to be uploaded to the server. It will be stored "
+                "under ${SOURCES}/code",
+                flags = Flags.ExecReadOnly)
         build = Attribute("build", 
                 "Build commands to execute after deploying the sources. "
                 "Sources will be in the ${SOURCES} folder. "
@@ -48,12 +59,18 @@ class LinuxApplication(ResourceManager):
                 "make and other tools to install, be sure to provide them as "
                 "actual dependencies instead.",
                 flags = Flags.ReadOnly)
-        stdin = Attribute("stdin", "Standard input", flags = Flags.ReadOnly)
-        stdout = Attribute("stdout", "Standard output", flags = Flags.ReadOnly)
-        stderr = Attribute("stderr", "Standard error", flags = Flags.ReadOnly)
+        stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
+        stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
+        stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
+        update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
+                "re-upload before starting experiment. If not keep the same directory", 
+                default = True,
+                type = Types.Bool, 
+                flags = Flags.ExecReadOnly)
 
-        tear_down = Attribute("tearDown", "Bash script to be executed before
-                releasing the resource", flags = Flags.ReadOnly)
+        tear_down = Attribute("tearDown", "Bash script to be executed before "
+                "releasing the resource", 
+                flags = Flags.ReadOnly)
 
         cls._register_attribute(command)
         cls._register_attribute(forward_x11)
@@ -61,29 +78,50 @@ class LinuxApplication(ResourceManager):
         cls._register_attribute(sudo)
         cls._register_attribute(depends)
         cls._register_attribute(sources)
+        cls._register_attribute(code)
         cls._register_attribute(build)
         cls._register_attribute(install)
         cls._register_attribute(stdin)
         cls._register_attribute(stdout)
         cls._register_attribute(stderr)
+        cls._register_attribute(update_home)
         cls._register_attribute(tear_down)
 
+    @classmethod
+    def _register_traces(cls):
+        stdout = Trace("stdout", "Standard output stream")
+        stderr = Trace("stderr", "Standard error stream")
+        buildlog = Trace("buildlog", "Output of the build process")
+
+        cls._register_trace(stdout)
+        cls._register_trace(stderr)
+        cls._register_trace(buildlog)
+
     def __init__(self, ec, guid):
         super(LinuxApplication, self).__init__(ec, guid)
         self._pid = None
         self._ppid = None
-        self._home = "app-%s" % self.box.guid
-        self._node = None
+        self._home = "app-%s" % self.guid
 
         self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
 
     @property
     def node(self):
-        self._node
+        node = self.get_connected(LinuxNode.rtype())
+        if node: return node[0]
+        return None
 
     @property
     def home(self):
-        return self._home # + node home
+        return os.path.join(self.node.exp_dir, self._home)
+
+    @property
+    def src_dir(self):
+        return os.path.join(self.home, 'src')
+
+    @property
+    def build_dir(self):
+        return os.path.join(self.home, 'build')
 
     @property
     def pid(self):
@@ -93,62 +131,254 @@ class LinuxApplication(ResourceManager):
     def ppid(self):
         return self._ppid
 
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        path = os.path.join(self.home, name)
+        
+        cmd = "(test -f %s && echo 'success') || echo 'error'" % path
+        (out, err), proc = self.node.execute(cmd)
+
+        if (err and proc.poll()) or out.find("error") != -1:
+            err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
+                    name, self.node.get("hostname"), err)
+            self.logger.error(err_msg)
+            return None
+    
+        if attr == TraceAttr.PATH:
+            return path
+
+        if attr == TraceAttr.ALL:
+            (out, err), proc = self.node.check_output(self.home, name)
+            
+            if err and proc.poll():
+                err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
+                            name, self.node.get("hostname"), err)
+                self.logger.error(err_msg)
+                return None
+
+            return out
+
+        if attr == TraceAttr.STREAM:
+            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
+        elif attr == TraceAttr.SIZE:
+            cmd = "stat -c%%s %s " % path
+
+        (out, err), proc = self.node.execute(cmd)
+
+        if err and proc.poll():
+            err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
+                    name, self.node.get("hostname"), err)
+            self.logger.error(err_msg)
+            return None
+        
+        if attr == TraceAttr.SIZE:
+            out = int(out.strip())
+
+        return out
+            
     def provision(self, filters = None):
-        # verify home hash or clean home
+        # TODO: verify home hash or clean home
+
+        # create home dir for application
+        self.node.mkdir(self.home)
+
         # upload sources
-        # build  
-        # Install stuff!!
-        # upload app command
-        pass
+        self.upload_sources()
+
+        # upload code
+        self.upload_code()
+
+        # install dependencies
+        self.install_dependencies()
+
+        # build
+        self.build()
+
+        # Install
+        self.install()
+
+        super(LinuxApplication, self).provision()
+
+    def upload_sources(self):
+        # check if sources need to be uploaded and upload them
+        sources = self.get("sources")
+        if sources:
+            self.logger.debug(" Uploading sources %s" % sources)
+
+            # create dir for sources
+            self.node.mkdir(self.src_dir)
+
+            sources = self.sources.split(' ')
+
+            http_sources = list()
+            for source in list(sources):
+                if source.startswith("http") or source.startswith("https"):
+                    http_sources.append(source)
+                    sources.remove(source)
+
+            # Download http sources
+            for source in http_sources:
+                dst = os.path.join(self.src_dir, source.split("/")[-1])
+                command = "wget -o %s %s" % (dst, source)
+                self.node.execute(command)
+
+            self.node.upload(sources, self.src_dir)
+
+    def upload_code(self):
+        code = self.get("code")
+        if code:
+            # create dir for sources
+            self.node.mkdir(self.src_dir)
+
+            self.logger.debug(" Uploading code '%s'" % code)
+
+            dst = os.path.join(self.src_dir, "code")
+            self.node.upload(sources, dst, text = True)
+
+    def install_dependencies(self):
+        depends = self.get("depends")
+        if depends:
+            self.logger.debug(" Installing dependencies %s" % depends)
+            self.node.install_packages(depends, home = self.home)
+
+    def build(self):
+        build = self.get("build")
+        if build:
+            self.logger.debug(" Building sources '%s'" % build)
+            
+            # create dir for build
+            self.node.mkdir(self.build_dir)
+
+            cmd = self.replace_paths(build)
+
+            (out, err), proc = self.run_and_wait(cmd, self.home,
+                pidfile = "build_pid",
+                stdout = "build_log", 
+                stderr = "build_err", 
+                raise_on_error = True)
+    def install(self):
+        install = self.get("install")
+        if install:
+            self.logger.debug(" Installing sources '%s'" % install)
+
+            cmd = self.replace_paths(install)
+
+            (out, err), proc = self.run_and_wait(cmd, self.home, 
+                pidfile = "install_pid",
+                stdout = "install_log", 
+                stderr = "install_err", 
+                raise_on_error = True)
 
     def deploy(self):
         # Wait until node is associated and deployed
-        self.provision()
-        pass
+        node = self.node
+        if not node or node.state < ResourceState.READY:
+            self.ec.schedule(DELAY, self.deploy)
+        else:
+            self.discover()
+            self.provision()
+
+            super(LinuxApplication, self).deploy()
 
     def start(self):
-        dst = os.path.join(self.home, "app.sh")
-        
-        # Create shell script with the command
-        # This way, complex commands and scripts can be ran seamlessly
-        # sync files
-        cmd = ""
+        command = self.replace_paths(self.get("command"))
         env = self.get("env")
-        if env:
-            for envkey, envvals in env.iteritems():
-                for envval in envvals:
-                    cmd += 'export %s=%s\n' % (envkey, envval)
+        stdin = 'stdin' if self.get("stdin") else None
+        sudo = self.get('sudo') or False
+        x11 = self.get("forwardX11") or False
+        err_msg = "Failed to run command %s on host %s" % (
+                     command, self.node.get("hostname"))
+        failed = False
 
-        cmd += self.get("command")
-        self.api.upload(cmd, dst)
+        super(LinuxApplication, self).start()
 
-        command = 'bash ./app.sh'
-        stdin = 'stdin' if self.get("stdin") else None
-        self.api.run(command, self.home, stdin = stdin)
-        self._pid, self._ppid = self.api.checkpid(self.app_home)
+        if x11:
+            (out, err), proc = self.node.execute(command,
+                    sudo = sudo,
+                    stdin = stdin,
+                    stdout = 'stdout',
+                    stderr = 'stderr',
+                    env = env,
+                    forward_x11 = x11)
+
+            if proc.poll() and err:
+                failed = True
+        else:
+            (out, err), proc = self.node.run(command, self.home, 
+                stdin = stdin, 
+                sudo = sudo)
+
+            if proc.poll() and err:
+                failed = True
+        
+            if not failed:
+                pid, ppid = self.node.wait_pid(home = self.home)
+                if pid: self._pid = int(pid)
+                if ppid: self._ppid = int(ppid)
+
+            if not self.pid or not self.ppid:
+                failed = True
+        (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
+
+        if failed or out or chkerr:
+            # check if execution errors occurred
+            if err:
+                err_msg = "%s. Proc error: %s" % (err_msg, err)
+
+            err_msg = "%s. Run error: %s " % (err_msg, out)
+
+            if chkerr:
+                err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
+
+            self.logger.error(err_msg)
+            self.state = ResourceState.FAILED
 
     def stop(self):
-        # Kill
-        self._state = ResourceState.STOPPED
+        state = self.state
+        if state == ResourceState.STARTED:
+            (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+            if out or err:
+                # check if execution errors occurred
+                err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
+                     self.get("command"), self.node.get("hostname"), err, out)
+                self.logger.error(err_msg)
+                self._state = ResourceState.FAILED
+                stopped = False
+            else:
+                super(LinuxApplication, self).stop()
 
     def release(self):
         tear_down = self.get("tearDown")
         if tear_down:
-            self.api.execute(tear_down)
+            self.node.execute(tear_down)
 
-        return self.api.kill(self.pid, self.ppid)
+        self.stop()
+        if self.state == ResourceState.STOPPED:
+            super(LinuxApplication, self).release()
+    
+    @property
+    def state(self):
+        if self._state == ResourceState.STARTED:
+            (out, err), proc = self.node.check_output(self.home, 'stderr')
 
-    def status(self):
-        return self.api.status(self.pid, self.ppid)
+            if out or err:
+                # check if execution errors occurred
+                err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
+                        self.get("command"), self.node.get("hostname"), err, out)
+                self.logger.error(err_msg)
+                self._state = ResourceState.FAILED
 
-    def make_app_home(self):
-        self.api.mkdir(self.home)
+            elif self.pid and self.ppid:
+                status = self.node.status(self.pid, self.ppid)
 
-        stdin = self.get("stdin")
-        if stdin:
-            self.api.upload(stdin, os.path.join(self.home, 'stdin'))
+                if status == sshfuncs.FINISHED:
+                    self._state = ResourceState.FINISHED
 
-    def _validate_connection(self, guid):
+        return self._state
+
+    def valid_connection(self, guid):
         # TODO: Validate!
         return True
         # XXX: What if it is connected to more than one node?
@@ -156,3 +386,36 @@ class LinuxApplication(ResourceManager):
         self._node = resources[0] if len(resources) == 1 else None
         return self._node
 
+    def hash_app(self):
+        """ Generates a hash representing univokely the application.
+        Is used to determine whether the home directory should be cleaned
+        or not.
+
+        """
+        command = self.get("command")
+        forwards_x11 = self.get("forwardX11")
+        env = self.get("env")
+        sudo = self.get("sudo")
+        depends = self.get("depends")
+        sources = self.get("sources")
+        cls._register_attribute(sources)
+        cls._register_attribute(build)
+        cls._register_attribute(install)
+        cls._register_attribute(stdin)
+        cls._register_attribute(stdout)
+        cls._register_attribute(stderr)
+        cls._register_attribute(tear_down)
+        skey = "".join(map(str, args))
+        return hashlib.md5(skey).hexdigest()
+
+    def replace_paths(self, command):
+        """
+        Replace all special path tags with shell-escaped actual paths.
+        """
+        return ( command
+            .replace("${SOURCES}", self.src_dir)
+            .replace("${BUILD}", self.build_dir) 
+            .replace("${APPHOME}", self.home) 
+            .replace("${NODEHOME}", self.node.home) )
+
+
diff --git a/src/neco/resources/linux/channel.py b/src/neco/resources/linux/channel.py
new file mode 100644 (file)
index 0000000..23f87b5
--- /dev/null
@@ -0,0 +1,24 @@
+from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux.node import LinuxNode
+
+import collections
+import logging
+import os
+import random
+import re
+import tempfile
+import time
+import threading
+
+@clsinit
+class LinuxChannel(ResourceManager):
+    _rtype = "LinuxChannel"
+
+    def __init__(self, ec, guid):
+        super(LinuxChannel, self).__init__(ec, guid)
+        self._logger = logging.getLogger("neco.linux.Channel.%d " % self.guid)
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
diff --git a/src/neco/resources/linux/debfuncs.py b/src/neco/resources/linux/debfuncs.py
new file mode 100644 (file)
index 0000000..7cf7260
--- /dev/null
@@ -0,0 +1,26 @@
+# TODO: Investigate using http://nixos.org/nix/
+
+def install_packages_command(os, packages):
+    if not isinstance(packages, list):
+        packages = [packages]
+
+    cmd = ""
+    for p in packages:
+        cmd += " ( dpkg -s %(package)s || sudo apt-get -y install %(package)s ) ; " % {
+                'package': p}
+   
+    #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...)
+    return cmd 
+
+def remove_packages_command(os, packages):
+    if not isinstance(packages, list):
+        packages = [packages]
+
+    cmd = ""
+    for p in packages:
+        cmd += " ( dpkg -s %(package)s && sudo apt-get -y purge %(package)s ) ; " % {
+                'package': p}
+    
+    #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...)
+    return cmd 
+
index b482747..a030eb4 100644 (file)
@@ -13,6 +13,9 @@ import time
 import threading
 
 # TODO: Verify files and dirs exists already
+# TODO: Blacklist node!
+
+DELAY ="1s"
 
 @clsinit
 class LinuxNode(ResourceManager):
@@ -20,32 +23,35 @@ class LinuxNode(ResourceManager):
 
     @classmethod
     def _register_attributes(cls):
-        hostname = Attribute("hostname", "Hostname of the machine")
+        hostname = Attribute("hostname", "Hostname of the machine",
+                flags = Flags.ExecReadOnly)
 
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
 
-        port = Attribute("port", "SSH port", flags = Flags.Credential)
+        port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
         
-        home = Attribute("home", 
-                "Experiment home directory to store all experiment related files")
+        home = Attribute("home",
+                "Experiment home directory to store all experiment related files",
+                flags = Flags.ExecReadOnly)
         
         identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
         
         server_key = Attribute("serverKey", "Server public key", 
-                flags = Flags.Credential)
+                flags = Flags.ExecReadOnly)
         
         clean_home = Attribute("cleanHome", "Remove all files and directories " + \
                 " from home folder before starting experiment", 
-                flags = Flags.ReadOnly)
+                flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
-                "Kill all running processes before starting experiment", 
-                flags = Flags.ReadOnly)
+                "Kill all running processes before starting experiment",
+                flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
-                "releasing the resource", flags = Flags.ReadOnly)
+                "releasing the resource",
+                flags = Flags.ExecReadOnly)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
@@ -60,7 +66,6 @@ class LinuxNode(ResourceManager):
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
-        self._home = "nepi-exp-%s" % os.urandom(8).encode('hex')
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
@@ -69,10 +74,17 @@ class LinuxNode(ResourceManager):
 
     @property
     def home(self):
-        home = self.get("home")
-        if home and not home.startswith("nepi-"):
-            home = "nepi-" + home
-        return home or self._home
+        return self.get("home") or "/tmp"
+
+    @property
+    def exp_dir(self):
+        exp_dir = os.path.join(self.home, self.ec.exp_id)
+        return exp_dir if exp_dir.startswith('/') else "${HOME}/"
+
+    @property
+    def node_dir(self):
+        node_dir = "node-%d" % self.guid
+        return os.path.join(self.exp_dir, node_dir)
 
     @property
     def os(self):
@@ -116,17 +128,29 @@ class LinuxNode(ResourceManager):
             self.logger.error("Deploy failed. Unresponsive node")
             return
 
-    def deploy(self):
-        self.provision()
-
         if self.get("cleanProcesses"):
             self.clean_processes()
 
         if self.get("cleanHome"):
-            # self.clean_home() -> this is dangerous
-            pass
+            self.clean_home()
+       
+        self.mkdir(self.node_dir)
 
-        self.mkdir(self.home)
+        super(LinuxNode, self).provision()
+
+    def deploy(self):
+        if self.state == ResourceState.NEW:
+            self.discover()
+            self.provision()
+
+        # Node needs to wait until all associated interfaces are 
+        # ready before it can finalize deployment
+        from neco.resources.linux.interface import LinuxInterface
+        ifaces = self.get_connected(LinuxInterface.rtype())
+        for iface in ifaces:
+            if iface.state < ResourceState.READY:
+                self.ec.schedule(DELAY, self.deploy)
+                return 
 
         super(LinuxNode, self).deploy()
 
@@ -137,7 +161,7 @@ class LinuxNode(ResourceManager):
 
         super(LinuxNode, self).release()
 
-    def validate_connection(self, guid):
+    def valid_connection(self, guid):
         # TODO: Validate!
         return True
 
@@ -152,38 +176,31 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         with self._lock:
-           (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "cppid",
-                stdout = "cplog", 
-                stderr = "cperr", 
-                raise_on_error = True)
-
-        return (out, err)   
+           (out, err), proc = self.execute(cmd) 
             
     def clean_home(self):
         self.logger.info("Cleaning up home")
 
-        cmd = "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
+        cmd = ("cd %s ; " % self.home +
+            "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
+            " -execdir rm -rf {} + ")
 
         out = err = ""
         with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home,
-                pidfile = "chpid",
-                stdout = "chlog", 
-                stderr = "cherr", 
-                raise_on_error = True)
-        
-        return (out, err)   
+            (out, err), proc = self.execute(cmd)
 
-    def upload(self, src, dst):
+    def upload(self, src, dst, text = False):
         """ Copy content to destination
 
-           src  content to copy. Can be a local file, directory or text input
+           src  content to copy. Can be a local file, directory or a list of files
 
            dst  destination path on the remote host (remote is always self.host)
+
+           text src is text input, it must be stored into a temp file before uploading
         """
         # If source is a string input 
-        if not os.path.isfile(src):
+        f = None
+        if text and not os.path.isfile(src):
             # src is text input that should be uploaded as file
             # create a temporal file with the content to upload
             f = tempfile.NamedTemporaryFile(delete=False)
@@ -195,7 +212,13 @@ class LinuxNode(ResourceManager):
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
 
-        return self.copy(src, dst)
+        result = self.copy(src, dst)
+
+        # clean up temp file
+        if f:
+            os.remove(f.name)
+
+        return result
 
     def download(self, src, dst):
         if not self.localhost:
@@ -203,7 +226,9 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages):
+    def install_packages(self, packages, home = None):
+        home = home or self.node_dir
+
         cmd = ""
         if self.os in ["f12", "f14"]:
             cmd = rpmfuncs.install_packages_command(self.os, packages)
@@ -217,15 +242,17 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "instpkgpid",
-                stdout = "instpkglog", 
-                stderr = "instpkgerr", 
+            (out, err), proc = self.run_and_wait(cmd, home, 
+                pidfile = "instpkg_pid",
+                stdout = "instpkg_log", 
+                stderr = "instpkg_err", 
                 raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages):
+    def remove_packages(self, packages, home = None):
+        home = home or self.node_dir
+
         cmd = ""
         if self.os in ["f12", "f14"]:
             cmd = rpmfuncs.remove_packages_command(self.os, packages)
@@ -239,10 +266,10 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         with self._lock:
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
-                pidfile = "rmpkgpid",
-                stdout = "rmpkglog", 
-                stderr = "rmpkgerr", 
+            (out, err), proc = self.run_and_wait(cmd, home, 
+                pidfile = "rmpkg_pid",
+                stdout = "rmpkg_log", 
+                stderr = "rmpkg_err", 
                 raise_on_error = True)
          
         return (out, err), proc 
@@ -264,7 +291,13 @@ class LinuxNode(ResourceManager):
             stderr = 'stderr', 
             sudo = False,
             raise_on_error = False):
-
+        """ runs a command in background on the remote host, but waits
+            until the command finishes execution.
+            This is more robust than doing a simple synchronized 'execute',
+            since in the remote host the command can continue to run detached
+            even if network disconnections occur
+        """
+        # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
                 stdin = stdin, 
@@ -272,21 +305,25 @@ class LinuxNode(ResourceManager):
                 stderr = stderr, 
                 sudo = sudo)
 
+        # check no errors occurred
         if proc.poll() and err:
             msg = " Failed to run command %s on host %s" % (
                     command, self.get("hostname"))
             self.logger.error(msg)
             if raise_on_error:
                 raise RuntimeError, msg
-        
+
+        # Wait for pid file to be generated
         pid, ppid = self.wait_pid(
                 home = home, 
                 pidfile = pidfile, 
                 raise_on_error = raise_on_error)
 
+        # wait until command finishes to execute
         self.wait_run(pid, ppid)
-        
-        (out, err), proc = self.check_run_error(home, stderr)
+       
+        # check if execution errors occurred
+        (out, err), proc = self.check_output(home, stderr)
 
         if err or out:
             msg = "Error while running command %s on host %s. error output: %s" % (
@@ -301,6 +338,8 @@ class LinuxNode(ResourceManager):
         return (out, err), proc
  
     def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+        """ Waits until the pid file for the command is generated, 
+            and returns the pid and ppid of the process """
         pid = ppid = None
         delay = 1.0
         for i in xrange(5):
@@ -322,6 +361,7 @@ class LinuxNode(ResourceManager):
         return pid, ppid
 
     def wait_run(self, pid, ppid, trial = 0):
+        """ wait for a remote process to finish execution """
         delay = 1.0
         first = True
         bustspin = 0
@@ -344,17 +384,12 @@ class LinuxNode(ResourceManager):
                 delay = min(30,delay*1.2)
                 bustspin = 0
 
-    def check_run_error(self, home, stderr = 'stderr'):
+    def check_output(self, home, filename):
+        """ checks file content """
         (out, err), proc = self.execute("cat %s" % 
-                os.path.join(home, stderr))
+                os.path.join(home, filename))
         return (out, err), proc
 
-    def check_run_output(self, home, stdout = 'stdout'):
-        (out, err), proc = self.execute("cat %s" % 
-                os.path.join(home, stdout))
-        return (out, err), proc
-
-
     def is_alive(self):
         if self.localhost:
             return True
@@ -575,7 +610,7 @@ class LinuxNode(ResourceManager):
                             self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
                     break
                 except RuntimeError, e:
-                    if x >= 3:
+                    if i >= 3:
                         self.logger.error("%s. error: %s", fail_msg, e.args)
             return (out, err), proc
 
diff --git a/src/neco/resources/linux/rpmfuncs.py b/src/neco/resources/linux/rpmfuncs.py
new file mode 100644 (file)
index 0000000..7f44887
--- /dev/null
@@ -0,0 +1,42 @@
+RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
+RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
+
+# TODO: Investigate using http://nixos.org/nix/
+
+def install_packages_command(os, packages):
+    if not isinstance(packages, list):
+        packages = [packages]
+
+    cmd = "( %s )" % install_rpmfusion_command(os)
+    for p in packages:
+        cmd += " ; ( rpm -q %(package)s || sudo yum -y install %(package)s ) " % {
+            'package': p}
+    
+    #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
+    return " ( %s )" % cmd 
+
+def remove_packages_command(os, packages):
+    if not isinstance(packages, list):
+        packages = [packages]
+
+    cmd = ""
+    for p in packages:
+        cmd += " ( rpm -q %(package)s && sudo yum -y remove %(package)s ) ; " % {
+                    'package': p}
+    
+    #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
+    return cmd 
+
+def install_rpmfusion_command(os):
+    cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s"
+
+    if os == "f12":
+        cmd =  cmd %  {'package': RPM_FUSION_URL_F12}
+    elif os == "f14":
+        # This one works for f13+
+        cmd = cmd %  {'package': RPM_FUSION_URL}
+    else:
+        cmd = ""
+
+    return cmd
diff --git a/src/neco/util/execfuncs.py b/src/neco/util/execfuncs.py
new file mode 100644 (file)
index 0000000..2ff76ee
--- /dev/null
@@ -0,0 +1,221 @@
+from neco.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT 
+
+import subprocess
+
+def lexec(command, 
+        user = None, 
+        sudo = False,
+        stdin = None,
+        env = None):
+    """
+    Executes a local command, returns ((stdout,stderr),process)
+    """
+    if env:
+        export = ''
+        for envkey, envval in env.iteritems():
+            export += '%s=%s ' % (envkey, envval)
+        command = "%s %s" % (export, command)
+
+    if sudo:
+        command = "sudo %s" % command
+    elif user:
+        command = "su %s ; %s " % (user, command)
+
+    p = subprocess.Popen(command, 
+            stdout = subprocess.PIPE, 
+            stderr = subprocess.PIPE,
+            stdin  = stdin)
+
+    out, err = p.communicate()
+    return ((out, err), proc)
+
+def lcopy(source, dest, recursive = False):
+    """
+    Copies from/to localy.
+    """
+    
+    if TRACE:
+        print "scp", source, dest
+    
+    command = ["cp"]
+    if recursive:
+        command.append("-R")
+    
+    command.append(src)
+    command.append(dst)
+    
+    p = subprocess.Popen(command, 
+        stdout=subprocess.PIPE, 
+        stderr=subprocess.PIPE)
+
+    out, err = p.communicate()
+    return ((out, err), proc)
+   
+def lspawn(command, pidfile, 
+        stdout = '/dev/null', 
+        stderr = STDOUT, 
+        stdin = '/dev/null', 
+        home = None, 
+        create_home = False, 
+        sudo = False,
+        user = None): 
+    """
+    Spawn a local command such that it will continue working asynchronously.
+    
+    Parameters:
+        command: the command to run - it should be a single line.
+        
+        pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
+        
+        stdout: path of a file to redirect standard output to - must be a string.
+            Defaults to /dev/null
+        stderr: path of a file to redirect standard error to - string or the special STDOUT value
+            to redirect to the same file stdout was redirected to. Defaults to STDOUT.
+        stdin: path of a file with input to be piped into the command's standard input
+        
+        home: path of a folder to use as working directory - should exist, unless you specify create_home
+        
+        create_home: if True, the home folder will be created first with mkdir -p
+        
+        sudo: whether the command needs to be executed as root
+        
+    Returns:
+        (stdout, stderr), process
+        
+        Of the spawning process, which only captures errors at spawning time.
+        Usually only useful for diagnostics.
+    """
+    # Start process in a "daemonized" way, using nohup and heavy
+    # stdin/out redirection to avoid connection issues
+    if stderr is STDOUT:
+        stderr = '&1'
+    else:
+        stderr = ' ' + stderr
+    
+    daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+        'command' : command,
+        'pidfile' : shell_escape(pidfile),
+        'stdout' : stdout,
+        'stderr' : stderr,
+        'stdin' : stdin,
+    }
+    
+    cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
+            'command' : shell_escape(daemon_command),
+            'sudo' : 'sudo -S' if sudo else '',
+            'pidfile' : shell_escape(pidfile),
+            'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
+            'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home else '',
+        }
+
+    (out,err),proc = lexec(cmd)
+    
+    if proc.wait():
+        raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
+
+    return (out,err),proc
+
+def lcheckpid(pidfile):
+    """
+    Check the pidfile of a process spawned with remote_spawn.
+    
+    Parameters:
+        pidfile: the pidfile passed to remote_span
+        
+    Returns:
+        
+        A (pid, ppid) tuple useful for calling remote_status and remote_kill,
+        or None if the pidfile isn't valid yet (maybe the process is still starting).
+    """
+
+    (out,err),proc = lexec("cat %s" % pidfile )
+        
+    if proc.wait():
+        return None
+    
+    if out:
+        try:
+            return map(int,out.strip().split(' ',1))
+        except:
+            # Ignore, many ways to fail that don't matter that much
+            return None
+
+def lstatus(pid, ppid): 
+    """
+    Check the status of a process spawned with remote_spawn.
+    
+    Parameters:
+        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+        
+    Returns:
+        
+        One of NOT_STARTED, RUNNING, FINISHED
+    """
+
+    (out,err),proc = lexec(
+        # Check only by pid. pid+ppid does not always work (especially with sudo) 
+        " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
+            'ppid' : ppid,
+            'pid' : pid,
+        })
+    
+    if proc.wait():
+        return NOT_STARTED
+    
+    status = False
+    if out:
+        status = (out.strip() == 'wait')
+    else:
+        return NOT_STARTED
+    return RUNNING if status else FINISHED
+
+def lkill(pid, ppid, sudo = False):
+    """
+    Kill a process spawned with lspawn.
+    
+    First tries a SIGTERM, and if the process does not end in 10 seconds,
+    it sends a SIGKILL.
+    
+    Parameters:
+        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+        
+        sudo: whether the command was run with sudo - careful killing like this.
+    
+    Returns:
+        
+        Nothing, should have killed the process
+    """
+    
+    subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
+    cmd = """
+SUBKILL="%(subkill)s" ;
+%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
+%(sudo)s kill %(pid)d $SUBKILL || /bin/true
+for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
+    sleep 0.2 
+    if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
+        break
+    else
+        %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
+        %(sudo)s kill %(pid)d $SUBKILL || /bin/true
+    fi
+    sleep 1.8
+done
+if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
+    %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
+    %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
+fi
+"""
+    if nowait:
+        cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
+
+    (out,err),proc = lexec(
+        cmd % {
+            'ppid' : ppid,
+            'pid' : pid,
+            'sudo' : 'sudo -S' if sudo else '',
+            'subkill' : subkill,
+        })
+    
+
index d90b137..8f5a2b5 100755 (executable)
@@ -43,14 +43,6 @@ class ResourceFactoryTestCase(unittest.TestCase):
 
         self.assertEquals(len(ResourceFactory.resource_types()), 2)
 
-def get_connected(connections, rtype, ec):
-    connected = []
-    for guid in connections:
-        rm = ec.get_resource(guid)
-        if rm.rtype() == rtype:
-            connected.append(rm)
-    return connected
-
 class Channel(ResourceManager):
     _rtype = "Channel"
 
@@ -69,8 +61,8 @@ class Interface(ResourceManager):
         super(Interface, self).__init__(ec, guid)
 
     def deploy(self):
-        node = get_connected(self.connections, Node.rtype(), self.ec)[0]
-        chan = get_connected(self.connections, Channel.rtype(), self.ec)[0]
+        node = self.get_connected(Node.rtype())[0]
+        chan = self.get_connected(Channel.rtype())[0]
 
         if node.state < ResourceState.PROVISIONED:
             self.ec.schedule("0.5s", self.deploy)
@@ -94,7 +86,7 @@ class Node(ResourceManager):
             self.logger.debug(" -------- PROVISIONED ------- ")
             self.ec.schedule("3s", self.deploy)
         elif self.state == ResourceState.PROVISIONED:
-            ifaces = get_connected(self.connections, Interface.rtype(), self.ec)
+            ifaces = self.get_connected(Interface.rtype())
             for rm in ifaces:
                 if rm.state < ResourceState.READY:
                     self.ec.schedule("0.5s", self.deploy)
@@ -110,7 +102,7 @@ class Application(ResourceManager):
         super(Application, self).__init__(ec, guid)
 
     def deploy(self):
-        node = get_connected(self.connections, Node.rtype(), self.ec)[0]
+        node = self.get_connected(Node.rtype())[0]
         if node.state < ResourceState.READY:
             self.ec.schedule("0.5s", self.deploy)
         else:
diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py
new file mode 100644 (file)
index 0000000..17beb5a
--- /dev/null
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController 
+from neco.execution.resource import ResourceState
+from neco.execution.trace import TraceAttr
+from neco.resources.linux.node import LinuxNode
+from neco.resources.linux.application import LinuxApplication
+
+from test_utils import skipIfNotAlive
+
+import os
+import time
+import tempfile
+import unittest
+
+class LinuxApplicationTestCase(unittest.TestCase):
+    def setUp(self):
+        self.fedora_host = 'nepi2.pl.sophia.inria.fr'
+        self.fedora_user = 'inria_nepi'
+
+        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
+        self.ubuntu_user = 'alina'
+        
+        self.target = 'nepi5.pl.sophia.inria.fr'
+
+    @skipIfNotAlive
+    def t_ping(self, host, user):
+        from neco.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+
+        app = ec.register_resource("LinuxApplication")
+        cmd = "ping -c5 %s" % self.target 
+        ec.set(app, "command", cmd)
+        
+        ec.register_connection(app, node)
+
+        try:
+            ec.deploy()
+
+            while not ec.state(app) == ResourceState.FINISHED:
+                time.sleep(0.5)
+
+            self.assertTrue(ec.state(node) == ResourceState.STARTED)
+            self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+            stdout = ec.trace(app, 'stdout')
+            size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE)
+            self.assertEquals(len(stdout), size)
+            
+            block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1)
+            self.assertEquals(block, stdout[5:10])
+
+            path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
+            rm = ec.get_resource(app)
+            p = os.path.join(rm.home, 'stdout')
+            self.assertEquals(path, p)
+
+        finally:
+            ec.shutdown()
+
+    def test_deploy_fedora(self):
+        self.t_ping(self.fedora_host, self.fedora_user)
+
+    def test_deploy_ubuntu(self):
+        self.t_ping(self.ubuntu_host, self.ubuntu_user)
+
+
+if __name__ == '__main__':
+    unittest.main()
+
diff --git a/test/resources/linux/interface.py b/test/resources/linux/interface.py
new file mode 100644 (file)
index 0000000..7b914d2
--- /dev/null
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController 
+from neco.execution.resource import ResourceState
+from neco.resources.linux.node import LinuxNode
+from neco.resources.linux.interface import LinuxInterface
+from neco.resources.linux.channel import LinuxChannel
+from neco.util.sshfuncs import RUNNING, FINISHED
+
+from test_utils import skipIfNotAlive
+
+import os
+import time
+import tempfile
+import unittest
+
+class LinuxInterfaceTestCase(unittest.TestCase):
+    def setUp(self):
+        self.fedora_host = 'nepi2.pl.sophia.inria.fr'
+        self.fedora_user = 'inria_nepi'
+
+        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
+        self.ubuntu_user = 'alina'
+
+    @skipIfNotAlive
+    def t_deploy(self, host, user):
+        from neco.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxInterface)
+        ResourceFactory.register_type(LinuxChannel)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+
+        iface = ec.register_resource("LinuxInterface")
+        chan = ec.register_resource("LinuxChannel")
+
+        ec.register_connection(iface, node)
+        ec.register_connection(iface, chan)
+
+        try:
+            ec.deploy()
+
+            while not all([ ec.state(guid) == ResourceState.STARTED \
+                    for guid in [node, iface]]):
+                time.sleep(0.5)
+
+            self.assertTrue(ec.state(node) == ResourceState.STARTED)
+            self.assertTrue(ec.state(iface) == ResourceState.STARTED)
+            self.assertTrue(ec.get(iface, "deviceName") == "eth0")
+
+        finally:
+            ec.shutdown()
+
+    def test_deploy_fedora(self):
+        self.t_deploy(self.fedora_host, self.fedora_user)
+
+    def test_deploy_ubuntu(self):
+        self.t_deploy(self.ubuntu_host, self.ubuntu_user)
+
+
+if __name__ == '__main__':
+    unittest.main()
+
index 7133e1c..075c353 100644 (file)
@@ -2,64 +2,27 @@
 from neco.resources.linux.node import LinuxNode
 from neco.util.sshfuncs import RUNNING, FINISHED
 
+from test_utils import skipIfNotAlive, skipInteractive, create_node
+
 import os
 import time
 import tempfile
 import unittest
 
-def skipIfNotAlive(func):
-    name = func.__name__
-    def wrapped(*args, **kwargs):
-        node = args[1]
-
-        if not node.is_alive():
-            print "*** WARNING: Skipping test %s: Node %s is not alive\n" % (
-                name, node.get("hostname"))
-            return
-
-        return func(*args, **kwargs)
-    
-    return wrapped
-
-def skipInteractive(func):
-    name = func.__name__
-    def wrapped(*args, **kwargs):
-        mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES']
-        if not mode:
-            print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
-            return
-
-        return func(*args, **kwargs)
-    
-    return wrapped
-
-class DummyEC(object):
-    pass
-
 class LinuxNodeTestCase(unittest.TestCase):
     def setUp(self):
-        host = 'nepi2.pl.sophia.inria.fr'
-        user = 'inria_nepi'
-        self.node_fedora = self.create_node(host, user)
+        self.fedora_host = 'nepi2.pl.sophia.inria.fr'
+        self.fedora_user = 'inria_nepi'
 
-        host = 'roseval.pl.sophia.inria.fr'
-        user = 'alina'
-        self.node_ubuntu = self.create_node(host, user)
+        self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
+        self.ubuntu_user = 'alina'
         
         self.target = 'nepi5.pl.sophia.inria.fr'
-        self.home = '/tmp/nepi-home/test-app'
-
-    def create_node(self, host, user):
-        ec = DummyEC()
-
-        node = LinuxNode(ec, 1)
-        node.set("hostname", host)
-        node.set("username", user)
-
-        return node
 
     @skipIfNotAlive
-    def t_xterm(self, node):
+    def t_xterm(self, host, user):
+        node, ec = create_node(host, user)
+
         node.install_packages('xterm')
 
         (out, err), proc = node.execute('xterm', forward_x11 = True)
@@ -71,7 +34,9 @@ class LinuxNodeTestCase(unittest.TestCase):
         self.assertEquals(out, "")
 
     @skipIfNotAlive
-    def t_execute(self, node):
+    def t_execute(self, host, user):
+        node, ec = create_node(host, user)
+
         command = "ping -qc3 %s" % self.target
         
         (out, err), proc = node.execute(command)
@@ -81,16 +46,15 @@ class LinuxNodeTestCase(unittest.TestCase):
         self.assertTrue(out.find(expected) > 0)
 
     @skipIfNotAlive
-    def t_run(self, node):
-        node.mkdir(self.home, clean = True)
+    def t_run(self, host, user):
+        node, ec = create_node(host, user)
         
-        command = "ping %s" % self.target
-        dst = os.path.join(self.home, "app.sh")
-        node.upload(command, dst)
+        app_home = os.path.join(node.exp_dir, "my-app")
+        node.mkdir(app_home, clean = True)
         
-        cmd = "bash ./app.sh"
-        node.run(cmd, self.home)
-        pid, ppid = node.checkpid(self.home)
+        command = "ping %s" % self.target
+        node.run(command, app_home)
+        pid, ppid = node.checkpid(app_home)
 
         status = node.status(pid, ppid)
         self.assertTrue(status, RUNNING)
@@ -99,17 +63,20 @@ class LinuxNodeTestCase(unittest.TestCase):
         status = node.status(pid, ppid)
         self.assertTrue(status, FINISHED)
         
-        (out, err), proc = node.check_run_output(self.home)
+        (out, err), proc = node.check_output(app_home, "stdout")
 
         expected = """64 bytes from"""
 
         self.assertTrue(out.find(expected) > 0)
 
-        node.rmdir(self.home)
+        node.rmdir(app_home)
 
     @skipIfNotAlive
-    def t_install(self, node):
-        node.mkdir(self.home, clean = True)
+    def t_install(self, host, user):
+        node, ec = create_node(host, user)
+
+        app_home = os.path.join(node.exp_dir, "my-app")
+        node.mkdir(app_home, clean = True)
 
         prog = """#include <stdio.h>
 
@@ -121,35 +88,36 @@ main (void)
 }
 """
         # upload the test program
-        dst = os.path.join(self.home, "hello.c")
-        node.upload(prog, dst)
+        dst = os.path.join(app_home, "hello.c")
+        node.upload(prog, dst, text = True)
 
         # install gcc
         node.install_packages('gcc')
 
         # compile the program using gcc
-        command = "cd %s; gcc -Wall hello.c -o hello" % self.home
+        command = "cd %s; gcc -Wall hello.c -o hello" % app_home
         (out, err), proc = node.execute(command)
 
         # execute the program and get the output from stdout
-        command = "%s/hello" % self.home
+        command = "%s/hello" % app_home 
         (out, err), proc = node.execute(command)
 
         self.assertEquals(out, "Hello, world!\n")
 
         # execute the program and get the output from a file
-        command = "%(home)s/hello > %(home)s/hello.out" % {'home':self.home}
+        command = "%(home)s/hello > %(home)s/hello.out" % {
+                'home': app_home}
         (out, err), proc = node.execute(command)
 
         # retrieve the output file 
-        src = os.path.join(self.home, "hello.out")
+        src = os.path.join(app_home, "hello.out")
         f = tempfile.NamedTemporaryFile(delete=False)
         dst = f.name
         node.download(src, dst)
         f.close()
 
         node.remove_packages('gcc')
-        node.rmdir(self.home)
+        node.rmdir(app_home)
 
         f = open(dst, "r")
         out = f.read()
@@ -158,27 +126,27 @@ main (void)
         self.assertEquals(out, "Hello, world!\n")
 
     def test_execute_fedora(self):
-        self.t_execute(self.node_fedora)
+        self.t_execute(self.fedora_host, self.fedora_user)
 
     def test_execute_ubuntu(self):
-        self.t_execute(self.node_ubuntu)
+        self.t_execute(self.ubuntu_host, self.ubuntu_user)
 
     def test_run_fedora(self):
-        self.t_run(self.node_fedora)
+        self.t_run(self.fedora_host, self.fedora_user)
 
     def test_run_ubuntu(self):
-        self.t_run(self.node_ubuntu)
+        self.t_run(self.ubuntu_host, self.ubuntu_user)
 
     def test_intall_fedora(self):
-        self.t_install(self.node_fedora)
+        self.t_install(self.fedora_host, self.fedora_user)
 
     def test_install_ubuntu(self):
-        self.t_install(self.node_ubuntu)
+        self.t_install(self.ubuntu_host, self.ubuntu_user)
     
     @skipInteractive
     def test_xterm_ubuntu(self):
         """ Interactive test. Should not run automatically """
-        self.t_xterm(self.node_ubuntu)
+        self.t_xterm(self.ubuntu_host, self.ubuntu_user)
 
 
 if __name__ == '__main__':
diff --git a/test/resources/linux/test_utils.py b/test/resources/linux/test_utils.py
new file mode 100644 (file)
index 0000000..885af79
--- /dev/null
@@ -0,0 +1,47 @@
+from neco.resources.linux.node import LinuxNode
+
+import os
+
+class DummyEC(object):
+    @property
+    def exp_id(self):
+        return "nepi-1"
+
+def create_node(hostname, username):
+    ec = DummyEC()
+    node = LinuxNode(ec, 1)
+    node.set("hostname", hostname)
+    node.set("username", username)
+
+    # If we don't return the reference to the EC
+    # it will be released by the garbage collector since 
+    # the resources only save a weak refernce to it.
+    return node, ec
+
+def skipIfNotAlive(func):
+    name = func.__name__
+    def wrapped(*args, **kwargs):
+        node, ec = create_node(args[1], args[2])
+
+        if not node.is_alive():
+            print "*** WARNING: Skipping test %s: Node %s is not alive\n" % (
+                name, node.get("hostname"))
+            return
+
+        return func(*args, **kwargs)
+    
+    return wrapped
+
+def skipInteractive(func):
+    name = func.__name__
+    def wrapped(*args, **kwargs):
+        mode = os.environ.get("NEPI_INTERACTIVE", False) in ['True', 'true', 'yes', 'YES']
+        if not mode:
+            print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
+            return
+
+        return func(*args, **kwargs)
+    
+    return wrapped
+
+