X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fapplication.py;h=a5170a46f85d30134a9749b3c4cdca3c507082c5;hb=5d8a0f74de55f70c290633a26b931dd503adb345;hp=72f83a84cdd19e5f856202c93a0000f2d81f8bab;hpb=4012656f3b75b0945c502935a7d5d2aa9a42470d;p=nepi.git diff --git a/src/nepi/testbeds/planetlab/application.py b/src/nepi/testbeds/planetlab/application.py index 72f83a84..a5170a46 100644 --- a/src/nepi/testbeds/planetlab/application.py +++ b/src/nepi/testbeds/planetlab/application.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- from constants import TESTBED_ID @@ -6,13 +5,21 @@ import plcapi import operator import os import os.path +import sys import nepi.util.server as server import cStringIO import subprocess import rspawn +import random +import time +import socket +import threading +import logging +import re -from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \ - STATUS_FINISHED +from nepi.util.constants import ApplicationStatus as AS + +_ccnre = re.compile("\s*(udp|tcp)\s+(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\s*$") class Dependency(object): """ @@ -41,6 +48,7 @@ class Dependency(object): self.depends = None self.buildDepends = None self.sources = None + self.rpmFusion = False self.env = {} self.stdin = None @@ -64,14 +72,29 @@ class Dependency(object): self._setuper = None self._pid = None self._ppid = None + + # Spanning tree deployment + self._master = None + self._master_passphrase = None + self._master_prk = None + self._master_puk = None + self._master_token = os.urandom(8).encode("hex") + self._build_pid = None + self._build_ppid = None + + # Logging + self._logger = logging.getLogger('nepi.testbeds.planetlab') + def __str__(self): return "%s<%s>" % ( self.__class__.__name__, - ' '.join(list(self.depends or []) - + list(self.sources or [])) + ' '.join(filter(bool,(self.depends, self.sources))) ) - + + def deployed(self): + 
return self._setup + def validate(self): if self.home_path is None: raise AssertionError, "Misconfigured application: missing home path" @@ -84,6 +107,13 @@ class Dependency(object): if self.node.slicename is None: raise AssertionError, "Misconfigured application: unspecified slice" + def check_bad_host(self, out, err): + """ + Called whenever an operation fails, it's given the output to be checked for + telltale signs of unhealthy hosts. + """ + return False + def remote_trace_path(self, whichtrace): if whichtrace in self.TRACES: tracefile = os.path.join(self.home_path, whichtrace) @@ -91,7 +121,12 @@ class Dependency(object): tracefile = None return tracefile - + + def remote_trace_name(self, whichtrace): + if whichtrace in self.TRACES: + return whichtrace + return None + def sync_trace(self, local_dir, whichtrace): tracefile = self.remote_trace_path(whichtrace) if not tracefile: @@ -109,73 +144,86 @@ class Dependency(object): raise RuntimeError, "Failed to synchronize trace" # sync files - (out,err),proc = server.popen_scp( - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - tracefile), - local_path, - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,) + try: + self._popen_scp( + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + tracefile), + local_path + ) + except RuntimeError, e: + raise RuntimeError, "Failed to synchronize trace: %s %s" \ + % (e.args[0], e.args[1],) return local_path + def recover(self): + # We assume a correct deployment, so recovery only + # means we mark this dependency as deployed + self._setup = True def setup(self): + self._logger.info("Setting up %s", self) self._make_home() - self._build() + self._launch_build() + self._finish_build() self._setup = True def async_setup(self): if not self._setuper: + def setuper(): + try: + self.setup() + except: + 
self._setuper._exc.append(sys.exc_info()) self._setuper = threading.Thread( - target = self.setup) + target = setuper) + self._setuper._exc = [] self._setuper.start() def async_setup_wait(self): if not self._setup: + self._logger.info("Waiting for %s to be setup", self) if self._setuper: self._setuper.join() if not self._setup: - raise RuntimeError, "Failed to setup application" + if self._setuper._exc: + exctyp,exval,exctrace = self._setuper._exc[0] + raise exctyp,exval,exctrace + else: + raise RuntimeError, "Failed to setup application" + else: + self._logger.info("Setup ready: %s at %s", self, self.node.hostname) else: self.setup() def _make_home(self): # Make sure all the paths are created where # they have to be created for deployment - (out,err),proc = server.popen_ssh_command( - "mkdir -p %s" % (server.shell_escape(self.home_path),), - host = self.node.hostname, - port = None, - user = self.node.slicename, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) - + # sync files + try: + self._popen_ssh_command( + "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \ + % { 'home' : server.shell_escape(self.home_path) }, + timeout = 120, + retry = 3 + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],) if self.stdin: + stdin = self.stdin + if not os.path.isfile(stdin): + stdin = cStringIO.StringIO(self.stdin) + # Write program input - (out,err),proc = server.popen_scp( - cStringIO.StringIO(self.stdin), - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path, 'stdin') ), - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + try: + 
self._popen_scp(stdin, + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path, 'stdin') ), + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application %s: %s %s" \ + % (self.home_path, e.args[0], e.args[1],) def _replace_paths(self, command): """ @@ -187,92 +235,445 @@ class Dependency(object): .replace("${SOURCES}", root+server.shell_escape(self.home_path)) .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) ) - def _build(self): + def _launch_build(self, trial=0): + if self._master is not None: + if not trial or self._master_prk is not None: + self._do_install_keys() + buildscript = self._do_build_slave() + else: + buildscript = self._do_build_master() + + if buildscript is not None: + self._logger.info("Building %s at %s", self, self.node.hostname) + + # upload build script + try: + self._popen_scp( + buildscript, + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path, 'nepi-build.sh') ) + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application %s: %s %s" \ + % (self.home_path, e.args[0], e.args[1],) + + # launch build + self._do_launch_build() + + def _finish_build(self): + self._do_wait_build() + self._do_install() + + def _do_build_slave(self): + if not self.sources and not self.build: + return None + + # Create build script + files = set() + if self.sources: sources = self.sources.split(' ') - - # Copy all sources - (out,err),proc = server.popen_scp( - sources, - "%s@%s:%s" % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path,'.'),), - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) + files.update( + "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, + os.path.join(self._master.home_path, os.path.basename(source)),) + for source in sources + ) + + if self.build: + files.add( + "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, + 
os.path.join(self._master.home_path, 'build.tar.gz'),) + ) + + sshopts = "-o ConnectTimeout=30 -o ConnectionAttempts=3 -o ServerAliveInterval=30 -o TCPKeepAlive=yes" + + launch_agent = "{ ( echo -e '#!/bin/sh\\ncat' > .ssh-askpass ) && chmod u+x .ssh-askpass"\ + " && export SSH_ASKPASS=$(pwd)/.ssh-askpass "\ + " && ssh-agent > .ssh-agent.sh ; } && . ./.ssh-agent.sh && ( echo $NEPI_MASTER_PASSPHRASE | ssh-add %(prk)s ) && rm -rf %(prk)s %(puk)s" % \ + { + 'prk' : server.shell_escape(self._master_prk_name), + 'puk' : server.shell_escape(self._master_puk_name), + } + + kill_agent = "kill $SSH_AGENT_PID" + + waitmaster = ( + "{ " + "echo 'Checking master reachability' ; " + "if ping -c 3 %(master_host)s && (. ./.ssh-agent.sh > /dev/null ; ssh -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s echo MASTER SAYS HI ) ; then " + "echo 'Master node reachable' ; " + "else " + "echo 'MASTER NODE UNREACHABLE' && " + "exit 1 ; " + "fi ; " + ". ./.ssh-agent.sh ; " + "while [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s.retcode || /bin/true) != %(token)s ]] ; do sleep 5 ; done ; " + "if [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s || /bin/true) != %(token)s ]] ; then echo BAD TOKEN ; exit 1 ; fi ; " + "}" + ) % { + 'hostkey' : 'master_known_hosts', + 'master' : "%s@%s" % (self._master.node.slicename, self._master.node.hostip), + 'master_host' : self._master.node.hostip, + 'token_path' : os.path.join(self._master.home_path, 'build.token'), + 'token' : server.shell_escape(self._master._master_token), + 'sshopts' : sshopts, + } + + syncfiles = ". ./.ssh-agent.sh && scp -p -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(files)s ." 
% { + 'hostkey' : 'master_known_hosts', + 'files' : ' '.join(files), + 'sshopts' : sshopts, + } + if self.build: + syncfiles += " && tar xzf build.tar.gz" + syncfiles += " && ( echo %s > build.token )" % (server.shell_escape(self._master_token),) + syncfiles += " && ( echo %s > build.token.retcode )" % (server.shell_escape(self._master_token),) + syncfiles = "{ . ./.ssh-agent.sh ; %s ; }" % (syncfiles,) + + cleanup = "{ . ./.ssh-agent.sh ; kill $SSH_AGENT_PID ; rm -rf %(prk)s %(puk)s master_known_hosts .ssh-askpass ; }" % { + 'prk' : server.shell_escape(self._master_prk_name), + 'puk' : server.shell_escape(self._master_puk_name), + } + + slavescript = "( ( %(launch_agent)s && %(waitmaster)s && %(syncfiles)s && %(kill_agent)s && %(cleanup)s ) || %(cleanup)s ) ; echo %(token)s > build.token.retcode" % { + 'waitmaster' : waitmaster, + 'syncfiles' : syncfiles, + 'cleanup' : cleanup, + 'kill_agent' : kill_agent, + 'launch_agent' : launch_agent, + 'home' : server.shell_escape(self.home_path), + 'token' : server.shell_escape(self._master_token), + } - if proc.wait(): - raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,) + return cStringIO.StringIO(slavescript) + + def _do_launch_build(self): + script = "bash ./nepi-build.sh" + if self._master_passphrase: + script = "NEPI_MASTER_PASSPHRASE=%s %s" % ( + server.shell_escape(self._master_passphrase), + script + ) + (out,err),proc = rspawn.remote_spawn( + script, + pidfile = 'build-pid', + home = self.home_path, + stdin = '/dev/null', + stdout = 'buildlog', + stderr = rspawn.STDOUT, - if self.buildDepends: - # Install build dependencies - (out,err),proc = server.popen_ssh_command( - "sudo -S yum -y install %(packages)s" % { - 'packages' : self.buildDepends - }, + host = self.node.hostname, + port = None, + user = self.node.slicename, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key, + hostip = self.node.hostip, + ) + + if proc.wait(): + if 
self.check_bad_host(out, err): + self.node.blacklist() + raise RuntimeError, "Failed to set up build slave %s: %s %s" % (self.home_path, out,err,) + + + pid = ppid = None + delay = 1.0 + for i in xrange(5): + pidtuple = rspawn.remote_check_pid( + os.path.join(self.home_path,'build-pid'), host = self.node.hostname, port = None, user = self.node.slicename, agent = None, ident_key = self.node.ident_path, - server_key = self.node.server_key + server_key = self.node.server_key, + hostip = self.node.hostip ) + + if pidtuple: + pid, ppid = pidtuple + self._build_pid, self._build_ppid = pidtuple + break + else: + time.sleep(delay) + delay = min(30,delay*1.2) + else: + raise RuntimeError, "Failed to set up build slave %s: cannot get pid" % (self.home_path,) + + self._logger.info("Deploying %s at %s", self, self.node.hostname) - if proc.wait(): - raise RuntimeError, "Failed instal build dependencies: %s %s" % (out,err,) + def _do_wait_build(self, trial=0): + pid = self._build_pid + ppid = self._build_ppid + if pid and ppid: + delay = 1.0 + first = True + bustspin = 0 + while True: + status = rspawn.remote_status( + pid, ppid, + host = self.node.hostname, + port = None, + user = self.node.slicename, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key, + hostip = self.node.hostip + ) + + if status is rspawn.FINISHED: + self._build_pid = self._build_ppid = None + break + elif status is not rspawn.RUNNING: + self._logger.warn("Busted waiting for %s to finish building at %s %s", self, self.node.hostname, + "(build slave)" if self._master is not None else "(build master)") + bustspin += 1 + time.sleep(delay*(5.5+random.random())) + if bustspin > 12: + self._build_pid = self._build_ppid = None + break + else: + if first: + self._logger.info("Waiting for %s to finish building at %s %s", self, self.node.hostname, + "(build slave)" if self._master is not None else "(build master)") + + first = False + time.sleep(delay*(0.5+random.random())) + delay 
= min(30,delay*1.2) + bustspin = 0 - if self.build: - # Build sources - (out,err),proc = server.popen_ssh_command( - "cd %(home)s && mkdir -p build && cd build && ( %(command)s ) > ${HOME}/%(home)s/buildlog 2>&1 || ( tail ${HOME}/%(home)s/buildlog >&2 && false )" % { - 'command' : self._replace_paths(self.build), - 'home' : server.shell_escape(self.home_path), - }, + # check build token + slave_token = "" + for i in xrange(3): + (out, err), proc = self._popen_ssh_command( + "cat %(token_path)s" % { + 'token_path' : os.path.join(self.home_path, 'build.token'), + }, + timeout = 120, + noerrors = True) + if not proc.wait() and out: + slave_token = out.strip() + + if slave_token: + break + else: + time.sleep(2) + + if slave_token != self._master_token: + # Get buildlog for the error message + + (buildlog, err), proc = self._popen_ssh_command( + "cat %(buildlog)s" % { + 'buildlog' : os.path.join(self.home_path, 'buildlog'), + 'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'), + }, + timeout = 120, + noerrors = True) + + proc.wait() + + if self.check_bad_host(buildlog, err): + self.node.blacklist() + elif self._master and trial < 3 and 'BAD TOKEN' in buildlog or 'BAD TOKEN' in err: + # bad sync with master, may try again + # but first wait for master + self._master.async_setup_wait() + self._launch_build(trial+1) + return self._do_wait_build(trial+1) + elif trial < 3: + return self._do_wait_build(trial+1) + else: + # No longer need'em + self._master_prk = None + self._master_puk = None + + raise RuntimeError, "Failed to set up application %s: "\ + "build failed, got wrong token from pid %s/%s "\ + "(expected %r, got %r), see buildlog at %s:\n%s" % ( + self.home_path, pid, ppid, self._master_token, slave_token, self.node.hostname, buildlog) + + # No longer need'em + self._master_prk = None + self._master_puk = None + + self._logger.info("Built %s at %s", self, self.node.hostname) + + def _do_kill_build(self): + pid = self._build_pid + ppid = self._build_ppid + 
+ if pid and ppid: + self._logger.info("Killing build of %s", self) + rspawn.remote_kill( + pid, ppid, host = self.node.hostname, port = None, user = self.node.slicename, agent = None, ident_key = self.node.ident_path, - server_key = self.node.server_key + hostip = self.node.hostip ) - if proc.wait(): - raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,) - - # Make archive - (out,err),proc = server.popen_ssh_command( - "cd %(home)s && tar czf build.tar.gz build" % { + + def _do_build_master(self): + if not self.sources and not self.build and not self.buildDepends: + return None + + if self.sources: + sources = self.sources.split(' ') + + # Copy all sources + try: + self._popen_scp( + sources, + "%s@%s:%s" % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path,'.'),) + ) + except RuntimeError, e: + raise RuntimeError, "Failed upload source file %r: %s %s" \ + % (sources, e.args[0], e.args[1],) + + buildscript = cStringIO.StringIO() + + buildscript.write("(\n") + + if self.buildDepends: + # Install build dependencies + buildscript.write( + "sudo -S yum -y install %(packages)s\n" % { + 'packages' : self.buildDepends + } + ) + + + if self.build: + # Build sources + buildscript.write( + "mkdir -p build && ( cd build && ( %(command)s ) )\n" % { 'command' : self._replace_paths(self.build), 'home' : server.shell_escape(self.home_path), - }, - host = self.node.hostname, - port = None, - user = self.node.slicename, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) + } + ) - if proc.wait(): - raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,) + # Make archive + buildscript.write("tar czf build.tar.gz build\n") + + # Write token + buildscript.write("echo %(master_token)s > build.token ) ; echo %(master_token)s > build.token.retcode" % { + 'master_token' : server.shell_escape(self._master_token) + }) + + buildscript.seek(0) + + return buildscript + def _do_install(self): if 
self.install: + self._logger.info("Installing %s at %s", self, self.node.hostname) + # Install application - (out,err),proc = server.popen_ssh_command( - "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/installlog >&2 && false )" % { - 'command' : self._replace_paths(self.install), - 'home' : server.shell_escape(self.home_path), - }, - host = self.node.hostname, - port = None, - user = self.node.slicename, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key + try: + self._popen_ssh_command( + "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/{install,build}log >&2 && false )" % \ + { + 'command' : self._replace_paths(self.install), + 'home' : server.shell_escape(self.home_path), + }, + ) + except RuntimeError, e: + if self.check_bad_host(e.args[0], e.args[1]): + self.node.blacklist() + raise RuntimeError, "Failed install build sources on node %s: %s %s" % ( + self.node.hostname, e.args[0], e.args[1],) + + def set_master(self, master): + self._master = master + + def install_keys(self, prk, puk, passphrase): + # Install keys + self._master_passphrase = passphrase + self._master_prk = prk + self._master_puk = puk + self._master_prk_name = os.path.basename(prk.name) + self._master_puk_name = os.path.basename(puk.name) + + def _do_install_keys(self): + prk = self._master_prk + puk = self._master_puk + + try: + self._popen_scp( + [ prk.name, puk.name ], + '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path ) ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application deployment keys: %s %s" \ + % (e.args[0], e.args[1],) + + try: + self._popen_scp( + cStringIO.StringIO('%s,%s %s\n' % ( + self._master.node.hostname, self._master.node.hostip, + self._master.node.server_key)), + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path,"master_known_hosts") ) + ) 
+ except RuntimeError, e: + raise RuntimeError, "Failed to set up application deployment keys: %s %s" \ + % (e.args[0], e.args[1],) + + + def cleanup(self): + # make sure there's no leftover build processes + self._do_kill_build() - if proc.wait(): - raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,) + # No longer need'em + self._master_prk = None + self._master_puk = None + + @server.eintr_retry + def _popen_scp(self, src, dst, retry = 3): + while 1: + try: + (out,err),proc = server.popen_scp( + src, + dst, + port = None, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key + ) + + if server.eintr_retry(proc.wait)(): + raise RuntimeError, (out, err) + return (out, err), proc + except: + if retry <= 0: + raise + else: + retry -= 1 + + + @server.eintr_retry + def _popen_ssh_command(self, command, retry = 0, noerrors=False, timeout=None): + (out,err),proc = server.popen_ssh_command( + command, + host = self.node.hostname, + port = None, + user = self.node.slicename, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key, + timeout = timeout, + retry = retry + ) + + if server.eintr_retry(proc.wait)(): + if not noerrors: + raise RuntimeError, (out, err) + return (out, err), proc class Application(Dependency): """ @@ -281,7 +682,7 @@ class Application(Dependency): It adds the output of that command as traces. 
""" - TRACES = ('stdout','stderr','buildlog') + TRACES = ('stdout','stderr','buildlog', 'output') def __init__(self, api=None): super(Application,self).__init__(api) @@ -293,6 +694,7 @@ class Application(Dependency): self.stdin = None self.stdout = None self.stderr = None + self.output = None # Those are filled when the app is started # Having both pid and ppid makes it harder @@ -312,6 +714,8 @@ class Application(Dependency): ) def start(self): + self._logger.info("Starting %s", self) + # Create shell script with the command # This way, complex commands and scripts can be ran seamlessly # sync files @@ -328,19 +732,16 @@ class Application(Dependency): command.write('export %s=%s\n' % (envkey, envval)) command.write(self.command) command.seek(0) - - (out,err),proc = server.popen_scp( - command, - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path, "app.sh")), - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + + try: + self._popen_scp( + command, + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path, "app.sh")) + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application: %s %s" \ + % (e.args[0], e.args[1],) # Start process in a "daemonized" way, using nohup and heavy # stdin/out redirection to avoid connection issues @@ -353,7 +754,6 @@ class Application(Dependency): stdout = 'stdout' if self.stdout else '/dev/null', stderr = 'stderr' if self.stderr else '/dev/null', sudo = self.sudo, - host = self.node.hostname, port = None, user = self.node.slicename, @@ -363,9 +763,18 @@ class Application(Dependency): ) if proc.wait(): + if self.check_bad_host(out, err): + self.node.blacklist() raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) self._started = True + + def recover(self): + # Assuming the application is 
running on PlanetLab, + # proper pidfiles should be present at the app's home path. + # So we mark this application as started, and check the pidfiles + self._started = True + self.checkpid() def checkpid(self): # Get PID/PPID @@ -387,9 +796,9 @@ class Application(Dependency): def status(self): self.checkpid() if not self._started: - return STATUS_NOT_STARTED + return AS.STATUS_NOT_STARTED elif not self._pid or not self._ppid: - return STATUS_NOT_STARTED + return AS.STATUS_NOT_STARTED else: status = rspawn.remote_status( self._pid, self._ppid, @@ -397,22 +806,23 @@ class Application(Dependency): port = None, user = self.node.slicename, agent = None, - ident_key = self.node.ident_path + ident_key = self.node.ident_path, + server_key = self.node.server_key ) if status is rspawn.NOT_STARTED: - return STATUS_NOT_STARTED + return AS.STATUS_NOT_STARTED elif status is rspawn.RUNNING: - return STATUS_RUNNING + return AS.STATUS_RUNNING elif status is rspawn.FINISHED: - return STATUS_FINISHED + return AS.STATUS_FINISHED else: # WTF? - return STATUS_NOT_STARTED + return AS.STATUS_NOT_STARTED def kill(self): status = self.status() - if status == STATUS_RUNNING: + if status == AS.STATUS_RUNNING: # kill by ppid+pid - SIGTERM first, then try SIGKILL rspawn.remote_kill( self._pid, self._ppid, @@ -421,9 +831,12 @@ class Application(Dependency): user = self.node.slicename, agent = None, ident_key = self.node.ident_path, - server_key = self.node.server_key + server_key = self.node.server_key, + sudo = self.sudo ) - + self._logger.info("Killed %s", self) + + class NepiDependency(Dependency): """ This dependency adds nepi itself to the python path, @@ -448,7 +861,7 @@ class NepiDependency(Dependency): tarname = os.path.basename(self.tarball.name) # it's already built - just move the tarball into place - self.build = "mv ${SOURCES}/%s ." % (tarname,) + self.build = "mv -f ${SOURCES}/%s ." % (tarname,) # unpack it into sources, and we're done self.install = "tar xzf ${BUILD}/%s -C .." 
% (tarname,) @@ -492,26 +905,41 @@ class NS3Dependency(Dependency): def __init__(self, api = None): super(NS3Dependency, self).__init__(api) - self.buildDepends = 'build-essential waf gcc gcc-c++ gccxml unzip' + self.buildDepends = 'make waf gcc gcc-c++ gccxml unzip bzr' # We have to download the sources, untar, build... - pybindgen_source_url = "http://pybindgen.googlecode.com/files/pybindgen-0.15.0.zip" - pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip" - ns3_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/ns-3-dev/archive/tip.tar.gz" - passfd_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/tip.tar.gz" + #pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip" + pygccxml_source_url = "http://yans.pl.sophia.inria.fr/libs/pygccxml-1.0.0.zip" + ns3_source_url = "http://nepi.inria.fr/code/nepi-ns3.13/archive/tip.tar.gz" + passfd_source_url = "http://nepi.inria.fr/code/python-passfd/archive/tip.tar.gz" + + pybindgen_version = "797" + self.build =( " ( " " cd .. 
&& " " python -c 'import pygccxml, pybindgen, passfd' && " - " test -f lib/_ns3.so && " - " test -f lib/libns3.so " + " test -f lib/ns/_core.so && " + " test -f lib/ns/__init__.py && " + " test -f lib/ns/core.py && " + " test -f lib/libns3-core.so && " + " LD_LIBRARY_PATH=lib PYTHONPATH=lib python -c 'import ns.core' " " ) || ( " # Not working, rebuild - "wget -q -c -O pybindgen-src.zip %(pybindgen_source_url)s && " # continue, to exploit the case when it has already been dl'ed - "wget -q -c -O pygccxml-1.0.0.zip %(pygccxml_source_url)s && " - "wget -q -c -O passfd-src.tar.gz %(passfd_source_url)s && " - "wget -q -c -O ns3-src.tar.gz %(ns3_source_url)s && " - "unzip -n pybindgen-src.zip && " # Do not overwrite files, to exploit the case when it has already been built + # Archive SHA1 sums to check + "echo '7158877faff2254e6c094bf18e6b4283cac19137 pygccxml-1.0.0.zip' > archive_sums.txt && " + " ( " # check existing files + " sha1sum -c archive_sums.txt && " + " test -f passfd-src.tar.gz && " + " test -f ns3-src.tar.gz " + " ) || ( " # nope? 
re-download + " rm -rf pybindgen pygccxml-1.0.0.zip passfd-src.tar.gz ns3-src.tar.gz && " + " bzr checkout lp:pybindgen -r %(pybindgen_version)s && " # continue, to exploit the case when it has already been dl'ed + " wget -q -c -O pygccxml-1.0.0.zip %(pygccxml_source_url)s && " + " wget -q -c -O passfd-src.tar.gz %(passfd_source_url)s && " + " wget -q -c -O ns3-src.tar.gz %(ns3_source_url)s && " + " sha1sum -c archive_sums.txt " # Check SHA1 sums when applicable + " ) && " "unzip -n pygccxml-1.0.0.zip && " "mkdir -p ns3-src && " "mkdir -p passfd-src && " @@ -524,7 +952,7 @@ class NS3Dependency(Dependency): "python setup.py build && " "python setup.py install --install-lib ${BUILD}/target && " "python setup.py clean && " - "cd ../pybindgen-0.15.0 && " + "cd ../pybindgen && " "export PYTHONPATH=$PYTHONPATH:${BUILD}/target && " "./waf configure --prefix=${BUILD}/target -d release && " "./waf && " @@ -537,13 +965,16 @@ class NS3Dependency(Dependency): "python setup.py install --install-lib ${BUILD}/target && " "python setup.py clean && " "cd ../ns3-src && " - "./waf configure --prefix=${BUILD}/target -d release --disable-examples --high-precision-as-double && " + "./waf configure --prefix=${BUILD}/target --with-pybindgen=../pybindgen-src -d release --disable-examples --disable-tests && " "./waf &&" "./waf install && " - "./waf clean" + "rm -f ${BUILD}/target/lib/*.so && " + "cp -a ${BUILD}/ns3-src/build/libns3*.so ${BUILD}/target/lib && " + "cp -a ${BUILD}/ns3-src/build/bindings/python/ns ${BUILD}/target/lib &&" + "./waf clean " " )" % dict( - pybindgen_source_url = server.shell_escape(pybindgen_source_url), + pybindgen_version = server.shell_escape(pybindgen_version), pygccxml_source_url = server.shell_escape(pygccxml_source_url), ns3_source_url = server.shell_escape(ns3_source_url), passfd_source_url = server.shell_escape(passfd_source_url), @@ -554,10 +985,15 @@ class NS3Dependency(Dependency): " ( " " cd .. 
class YumDependency(Dependency):
    """
    This dependency is an internal helper class used to
    efficiently distribute yum-downloaded rpms.

    It temporarily sets the yum cache as persistent in the
    build master, and installs all the required packages.

    The rpm packages left in the yum cache are gathered and
    distributed by the underlying Dependency in an efficient
    manner. Build slaves will then install those rpms back in
    the cache before issuing the install command.

    When packages have been installed already, nothing but an
    empty tar is distributed.

    The ``build`` and ``install`` attributes are read-only in practice:
    they are computed from ``depends`` and any assignment is ignored.
    """

    # Class attribute holding a *weak* reference to the shared NEPI tar file
    # so that they may share it. Don't operate on the file itself, it would
    # be a mess, just use its path.
    _shared_nepi_tar = None

    # Telltale signs of an unhealthy host in yum / deployment output.
    # Compiled once at class creation instead of on every check.
    # The parentheses and the dot around "repomd.xml" are escaped so the
    # pattern matches yum's literal "(repomd.xml)" text; unescaped, they
    # form a capture group and the alternative can never match.
    _BAD_HOST_RE = re.compile(r'(?:'
        r'The GPG keys listed for the ".*" repository are already installed but they are not correct for this package'
        r'|Error: Cannot retrieve repository metadata \(repomd\.xml\) for repository: .*[.] Please verify its path and try again'
        r'|Error: disk I/O error'
        r'|MASTER NODE UNREACHABLE'
        r')',
        re.I)

    def _canonical_depends(self):
        """
        Return the package list as a sorted, single-space separated string.

        Splitting on arbitrary whitespace (rather than on a single space)
        keeps the result canonical when ``depends`` contains repeated
        spaces, tabs or newlines. An unset ``depends`` yields "".
        """
        return ' '.join(sorted((self.depends or "").split()))

    def _build_get(self):
        """
        Shell command that downloads the rpms and packs them into
        ${BUILD}/packages.tar, keeping the yum cache only while it runs.
        """
        # download rpms and pack into a tar archive
        return (
            "sudo -S nice yum -y makecache && "
            "sudo -S sed -i -r 's/keepcache *= *0/keepcache=1/' /etc/yum.conf && "
            " ( ( "
                "sudo -S nice yum -y install %s ; "
                "rm -f ${BUILD}/packages.tar ; "
                "tar -C /var/cache/yum -rf ${BUILD}/packages.tar $(cd /var/cache/yum ; find -iname '*.rpm')"
            " ) || /bin/true ) && "
            "sudo -S sed -i -r 's/keepcache *= *1/keepcache=0/' /etc/yum.conf && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( self._canonical_depends(), )

    def _build_set(self, value):
        # ignore: the build command is always derived from ``depends``
        return

    build = property(_build_get, _build_set)

    def _install_get(self):
        """
        Shell command that unpacks the distributed rpms into the yum
        cache, installs the packages and cleans the cache afterwards.
        """
        # unpack cached rpms into yum cache, install, and cleanup
        return (
            "sudo -S tar -k --keep-newer-files -C /var/cache/yum -xf packages.tar && "
            "sudo -S nice yum -y install %s && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( self._canonical_depends(), )

    def _install_set(self, value):
        # ignore: the install command is always derived from ``depends``
        return

    install = property(_install_get, _install_set)

    def check_bad_host(self, out, err):
        """
        Check the output of a failed operation for signs of a bad host.

        Returns a true value when either ``out`` or ``err`` contains one
        of the known failure signatures, or when the node's own
        ``check_bad_host`` reports the host as unhealthy.
        """
        badre = self._BAD_HOST_RE
        return (
            badre.search(out)
            or badre.search(err)
            or self.node.check_bad_host(out, err)
        )


class CCNxDaemon(Application):
    """
    Application that builds, installs and runs the CCNx daemon.

    The daemon is launched with ``ccndstart`` and stopped with
    ``ccndstop``; routes are registered with ``ccndc add``.

    Attributes:
        ccnLocalPort: when set, exported as CCN_LOCAL_PORT for both the
            start and the stop commands.
        ccnRoutes: "|"-separated list of routes, each one passed to
            ``ccndc add ccnx:/``.
        ccnxVersion: selects the default build/install recipes; the
            recipes provided here cover "ccnx-0.6.0" and "ccnx-0.5.1".
    """

    def __init__(self, api=None):
        super(CCNxDaemon, self).__init__(api)

        # Attributes
        self.ccnLocalPort = None
        self.ccnRoutes = None
        self.ccnxVersion = "ccnx-0.6.0"

        # Source tarballs. The upstream locations are
        #   http://www.ccnx.org/releases/ccnx-0.5.1.tar.gz
        #   http://www.ccnx.org/releases/ccnx-0.6.0.tar.gz
        # NOTE(review): the URLs below are presumably mirrors of those.
        self.ccnx_0_5_1_sources = "http://yans.pl.sophia.inria.fr/libs/ccnx-0.5.1.tar.gz"
        self.ccnx_0_6_0_sources = "http://yans.pl.sophia.inria.fr/libs/ccnx-0.6.0.tar.gz"
        self.buildDepends = 'make gcc development-tools openssl-devel expat-devel libpcap-devel libxml2-devel'

        # Build recipe for 0.5.1: skipped when the binaries directory
        # already exists, otherwise download, unpack, configure with a
        # local install base, and install.
        self.ccnx_0_5_1_build = (
            " ( "
            " cd .. && "
            " test -d ccnx-0.5.1-src/build/bin "
            " ) || ( "
            # Not working, rebuild
                "("
                    " mkdir -p ccnx-0.5.1-src && "
                    " wget -q -c -O ccnx-0.5.1-src.tar.gz %(ccnx_source_url)s &&"
                    " tar xf ccnx-0.5.1-src.tar.gz --strip-components=1 -C ccnx-0.5.1-src "
                ") && "
                "cd ccnx-0.5.1-src && "
                "mkdir -p build/include &&"
                "mkdir -p build/lib &&"
                "mkdir -p build/bin &&"
                "I=$PWD/build && "
                "INSTALL_BASE=$I ./configure &&"
                "make && make install"
            " )") % dict(
                ccnx_source_url = server.shell_escape(self.ccnx_0_5_1_sources),
            )

        self.ccnx_0_5_1_install = (
            " ( "
            " test -d ${BUILD}/ccnx-0.5.1-src/build/bin && "
            " cp -r ${BUILD}/ccnx-0.5.1-src/build/bin ${SOURCES}"
            " )"
        )

        # Build recipe for 0.6.0: same skip check, then an in-tree
        # configure and make (no install step).
        self.ccnx_0_6_0_build = (
            " ( "
            " cd .. && "
            " test -d ccnx-0.6.0-src/build/bin "
            " ) || ( "
            # Not working, rebuild
                "("
                    " mkdir -p ccnx-0.6.0-src && "
                    " wget -q -c -O ccnx-0.6.0-src.tar.gz %(ccnx_source_url)s &&"
                    " tar xf ccnx-0.6.0-src.tar.gz --strip-components=1 -C ccnx-0.6.0-src "
                ") && "
                "cd ccnx-0.6.0-src && "
                "./configure && make"
            " )") % dict(
                ccnx_source_url = server.shell_escape(self.ccnx_0_6_0_sources),
            )

        self.ccnx_0_6_0_install = (
            " ( "
            " test -d ${BUILD}/ccnx-0.6.0-src/bin && "
            " cp -r ${BUILD}/ccnx-0.6.0-src/bin ${SOURCES}"
            " )"
        )

        self.env['PATH'] = "$PATH:${SOURCES}/bin"

    def setup(self):
        """
        Fill in the default build/install recipes for the configured
        CCNx version (without overriding explicitly set ones) and then
        run the regular setup.
        """
        recipes = {
            'ccnx-0.6.0': (self.ccnx_0_6_0_build, self.ccnx_0_6_0_install),
            'ccnx-0.5.1': (self.ccnx_0_5_1_build, self.ccnx_0_5_1_install),
        }.get(self.ccnxVersion)

        if recipes is not None:
            default_build, default_install = recipes
            if not self.build:
                self.build = default_build
            if not self.install:
                self.install = default_install
        elif not (self.build and self.install):
            # Unknown version and nothing to fall back on: keep going as
            # before, but leave a trace of why nothing will be built.
            self._logger.warning(
                "No default build/install recipe for CCNx version %r in %s",
                self.ccnxVersion, self)

        super(CCNxDaemon, self).setup()

    def start(self):
        """
        Compose the daemon start command (optional port export,
        ``ccndstart``, then one ``ccndc add`` per route) and start it.
        """
        self.command = ""
        if self.ccnLocalPort:
            self.command = "export CCN_LOCAL_PORT=%s ; " % self.ccnLocalPort
        self.command += " ccndstart "

        # configure ccn routes
        if self.ccnRoutes:
            routes = self.ccnRoutes.split("|")

            if self.ccnLocalPort:
                # Routes given as bare "<proto> <ip>" get the local port
                # appended; anything else is passed through untouched.
                routes = [
                    "%s %s" % (route, self.ccnLocalPort)
                    if _ccnre.match(route) else route
                    for route in routes
                ]

            routescmd = " ; ".join(
                "ccndc add ccnx:/ %s" % route for route in routes)
            self.command += " ; "
            self.command += routescmd

        # Start will be invoked in prestart step
        super(CCNxDaemon, self).start()

    def kill(self):
        """
        Stop the daemon by uploading and running a ``kill.sh`` script
        that calls ``ccndstop``, then perform the regular kill.

        Raises RuntimeError when the script cannot be uploaded or the
        remote spawn fails.
        """
        self._logger.info("Killing %s", self)

        command = "${SOURCES}/bin/ccndstop"

        if self.ccnLocalPort:
            # ccndstop needs the same port the daemon was started with.
            # This must rebind the local command (not self.command), as
            # the local value is what gets written into kill.sh.
            command = "export CCN_LOCAL_PORT=%s; %s" % (self.ccnLocalPort, command)

        script_file = cStringIO.StringIO()
        script_file.write(self._replace_paths(command))
        script_file.seek(0)

        try:
            self._popen_scp(
                script_file,
                '%s@%s:%s' % (self.node.slicename, self.node.hostname,
                    os.path.join(self.home_path, "kill.sh"))
                )
        except RuntimeError as e:
            raise RuntimeError("Failed to kill ccndxdaemon: %s %s"
                    % (e.args[0], e.args[1],))

        script = "bash ./kill.sh"
        (out,err),proc = rspawn.remote_spawn(
            script,
            pidfile = 'kill-pid',
            home = self.home_path,
            stdin = '/dev/null',
            stdout = 'killlog',
            stderr = rspawn.STDOUT,

            host = self.node.hostname,
            port = None,
            user = self.node.slicename,
            agent = None,
            ident_key = self.node.ident_path,
            server_key = self.node.server_key,
            hostip = self.node.hostip,
            )

        if proc.wait():
            raise RuntimeError("Failed to kill cnnxdaemon: %s %s" % (out,err,))

        super(CCNxDaemon, self).kill()