From 94f4bad47f6b00e8d1e9ad9c55bfbbd6894329fc Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sat, 11 May 2013 18:43:16 +0200 Subject: [PATCH] Adding Linux Application scalability tests --- examples/linux/ccnx/simple_topo.py | 5 +- examples/linux/scalability.py | 146 ++++++++++++++++++++++++ src/neco/execution/ec.py | 79 +++++++++---- src/neco/execution/resource.py | 14 ++- src/neco/resources/linux/application.py | 44 ++++--- src/neco/resources/linux/node.py | 27 +++-- src/neco/util/sshfuncs.py | 17 ++- 7 files changed, 276 insertions(+), 56 deletions(-) create mode 100644 examples/linux/scalability.py diff --git a/examples/linux/ccnx/simple_topo.py b/examples/linux/ccnx/simple_topo.py index f57ce201..031a5299 100644 --- a/examples/linux/ccnx/simple_topo.py +++ b/examples/linux/ccnx/simple_topo.py @@ -120,8 +120,9 @@ if __name__ == '__main__': # Search for available RMs populate_factory() - - host1 = 'nepi2.pl.sophia.inria.fr' + + #host1 = 'nepi2.pl.sophia.inria.fr' + host1 = 'planetlab2.u-strasbg.fr' host2 = 'roseval.pl.sophia.inria.fr' ec = ExperimentController(exp_id = exp_id) diff --git a/examples/linux/scalability.py b/examples/linux/scalability.py new file mode 100644 index 00000000..110405a1 --- /dev/null +++ b/examples/linux/scalability.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python +from neco.execution.ec import ExperimentController, ECState +from neco.execution.resource import ResourceState, ResourceAction, \ + populate_factory + +from optparse import OptionParser, SUPPRESS_HELP + +import os +import time + +def add_node(ec, host, user): + node = ec.register_resource("LinuxNode") + ec.set(node, "hostname", host) + ec.set(node, "username", user) + ec.set(node, "cleanHome", True) + ec.set(node, "cleanProcesses", True) + return node + +def add_app(ec): + app = ec.register_resource("LinuxApplication") + ec.set(app, "command", "sleep 30; echo 'HOLA'") + return app + +def get_options(): + slicename = os.environ.get("PL_SLICE") + + usage = "usage: %prog -s " + + parser = OptionParser(usage=usage) + parser.add_option("-s", "--pl-slice", dest="pl_slice", + help="PlanetLab slicename", default=slicename, type="str") + parser.add_option("-l", "--exp-id", dest="exp_id", + help="Label to identify experiment", type="str") + + (options, args) = parser.parse_args() + + return (options.pl_slice, options.exp_id) + +if __name__ == '__main__': + ( pl_slice, exp_id ) = get_options() + + # Search for available RMs + populate_factory() + + apps = [] + + hostnames = [ + "planetlab-2.research.netlab.hut.fi", + "planetlab2.willab.fi", + "planetlab3.hiit.fi", + "planetlab4.hiit.fi", + "planetlab1.willab.fi", + "planetlab1.s3.kth.se", + "itchy.comlab.bth.se", + "planetlab-1.ida.liu.se", + "planetlab2.s3.kth.se", + "planetlab1.sics.se", + "planetlab1.tlm.unavarra.es", + "planetlab2.uc3m.es", + "planetlab1.uc3m.es", + "planetlab2.um.es", + "planet1.servers.ua.pt", + "planetlab2.fct.ualg.pt", + "planetlab-1.tagus.ist.utl.pt", + "planetlab-2.tagus.ist.utl.pt", + "planetlab-um00.di.uminho.pt", + "planet2.servers.ua.pt", + "planetlab1.mini.pw.edu.pl", + "roti.mimuw.edu.pl", + "planetlab1.ci.pwr.wroc.pl", + "planetlab1.pjwstk.edu.pl", + "ple2.tu.koszalin.pl", + "planetlab2.ci.pwr.wroc.pl", + "planetlab2.cyfronet.pl", + "plab2.ple.silweb.pl", + "planetlab1.cyfronet.pl", + "plab4.ple.silweb.pl", + "ple2.dmcs.p.lodz.pl", + "planetlab2.pjwstk.edu.pl", + "ple1.dmcs.p.lodz.pl", + "gschembra3.diit.unict.it", + "planetlab1.science.unitn.it", + "planetlab-1.ing.unimo.it", + "gschembra4.diit.unict.it", + "iraplab1.iralab.uni-karlsruhe.de", + "planetlab-1.fokus.fraunhofer.de", + "iraplab2.iralab.uni-karlsruhe.de", + "planet2.zib.de", + "pl2.uni-rostock.de", + "onelab-1.fhi-fokus.de", + "planet2.l3s.uni-hannover.de", + "planetlab1.exp-math.uni-essen.de", + "planetlab-2.fokus.fraunhofer.de", + "planetlab02.tkn.tu-berlin.de", + "planetlab1.informatik.uni-goettingen.de", + "planetlab1.informatik.uni-erlangen.de", + "planetlab2.lkn.ei.tum.de", + "planetlab1.wiwi.hu-berlin.de", + "planet1.l3s.uni-hannover.de", + "planetlab1.informatik.uni-wuerzburg.de", + "host3-plb.loria.fr", + "inriarennes1.irisa.fr", + "inriarennes2.irisa.fr", + "peeramide.irisa.fr", + "planetlab-1.imag.fr", + "planetlab-2.imag.fr", + "ple2.ipv6.lip6.fr", + "planetlab1.u-strasbg.fr", + "planetlab1.ionio.gr", + "planetlab2.ionio.gr", + "stella.planetlab.ntua.gr", + "vicky.planetlab.ntua.gr", + "planetlab1.cs.uoi.gr", + "pl002.ece.upatras.gr", + "planetlab04.cnds.unibe.ch", + "lsirextpc01.epfl.ch", + "planetlab2.csg.uzh.ch", + "planetlab1.csg.uzh.ch", + "planetlab-2.cs.unibas.ch", + "planetlab-1.cs.unibas.ch", + "planetlab4.cs.st-andrews.ac.uk", + "planetlab3.xeno.cl.cam.ac.uk", + "planetlab1.xeno.cl.cam.ac.uk", + "planetlab2.xeno.cl.cam.ac.uk", + "planetlab3.cs.st-andrews.ac.uk", + "planetlab1.aston.ac.uk", + "planetlab1.nrl.eecs.qmul.ac.uk", + "chimay.infonet.fundp.ac.be", + "orval.infonet.fundp.ac.be", + "rochefort.infonet.fundp.ac.be", + ] + + ec = ExperimentController(exp_id = exp_id) + + for host in hostnames: + node = add_node(ec, host, pl_slice) + for i in xrange(20): + app = add_app(ec) + ec.register_connection(app, node) + apps.append(app) + + ec.deploy() + + ec.wait_finished(apps) + + ec.shutdown() diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index b793d0e4..65ef175f 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -1,5 +1,6 @@ import logging import os +import random import sys import time import threading @@ -14,6 +15,7 @@ from neco.execution.trace import TraceAttr # TODO: use multiprocessing instead of threading # TODO: Improve speed. Too slow... !! +# TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!! class ECState(object): RUNNING = 1 @@ -265,18 +267,41 @@ class ExperimentController(object): """ self.logger.debug(" ------- DEPLOY START ------ ") + stop = [] + def steps(rm): - rm.deploy() - rm.start_with_conditions() + try: + rm.deploy() + rm.start_with_conditions() + + # Only if the RM has STOP consitions we + # schedule a stop. Otherwise the RM will stop immediately + if rm.conditions.get(ResourceAction.STOP): + rm.stop_with_conditions() + except: + import traceback + err = traceback.format_exc() + + self._logger.error("Error occurred while deploying resources: %s" % err) - # Only if the RM has STOP consitions we - # schedule a stop. Otherwise the RM will stop immediately - if rm.conditions.get(ResourceAction.STOP): - rm.stop_with_conditions() + # stop deployment + stop.append(None) if not group: group = self.resources + # Before starting deployment we disorder the group list with the + # purpose of speeding up the whole deployment process. + # It is likely that the user inserted in the 'group' list closely + # resources resources one after another (e.g. all applications + # connected to the same node can likely appear one after another). + # This can originate a slow down in the deployment since the N + # threads the parallel runner uses to processes tasks may all + # be taken up by the same family of resources waiting for the + # same conditions. + # If we disorder the group list, this problem can be mitigated + random.shuffle(group) + threads = [] for guid in group: rm = self.get_resource(guid) @@ -292,13 +317,22 @@ class ExperimentController(object): thread.setDaemon(True) thread.start() - while list(threads) and not self.finished: + while list(threads) and not self.finished and not stop: thread = threads[0] # Time out after 5 seconds to check EC not terminated - thread.join(5) + thread.join(1) if not thread.is_alive(): threads.remove(thread) + if stop: + # stop the scheduler + self._stop_scheduler() + + if self._thread.is_alive(): + self._thread.join() + + raise RuntimeError, "Error occurred, interrupting deployment " + def release(self, group = None): if not group: group = self.resources @@ -317,16 +351,12 @@ class ExperimentController(object): thread.join(5) if not thread.is_alive(): threads.remove(thread) - - self._state = ECState.TERMINATED def shutdown(self): self.release() - - self._cond.acquire() - self._cond.notify() - self._cond.release() + self._stop_scheduler() + if self._thread.is_alive(): self._thread.join() @@ -399,7 +429,8 @@ class ExperimentController(object): self._logger.error("Error while processing tasks in the EC: %s" % err) self._state = ECState.FAILED - return + finally: + runner.sync() # Mark EC state as terminated if self.ecstate == ECState.RUNNING: @@ -419,14 +450,18 @@ class ExperimentController(object): self._logger.error("Error occurred while executing task: %s" % err) - # Mark the EC as failed - self._state = ECState.FAILED - - # Wake up the EC in case it was sleeping - self._cond.acquire() - self._cond.notify() - self._cond.release() + self._stop_scheduler() # Propage error to the ParallelRunner raise + def _stop_scheduler(self): + # Mark the EC as failed + self._state = ECState.FAILED + + # Wake up the EC in case it was sleeping + self._cond.acquire() + self._cond.notify() + self._cond.release() + + diff --git a/src/neco/execution/resource.py b/src/neco/execution/resource.py index 0eb0d647..762e80e8 100644 --- a/src/neco/execution/resource.py +++ b/src/neco/execution/resource.py @@ -464,12 +464,20 @@ class ResourceManager(object): reschedule = True self.debug("---- RESCHEDULING START ---- state %s " % self.state ) else: - self.debug("---- START CONDITIONS ---- %s" % - self.conditions.get(ResourceAction.START)) + start_conditions = self.conditions.get(ResourceAction.START, []) + + self.debug("---- START CONDITIONS ---- %s" % start_conditions) # Verify all start conditions are met - start_conditions = self.conditions.get(ResourceAction.START, []) for (group, state, time) in start_conditions: + # Uncomment for debug + #unmet = [] + #for guid in group: + # rm = self.ec.get_resource(guid) + # unmet.append((guid, rm._state)) + # + #self.debug("---- WAITED STATES ---- %s" % unmet ) + reschedule, delay = self._needs_reschedule(group, state, time) if reschedule: break diff --git a/src/neco/resources/linux/application.py b/src/neco/resources/linux/application.py index 953c651e..e7a34d58 100644 --- a/src/neco/resources/linux/application.py +++ b/src/neco/resources/linux/application.py @@ -3,11 +3,13 @@ from neco.execution.trace import Trace, TraceAttr from neco.execution.resource import ResourceManager, clsinit, ResourceState from neco.resources.linux.node import LinuxNode from neco.util import sshfuncs +from neco.util.timefuncs import strfnow, strfdiff import logging import os reschedule_delay = "0.5s" +state_check_delay = 1 # TODO: Resolve wildcards in commands!! @@ -96,6 +98,9 @@ class LinuxApplication(ResourceManager): self._ppid = None self._home = "app-%s" % self.guid + # timestamp of last state check of the application + self._last_state_check = strfnow() + self._logger = logging.getLogger("LinuxApplication") def log_message(self, msg): @@ -401,9 +406,11 @@ class LinuxApplication(ResourceManager): raise RuntimeError, msg def stop(self): + command = self.get('command') or '' state = self.state + if state == ResourceState.STARTED: - self.info("Stopping command %s" % command) + self.info("Stopping command '%s'" % command) (out, err), proc = self.node.kill(self.pid, self.ppid) @@ -430,24 +437,31 @@ class LinuxApplication(ResourceManager): @property def state(self): if self._state == ResourceState.STARTED: - (out, err), proc = self.node.check_output(self.app_home, 'stderr') + # To avoid overwhelming the remote hosts and the local processor + # with too many ssh queries, the state is only requested + # every 'state_check_delay' . + if strfdiff(strfnow(), self._last_state_check) > state_check_delay: + # check if execution errors occurred + (out, err), proc = self.node.check_output(self.app_home, 'stderr') - if out or err: - if err.find("No such file or directory") >= 0 : - # The resource is marked as started, but the - # command was not yet executed - return ResourceState.READY + if out or err: + if err.find("No such file or directory") >= 0 : + # The resource is marked as started, but the + # command was not yet executed + return ResourceState.READY - # check if execution errors occurred - msg = " Failed to execute command '%s'" % self.get("command") - self.error(msg, out, err) - self._state = ResourceState.FAILED + msg = " Failed to execute command '%s'" % self.get("command") + self.error(msg, out, err) + self._state = ResourceState.FAILED + + elif self.pid and self.ppid: + status = self.node.status(self.pid, self.ppid) + + if status == sshfuncs.FINISHED: + self._state = ResourceState.FINISHED - elif self.pid and self.ppid: - status = self.node.status(self.pid, self.ppid) - if status == sshfuncs.FINISHED: - self._state = ResourceState.FINISHED + self._last_state_check = strfnow() return self._state diff --git a/src/neco/resources/linux/node.py b/src/neco/resources/linux/node.py index 39107cb5..38c38651 100644 --- a/src/neco/resources/linux/node.py +++ b/src/neco/resources/linux/node.py @@ -15,6 +15,7 @@ import threading # TODO: Verify files and dirs exists already # TODO: Blacklist nodes! # TODO: Unify delays!! +# TODO: Validate outcome of uploads!! reschedule_delay = "0.5s" @@ -130,8 +131,9 @@ class LinuxNode(ResourceManager): def provision(self, filters = None): if not self.is_alive(): self._state = ResourceState.FAILED - self.error("Deploy failed. Unresponsive node") - return + msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") + self.error(msg) + raise RuntimeError, msg if self.get("cleanProcesses"): self.clean_processes() @@ -411,29 +413,27 @@ class LinuxNode(ResourceManager): out = err = "" try: - (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True) + (out, err), proc = self.execute("echo 'ALIVE'", retry = 5, + with_lock = True) except: import traceback trace = traceback.format_exc() msg = "Unresponsive host " - self.warn(msg, out, trace) + self.error(msg, out, trace) return False if out.strip().startswith('ALIVE'): return True else: msg = "Unresponsive host " - self.warn(msg, out, err) + self.error(msg, out, err) return False - # TODO! - #if self.check_bad_host(out,err): - # self.blacklist() - def copy(self, src, dst): if self.localhost: (out, err), proc = execfuncs.lcopy(source, dest, - recursive = True) + recursive = True, + strict_host_checking = False) else: with self._lock: (out, err), proc = sshfuncs.rcopy( @@ -441,7 +441,8 @@ class LinuxNode(ResourceManager): port = self.get("port"), identity = self.get("identity"), server_key = self.get("serverKey"), - recursive = True) + recursive = True, + strict_host_checking = False) return (out, err), proc @@ -455,6 +456,7 @@ class LinuxNode(ResourceManager): retry = 3, err_on_timeout = True, connect_timeout = 30, + strict_host_checking = False, persistent = True, with_lock = False ): @@ -488,7 +490,8 @@ class LinuxNode(ResourceManager): retry = retry, err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, - persistent = persistent + persistent = persistent, + strict_host_checking = strict_host_checking ) else: (out, err), proc = sshfuncs.rexec( diff --git a/src/neco/util/sshfuncs.py b/src/neco/util/sshfuncs.py index 982e23af..13b7b1ed 100644 --- a/src/neco/util/sshfuncs.py +++ b/src/neco/util/sshfuncs.py @@ -12,6 +12,7 @@ import subprocess import time import tempfile +# TODO: Add retries to rcopy!! rcopy is not being retried! logger = logging.getLogger("sshfuncs") @@ -56,6 +57,8 @@ class NOT_STARTED: hostbyname_cache = dict() def gethostbyname(host): + global hostbyname_cache + hostbyname = hostbyname_cache.get(host) if not hostbyname: hostbyname = socket.gethostbyname(host) @@ -191,7 +194,8 @@ def rexec(command, host, user, err_on_timeout = True, connect_timeout = 30, persistent = True, - forward_x11 = False): + forward_x11 = False, + strict_host_checking = True): """ Executes a remote command, returns ((stdout,stderr),process) """ @@ -214,6 +218,10 @@ def rexec(command, host, user, '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),), '-o', 'ControlPersist=60' ]) + if not strict_host_checking: + # Do not check for Host key. Unsafe. + args.extend(['-o', 'StrictHostKeyChecking=no']) + if agent: args.append('-A') @@ -288,7 +296,8 @@ def rcopy(source, dest, agent = True, recursive = False, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Copies from/to remote sites. @@ -500,6 +509,10 @@ def rcopy(source, dest, tmp_known_hosts = make_server_key_args(server_key, host, port) args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) + if not strict_host_checking: + # Do not check for Host key. Unsafe. + args.extend(['-o', 'StrictHostKeyChecking=no']) + if isinstance(source,list): args.extend(source) else: -- 2.43.0