* Some refactoring, modularizing daemonized remote spawning with log capture and...
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 22 Apr 2011 13:30:37 +0000 (15:30 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 22 Apr 2011 13:30:37 +0000 (15:30 +0200)
* UNTESTED dependencies support (no building yet)

src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/rspawn.py [new file with mode: 0644]
test/testbeds/planetlab/execute.py
test/testbeds/planetlab/integration.py

index ff022c9..efe2cf8 100644 (file)
@@ -9,6 +9,7 @@ import os.path
 import nepi.util.server as server
 import cStringIO
 import subprocess
+import rspawn
 
 from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
         STATUS_FINISHED
@@ -23,9 +24,15 @@ class Application(object):
         self.command = None
         self.sudo = False
         
+        self.build = None
+        self.depends = None
+        self.buildDepends = None
+        self.sources = None
+        
         self.stdin = None
         self.stdout = None
         self.stderr = None
+        self.buildlog = None
         
         # Those are filled when the app is configured
         self.home_path = None
@@ -64,15 +71,16 @@ class Application(object):
     def start(self):
         # Start process in a "daemonized" way, using nohup and heavy
         # stdin/out redirection to avoid connection issues
-        (out,err),proc = server.popen_ssh_command(
-            "cd %(home)s ; rm -f ./pid ; ( echo $$ $PPID > ./pid ; %(sudo)s nohup %(command)s > %(stdout)s 2> %(stderr)s < %(stdin)s ) &" % {
-                'home' : server.shell_escape(self.home_path),
-                'command' : self.command,
-                'stdout' : 'stdout' if self.stdout else '/dev/null' ,
-                'stderr' : 'stderr' if self.stderr else '/dev/null' ,
-                'stdin' : 'stdin' if self.stdin is not None else '/dev/null' ,
-                'sudo' : 'sudo' if self.sudo else '',
-            },
+        (out,err),proc = rspawn.remote_spawn(
+            self.command,
+            
+            pidfile = './pid',
+            home = self.home_path,
+            stdin = 'stdin' if self.stdin is not None else '/dev/null',
+            stdout = 'stdout' if self.stdout else '/dev/null',
+            stderr = 'stderr' if self.stderr else '/dev/null',
+            sudo = self.sudo,
+            
             host = self.node.hostname,
             port = None,
             user = self.slicename,
@@ -89,22 +97,17 @@ class Application(object):
         # Get PID/PPID
         # NOTE: wait a bit for the pidfile to be created
         if self._started and not self._pid or not self._ppid:
-            (out,err),proc = server.popen_ssh_command(
-                "cat %(pidfile)s" % {
-                    'pidfile' : server.shell_escape(os.path.join(self.home_path,'pid')),
-                },
+            pidtuple = rspawn.remote_check_pid(
+                os.path.join(self.home_path,'pid'),
                 host = self.node.hostname,
                 port = None,
                 user = self.slicename,
                 agent = None,
                 ident_key = self.ident_path
                 )
-            if out:
-                try:
-                    self._pid, self._ppid = map(int,out.strip().split(' ',1))
-                except:
-                    # Ignore, many ways to fail that don't matter that much
-                    pass
+            
+            if pidtuple:
+                self._pid, self._ppid = pidtuple
     
     def status(self):
         self.checkpid()
@@ -113,11 +116,8 @@ class Application(object):
         elif not self._pid or not self._ppid:
             return STATUS_NOT_STARTED
         else:
-            (out,err),proc = server.popen_ssh_command(
-                "ps --ppid $(ppid)d -o pid | grep -c $(pid)d" % {
-                    'ppid' : self._ppid,
-                    'pid' : self._pid,
-                },
+            status = rspawn.remote_status(
+                self._pid, self._ppid,
                 host = self.node.hostname,
                 port = None,
                 user = self.slicename,
@@ -125,51 +125,28 @@ class Application(object):
                 ident_key = self.ident_path
                 )
             
-            status = False
-            if out:
-                try:
-                    status = bool(int(out.strip()))
-                except:
-                    # Ignore, many ways to fail that don't matter that much
-                    pass
-            return STATUS_RUNNING if status else STATUS_FINISHED
+            if status is rspawn.NOT_STARTED:
+                return STATUS_NOT_STARTED
+            elif status is rspawn.RUNNING:
+                return STATUS_RUNNING
+            elif status is rspawn.FINISHED:
+                return STATUS_FINISHED
+            else:
+                # WTF?
+                return STATUS_NOT_STARTED
     
     def kill(self):
         status = self.status()
         if status == STATUS_RUNNING:
             # kill by ppid+pid - SIGTERM first, then try SIGKILL
-            (out,err),proc = server.popen_ssh_command(
-                """
-kill $(pid)d $(ppid)d 
-for x in 1 2 3 4 5 6 7 8 9 0 ; do 
-    sleep 0.1 
-    if [ `ps --pid $(ppid)d -o pid | grep -c $(pid)d` == `0` ]; then
-        break
-    fi
-    sleep 0.9
-done
-if [ `ps --pid $(ppid)d -o pid | grep -c $(pid)d` != `0` ]; then
-    kill -9 $(pid)d $(ppid)d
-fi
-""" % {
-                    'ppid' : self._ppid,
-                    'pid' : self._pid,
-                },
+            rspawn.remote_kill(
+                self._pid, self._ppid,
                 host = self.node.hostname,
                 port = None,
                 user = self.slicename,
                 agent = None,
                 ident_key = self.ident_path
                 )
-            
-            status = False
-            if out:
-                try:
-                    status = bool(int(out.strip()))
-                except:
-                    # Ignore, many ways to fail that don't matter that much
-                    pass
-            return STATUS_RUNNING if status else STATUS_FINISHED
     
     def remote_trace_path(self, whichtrace):
         if whichtrace in ('stdout','stderr'):
index 3aacdbf..0080436 100644 (file)
@@ -29,6 +29,10 @@ def connect_tun_iface_node(testbed_instance, node, iface):
 
 def connect_app(testbed_instance, node, app):
     app.node = node
+    
+    if app.depends:
+        node.required_packages.update(set(
+            app.depends.split() ))
 
 ### Creation functions ###
 
@@ -77,6 +81,7 @@ def start_application(testbed_instance, guid):
     
     app.stdout = "stdout" in traces
     app.stderr = "stderr" in traces
+    app.buildlog = "buildlog" in traces
     
     app.start()
 
@@ -139,6 +144,9 @@ def configure_node(testbed_instance, guid):
     
     # Do some validations
     node.validate()
+    
+    # TODO: this should be done in parallel in all nodes
+    node.install_dependencies()
 
 def configure_application(testbed_instance, guid):
     app = testbed_instance._elements[guid]
@@ -151,6 +159,9 @@ def configure_application(testbed_instance, guid):
     # Do some validations
     app.validate()
     
+    # Wait for dependencies
+    app.node.wait_dependencies()
+    
     # Install stuff
     app.setup()
 
@@ -362,6 +373,39 @@ attributes = dict({
                 "flags": Attribute.DesignOnly,
                 "validation_function": validation.is_string
             }),
+            
+    "depends": dict({
+                "name": "depends",
+                "help": "Space-separated list of packages required to run the application",
+                "type": Attribute.STRING,
+                "flags": Attribute.DesignOnly,
+                "validation_function": validation.is_string
+            }),
+    "build-depends": dict({
+                "name": "buildDepends",
+                "help": "Space-separated list of packages required to build the application",
+                "type": Attribute.STRING,
+                "flags": Attribute.DesignOnly,
+                "validation_function": validation.is_string
+            }),
+    "sources": dict({
+                "name": "sources",
+                "help": "Space-separated list of regular files to be deployed in the working path prior to building. "
+                        "Archives won't be expanded automatically.",
+                "type": Attribute.STRING,
+                "flags": Attribute.DesignOnly,
+                "validation_function": validation.is_string
+            }),
+    "build": dict({
+                "name": "build",
+                "help": "Build commands to execute after deploying the sources. "
+                        "Sources will be in the initial working folder. "
+                        "Example: cd my-app && ./configure && make && make install.\n"
+                        "Try to make the commands return with a nonzero exit code on error.",
+                "type": Attribute.STRING,
+                "flags": Attribute.DesignOnly,
+                "validation_function": validation.is_string
+            }),
     })
 
 traces = dict({
@@ -372,7 +416,11 @@ traces = dict({
     "stderr": dict({
                 "name": "stderr",
                 "help": "Application standard error",
-              }) 
+              }),
+    "buildlog": dict({
+                "name": "buildlog",
+                "help": "Output of the build process",
+              }), 
     })
 
 create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, APPLICATION ]
index e5fd483..dd5e416 100644 (file)
@@ -4,6 +4,8 @@
 from constants import TESTBED_ID
 import plcapi
 import operator
+import rspawn
+import time
 
 class Node(object):
     BASEFILTERS = {
@@ -44,6 +46,13 @@ class Node(object):
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
         
+        # Applications add requirements to connected nodes
+        self.required_packages = set()
+        
+        # Testbed-derived attributes
+        self.slicename = None
+        self.ident_path = None
+        
         # Those are filled when an actual node is allocated
         self._node_id = None
     
@@ -151,3 +160,64 @@ class Node(object):
     def validate(self):
         pass
 
+    def install_dependencies(self):
+        if self.required_packages:
+            # TODO: make dependant on the experiment somehow...
+            pidfile = '/tmp/nepi-depends.pid'
+            logfile = '/tmp/nepi-depends.log'
+            
+            # Start process in a "daemonized" way, using nohup and heavy
+            # stdin/out redirection to avoid connection issues
+            (out,err),proc = rspawn.remote_spawn(
+                "yum -y install %(packages)s" % {
+                    'packages' : ' '.join(self.required_packages),
+                },
+                pidfile = pidfile,
+                stdout = logfile,
+                stderr = rspawn.STDOUT,
+                
+                host = self.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path,
+                sudo = True
+                )
+            
+            if proc.wait():
+                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+    
+    def wait_dependencies(self, pidprobe=1, probe=10, pidmax=10):
+        if self.required_packages:
+            # get PID
+            pid = ppid = None
+            for probenum in xrange(pidmax):
+                pidtuple = rspawn.remote_check_pid(
+                    pidfile = pidfile,
+                    host = self.hostname,
+                    port = None,
+                    user = self.slicename,
+                    agent = None,
+                    ident_key = self.ident_path
+                    )
+                if pidtuple:
+                    pid, ppid = pidtuple
+                    break
+                else:
+                    time.sleep(pidprobe)
+            else:
+                raise RuntimeError, "Failed to obtain pidfile for dependency installer"
+        
+            # wait for it to finish
+            while rspawn.RUNNING is rspawn.remote_status(
+                    pid, ppid,
+                    host = self.hostname,
+                    port = None,
+                    user = self.slicename,
+                    agent = None,
+                    ident_key = self.ident_path
+                    ):
+                time.sleep(probe)
+        
+
+
diff --git a/src/nepi/testbeds/planetlab/rspawn.py b/src/nepi/testbeds/planetlab/rspawn.py
new file mode 100644 (file)
index 0000000..55ac8ff
--- /dev/null
@@ -0,0 +1,215 @@
+# Utility library for spawning remote asynchronous tasks
+from nepi.util import server
+import getpass
+
+class STDOUT: 
+    """
+    Special value that when given to remote_spawn in stderr causes stderr to 
+    redirect to whatever stdout was redirected to.
+    """
+
+class RUNNING:
+    """
+    Process is still running
+    """
+
+class FINISHED:
+    """
+    Process is finished
+    """
+
+class NOT_STARTED:
+    """
+    Process hasn't started running yet (this should be very rare)
+    """
+
+def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/dev/null', home=None, create_home=False, sudo=False,
+        host = None, port = None, user = None, agent = None, ident_key = None):
+    """
+    Spawn a remote command such that it will continue working asynchronously.
+    
+    Parameters:
+        command: the command to run - it should be a single line.
+        
+        pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
+        
+        stdout: path of a file to redirect standard output to - must be a string.
+            Defaults to /dev/null
+        stderr: path of a file to redirect standard error to - string or the special STDOUT value
+            to redirect to the same file stdout was redirected to. Defaults to STDOUT.
+        stdin: path of a file with input to be piped into the command's standard input
+        
+        home: path of a folder to use as working directory - should exist, unless you specify create_home
+        
+        create_home: if True, the home folder will be created first with mkdir -p
+        
+        sudo: whether the command needs to be executed as root
+        
+        host/port/user/agent/ident_key: see nepi.util.server.popen_ssh_command
+    
+    Returns:
+        (stdout, stderr), process
+        
+        Of the spawning process, which only captures errors at spawning time.
+        Usually only useful for diagnostics.
+    """
+    # Start process in a "daemonized" way, using nohup and heavy
+    # stdin/out redirection to avoid connection issues
+    if stderr is STDOUT:
+        stderr = '&1'
+    else:
+        stderr = ' ' + stderr
+    (out,err),proc = server.popen_ssh_command(
+        "%(create)s%(gohome)s rm -f %(pidfile)s ; ( echo $$ $PPID > %(pidfile)s ; %(sudo)s nohup %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s ) &" % {
+            'command' : command,
+            
+            'stdout' : stdout,
+            'stderr' : stderr,
+            'stdin' : stdin,
+            
+            'sudo' : 'sudo' if sudo else '',
+            
+            'pidfile' : server.shell_escape(pidfile),
+            'gohome' : 'cd %s ; ' % (server.shell_escape(home),) if home else '',
+            'create' : 'mkdir -p %s ; ' % (server.shell_escape,) if create_home else '',
+        },
+        host = host,
+        port = port,
+        user = user,
+        agent = agent,
+        ident_key = ident_key
+        )
+    
+    if proc.wait():
+        raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+    return (out,err),proc
+
+def remote_check_pid(pidfile,
+        host = None, port = None, user = None, agent = None, ident_key = None):
+    """
+    Check the pidfile of a process spawned with remote_spawn.
+    
+    Parameters:
+        pidfile: the pidfile passed to remote_span
+        
+        host/port/user/agent/ident_key: see nepi.util.server.popen_ssh_command
+    
+    Returns:
+        
+        A (pid, ppid) tuple useful for calling remote_status and remote_kill,
+        or None if the pidfile isn't valid yet (maybe the process is still starting).
+    """
+
+    (out,err),proc = server.popen_ssh_command(
+        "cat %(pidfile)s" % {
+            'pidfile' : pidfile,
+        },
+        host = host,
+        port = port,
+        user = user,
+        agent = agent,
+        ident_key = ident_key
+        )
+        
+    if proc.wait():
+        return None
+    
+    if out:
+        try:
+            return map(int,out.strip().split(' ',1))
+        except:
+            # Ignore, many ways to fail that don't matter that much
+            return None
+
+
+def remote_status(pid, ppid, 
+        host = None, port = None, user = None, agent = None, ident_key = None):
+    """
+    Check the status of a process spawned with remote_spawn.
+    
+    Parameters:
+        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+        
+        host/port/user/agent/ident_key: see nepi.util.server.popen_ssh_command
+    
+    Returns:
+        
+        One of NOT_STARTED, RUNNING, FINISHED
+    """
+
+    (out,err),proc = server.popen_ssh_command(
+        "ps --ppid %(ppid)d -o pid | grep -c %(pid)d ; true" % {
+            'ppid' : ppid,
+            'pid' : pid,
+        },
+        host = host,
+        port = port,
+        user = user,
+        agent = agent,
+        ident_key = ident_key
+        )
+    
+    if proc.wait():
+        return NOT_STARTED
+    
+    status = False
+    if out:
+        try:
+            status = bool(int(out.strip()))
+        except:
+            # Ignore, many ways to fail that don't matter that much
+            return NOT_STARTED
+    return RUNNING if status else FINISHED
+    
+
+def remote_kill(pid, ppid, sudo = False,
+        host = None, port = None, user = None, agent = None, ident_key = None):
+    """
+    Kill a process spawned with remote_spawn.
+    
+    First tries a SIGTERM, and if the process does not end in 10 seconds,
+    it sends a SIGKILL.
+    
+    Parameters:
+        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
+        
+        sudo: whether the command was run with sudo - careful killing like this.
+        
+        host/port/user/agent/ident_key: see nepi.util.server.popen_ssh_command
+    
+    Returns:
+        
+        Nothing, should have killed the process
+    """
+
+    (out,err),proc = server.popen_ssh_command(
+        """
+%(sudo)s kill %(pid)d %(ppid)d 
+for x in 1 2 3 4 5 6 7 8 9 0 ; do 
+    sleep 0.1 
+    if [ `ps --pid %(ppid)d -o pid | grep -c %(pid)d` == `0` ]; then
+        break
+    fi
+    sleep 0.9
+done
+if [ `ps --pid %(ppid)d -o pid | grep -c %(pid)d` != `0` ]; then
+    %(sudo)s kill -9 %(pid)d %(ppid)d
+fi
+""" % {
+            'ppid' : ppid,
+            'pid' : pid,
+            'sudo' : 'sudo' if sudo else ''
+        },
+        host = host,
+        port = port,
+        user = user,
+        agent = agent,
+        ident_key = ident_key
+        )
+    
+    # wait, don't leave zombies around
+    proc.wait()
+    
+
+
index 9653aed..85ff6e3 100755 (executable)
@@ -12,7 +12,7 @@ import unittest
 import re
 import test_util
 
-class NetnsExecuteTestCase(unittest.TestCase):
+class PlanetLabExecuteTestCase(unittest.TestCase):
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
         
index 8fa5deb..692bebc 100755 (executable)
@@ -12,7 +12,7 @@ import test_util
 import time
 import unittest
 
-class NetnsIntegrationTestCase(unittest.TestCase):
+class PlanetLabIntegrationTestCase(unittest.TestCase):
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()