Adding Linux Application scalability tests
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 11 May 2013 16:43:16 +0000 (18:43 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 11 May 2013 16:43:16 +0000 (18:43 +0200)
examples/linux/ccnx/simple_topo.py
examples/linux/scalability.py [new file with mode: 0644]
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/resources/linux/application.py
src/neco/resources/linux/node.py
src/neco/util/sshfuncs.py

index f57ce20..031a529 100644 (file)
@@ -120,8 +120,9 @@ if __name__ == '__main__':
 
     # Search for available RMs
     populate_factory()
-
-    host1 = 'nepi2.pl.sophia.inria.fr'
+    
+    #host1 = 'nepi2.pl.sophia.inria.fr'
+    host1 = 'planetlab2.u-strasbg.fr'
     host2 = 'roseval.pl.sophia.inria.fr'
 
     ec = ExperimentController(exp_id = exp_id)
diff --git a/examples/linux/scalability.py b/examples/linux/scalability.py
new file mode 100644 (file)
index 0000000..110405a
--- /dev/null
@@ -0,0 +1,146 @@
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController, ECState 
+from neco.execution.resource import ResourceState, ResourceAction, \
+        populate_factory
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+import os
+import time
+
+def add_node(ec, host, user):
+    node = ec.register_resource("LinuxNode")
+    ec.set(node, "hostname", host)
+    ec.set(node, "username", user)
+    ec.set(node, "cleanHome", True)
+    ec.set(node, "cleanProcesses", True)
+    return node
+
+def add_app(ec):
+    app = ec.register_resource("LinuxApplication")
+    ec.set(app, "command", "sleep 30; echo 'HOLA'")
+    return app
+
+def get_options():
+    slicename = os.environ.get("PL_SLICE")
+
+    usage = "usage: %prog -s <pl-slice>"
+
+    parser = OptionParser(usage=usage)
+    parser.add_option("-s", "--pl-slice", dest="pl_slice", 
+            help="PlanetLab slicename", default=slicename, type="str")
+    parser.add_option("-l", "--exp-id", dest="exp_id", 
+            help="Label to identify experiment", type="str")
+
+    (options, args) = parser.parse_args()
+
+    return (options.pl_slice, options.exp_id)
+
+if __name__ == '__main__':
+    ( pl_slice, exp_id ) = get_options()
+
+    # Search for available RMs
+    populate_factory()
+    
+    apps = []
+  
+    hostnames = [
+             "planetlab-2.research.netlab.hut.fi",
+             "planetlab2.willab.fi",
+             "planetlab3.hiit.fi",
+             "planetlab4.hiit.fi",
+             "planetlab1.willab.fi",
+             "planetlab1.s3.kth.se",
+             "itchy.comlab.bth.se",
+             "planetlab-1.ida.liu.se",
+             "planetlab2.s3.kth.se",
+             "planetlab1.sics.se",
+             "planetlab1.tlm.unavarra.es",
+             "planetlab2.uc3m.es",
+             "planetlab1.uc3m.es",
+             "planetlab2.um.es",
+             "planet1.servers.ua.pt",
+             "planetlab2.fct.ualg.pt",
+             "planetlab-1.tagus.ist.utl.pt",
+             "planetlab-2.tagus.ist.utl.pt",
+             "planetlab-um00.di.uminho.pt",
+             "planet2.servers.ua.pt",
+             "planetlab1.mini.pw.edu.pl",
+             "roti.mimuw.edu.pl",
+             "planetlab1.ci.pwr.wroc.pl",
+             "planetlab1.pjwstk.edu.pl",
+             "ple2.tu.koszalin.pl",
+             "planetlab2.ci.pwr.wroc.pl",
+             "planetlab2.cyfronet.pl",
+             "plab2.ple.silweb.pl",
+             "planetlab1.cyfronet.pl",
+             "plab4.ple.silweb.pl",
+             "ple2.dmcs.p.lodz.pl",
+             "planetlab2.pjwstk.edu.pl",
+             "ple1.dmcs.p.lodz.pl",
+             "gschembra3.diit.unict.it",
+             "planetlab1.science.unitn.it",
+             "planetlab-1.ing.unimo.it",
+             "gschembra4.diit.unict.it",
+             "iraplab1.iralab.uni-karlsruhe.de",
+             "planetlab-1.fokus.fraunhofer.de",
+             "iraplab2.iralab.uni-karlsruhe.de",
+             "planet2.zib.de",
+             "pl2.uni-rostock.de",
+             "onelab-1.fhi-fokus.de",
+             "planet2.l3s.uni-hannover.de",
+             "planetlab1.exp-math.uni-essen.de",
+             "planetlab-2.fokus.fraunhofer.de",
+             "planetlab02.tkn.tu-berlin.de",
+             "planetlab1.informatik.uni-goettingen.de",
+             "planetlab1.informatik.uni-erlangen.de",
+             "planetlab2.lkn.ei.tum.de",
+             "planetlab1.wiwi.hu-berlin.de",
+             "planet1.l3s.uni-hannover.de",
+             "planetlab1.informatik.uni-wuerzburg.de",
+             "host3-plb.loria.fr",
+             "inriarennes1.irisa.fr",
+             "inriarennes2.irisa.fr",
+             "peeramide.irisa.fr",
+             "planetlab-1.imag.fr",
+             "planetlab-2.imag.fr",
+             "ple2.ipv6.lip6.fr",
+             "planetlab1.u-strasbg.fr",
+             "planetlab1.ionio.gr",
+             "planetlab2.ionio.gr",
+             "stella.planetlab.ntua.gr",
+             "vicky.planetlab.ntua.gr",
+             "planetlab1.cs.uoi.gr",
+             "pl002.ece.upatras.gr",
+             "planetlab04.cnds.unibe.ch",
+             "lsirextpc01.epfl.ch",
+             "planetlab2.csg.uzh.ch",
+             "planetlab1.csg.uzh.ch",
+             "planetlab-2.cs.unibas.ch",
+             "planetlab-1.cs.unibas.ch",
+             "planetlab4.cs.st-andrews.ac.uk",
+             "planetlab3.xeno.cl.cam.ac.uk",
+             "planetlab1.xeno.cl.cam.ac.uk",
+             "planetlab2.xeno.cl.cam.ac.uk",
+             "planetlab3.cs.st-andrews.ac.uk",
+             "planetlab1.aston.ac.uk",
+             "planetlab1.nrl.eecs.qmul.ac.uk",
+             "chimay.infonet.fundp.ac.be",
+             "orval.infonet.fundp.ac.be",
+             "rochefort.infonet.fundp.ac.be",
+            ]
+    ec = ExperimentController(exp_id = exp_id)
+
+    for host in hostnames:
+        node = add_node(ec, host, pl_slice)
+        for i in xrange(20):
+            app = add_app(ec)
+            ec.register_connection(app, node)
+            apps.append(app)
+
+    ec.deploy()
+
+    ec.wait_finished(apps)
+
+    ec.shutdown()
index b793d0e..65ef175 100644 (file)
@@ -1,5 +1,6 @@
 import logging
 import os
+import random
 import sys
 import time
 import threading
@@ -14,6 +15,7 @@ from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 # TODO: Improve speed. Too slow... !!
+# TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!!
 
 class ECState(object):
     RUNNING = 1
@@ -265,18 +267,41 @@ class ExperimentController(object):
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
+        stop = []
+
         def steps(rm):
-            rm.deploy()
-            rm.start_with_conditions()
+            try:
+                rm.deploy()
+                rm.start_with_conditions()
+
+                # Only if the RM has STOP consitions we
+                # schedule a stop. Otherwise the RM will stop immediately
+                if rm.conditions.get(ResourceAction.STOP):
+                    rm.stop_with_conditions()
+            except:
+                import traceback
+                err = traceback.format_exc()
+                
+                self._logger.error("Error occurred while deploying resources: %s" % err)
 
-            # Only if the RM has STOP consitions we
-            # schedule a stop. Otherwise the RM will stop immediately
-            if rm.conditions.get(ResourceAction.STOP):
-                rm.stop_with_conditions()
+                # stop deployment
+                stop.append(None)
 
         if not group:
             group = self.resources
 
+        # Before starting deployment we disorder the group list with the
+        # purpose of speeding up the whole deployment process.
+        # It is likely that the user inserted in the 'group' list closely
+        # resources resources one after another (e.g. all applications
+        # connected to the same node can likely appear one after another).
+        # This can originate a slow down in the deployment since the N 
+        # threads the parallel runner uses to processes tasks may all
+        # be taken up by the same family of resources waiting for the 
+        # same conditions. 
+        # If we disorder the group list, this problem can be mitigated
+        random.shuffle(group)
+
         threads = []
         for guid in group:
             rm = self.get_resource(guid)
@@ -292,13 +317,22 @@ class ExperimentController(object):
             thread.setDaemon(True)
             thread.start()
 
-        while list(threads) and not self.finished:
+        while list(threads) and not self.finished and not stop:
             thread = threads[0]
             # Time out after 5 seconds to check EC not terminated
-            thread.join(5)
+            thread.join(1)
             if not thread.is_alive():
                 threads.remove(thread)
 
+        if stop:
+            # stop the scheduler
+            self._stop_scheduler()
+
+            if self._thread.is_alive():
+               self._thread.join()
+
+            raise RuntimeError, "Error occurred, interrupting deployment " 
+
     def release(self, group = None):
         if not group:
             group = self.resources
@@ -317,16 +351,12 @@ class ExperimentController(object):
             thread.join(5)
             if not thread.is_alive():
                 threads.remove(thread)
-        
-        self._state = ECState.TERMINATED
 
     def shutdown(self):
         self.release()
-        
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
 
+        self._stop_scheduler()
+        
         if self._thread.is_alive():
            self._thread.join()
 
@@ -399,7 +429,8 @@ class ExperimentController(object):
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-            return
+        finally:
+            runner.sync()
    
         # Mark EC state as terminated
         if self.ecstate == ECState.RUNNING:
@@ -419,14 +450,18 @@ class ExperimentController(object):
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
-            # Mark the EC as failed
-            self._state = ECState.FAILED
-
-            # Wake up the EC in case it was sleeping
-            self._cond.acquire()
-            self._cond.notify()
-            self._cond.release()
+            self._stop_scheduler()
 
             # Propage error to the ParallelRunner
             raise
 
+    def _stop_scheduler(self):
+        # Mark the EC as failed
+        self._state = ECState.FAILED
+
+        # Wake up the EC in case it was sleeping
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+
+
index 0eb0d64..762e80e 100644 (file)
@@ -464,12 +464,20 @@ class ResourceManager(object):
             reschedule = True
             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
         else:
-            self.debug("---- START CONDITIONS ---- %s" % 
-                    self.conditions.get(ResourceAction.START))
+            start_conditions = self.conditions.get(ResourceAction.START, [])
+            
+            self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
             
             # Verify all start conditions are met
-            start_conditions = self.conditions.get(ResourceAction.START, [])
             for (group, state, time) in start_conditions:
+                # Uncomment for debug
+                #unmet = []
+                #for guid in group:
+                #    rm = self.ec.get_resource(guid)
+                #    unmet.append((guid, rm._state))
+                #
+                #self.debug("---- WAITED STATES ---- %s" % unmet )
+
                 reschedule, delay = self._needs_reschedule(group, state, time)
                 if reschedule:
                     break
index 953c651..e7a34d5 100644 (file)
@@ -3,11 +3,13 @@ from neco.execution.trace import Trace, TraceAttr
 from neco.execution.resource import ResourceManager, clsinit, ResourceState
 from neco.resources.linux.node import LinuxNode
 from neco.util import sshfuncs 
+from neco.util.timefuncs import strfnow, strfdiff
 
 import logging
 import os
 
 reschedule_delay = "0.5s"
+state_check_delay = 1
 
 # TODO: Resolve wildcards in commands!! 
 
@@ -96,6 +98,9 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
+        # timestamp of last state check of the application
+        self._last_state_check = strfnow()
+
         self._logger = logging.getLogger("LinuxApplication")
     
     def log_message(self, msg):
@@ -401,9 +406,11 @@ class LinuxApplication(ResourceManager):
             raise RuntimeError, msg
 
     def stop(self):
+        command = self.get('command') or ''
         state = self.state
+        
         if state == ResourceState.STARTED:
-            self.info("Stopping command %s" % command)
+            self.info("Stopping command '%s'" % command)
 
             (out, err), proc = self.node.kill(self.pid, self.ppid)
 
@@ -430,24 +437,31 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+            # To avoid overwhelming the remote hosts and the local processor
+            # with too many ssh queries, the state is only requested
+            # every 'state_check_delay' .
+            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                # check if execution errors occurred
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
 
-            if out or err:
-                if err.find("No such file or directory") >= 0 :
-                    # The resource is marked as started, but the
-                    # command was not yet executed
-                    return ResourceState.READY
+                if out or err:
+                    if err.find("No such file or directory") >= 0 :
+                        # The resource is marked as started, but the
+                        # command was not yet executed
+                        return ResourceState.READY
 
-                # check if execution errors occurred
-                msg = " Failed to execute command '%s'" % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                    msg = " Failed to execute command '%s'" % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+
+                elif self.pid and self.ppid:
+                    status = self.node.status(self.pid, self.ppid)
+
+                    if status == sshfuncs.FINISHED:
+                        self._state = ResourceState.FINISHED
 
-            elif self.pid and self.ppid:
-                status = self.node.status(self.pid, self.ppid)
 
-                if status == sshfuncs.FINISHED:
-                    self._state = ResourceState.FINISHED
+                self._last_state_check = strfnow()
 
         return self._state
 
index 39107cb..38c3865 100644 (file)
@@ -15,6 +15,7 @@ import threading
 # TODO: Verify files and dirs exists already
 # TODO: Blacklist nodes!
 # TODO: Unify delays!!
+# TODO: Validate outcome of uploads!! 
 
 reschedule_delay = "0.5s"
 
@@ -130,8 +131,9 @@ class LinuxNode(ResourceManager):
     def provision(self, filters = None):
         if not self.is_alive():
             self._state = ResourceState.FAILED
-            self.error("Deploy failed. Unresponsive node")
-            return
+            msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
+            self.error(msg)
+            raise RuntimeError, msg
 
         if self.get("cleanProcesses"):
             self.clean_processes()
@@ -411,29 +413,27 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         try:
-            (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
+            (out, err), proc = self.execute("echo 'ALIVE'", retry = 5, 
+                    with_lock = True)
         except:
             import traceback
             trace = traceback.format_exc()
             msg = "Unresponsive host "
-            self.warn(msg, out, trace)
+            self.error(msg, out, trace)
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
             msg = "Unresponsive host "
-            self.warn(msg, out, err)
+            self.error(msg, out, err)
             return False
 
-            # TODO!
-            #if self.check_bad_host(out,err):
-            #    self.blacklist()
-
     def copy(self, src, dst):
         if self.localhost:
             (out, err), proc =  execfuncs.lcopy(source, dest, 
-                    recursive = True)
+                    recursive = True,
+                    strict_host_checking = False)
         else:
             with self._lock:
                 (out, err), proc = sshfuncs.rcopy(
@@ -441,7 +441,8 @@ class LinuxNode(ResourceManager):
                     port = self.get("port"),
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
-                    recursive = True)
+                    recursive = True,
+                    strict_host_checking = False)
 
         return (out, err), proc
 
@@ -455,6 +456,7 @@ class LinuxNode(ResourceManager):
             retry = 3,
             err_on_timeout = True,
             connect_timeout = 30,
+            strict_host_checking = False,
             persistent = True,
             with_lock = False
             ):
@@ -488,7 +490,8 @@ class LinuxNode(ResourceManager):
                         retry = retry,
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
-                        persistent = persistent
+                        persistent = persistent,
+                        strict_host_checking = strict_host_checking
                         )
             else:
                 (out, err), proc = sshfuncs.rexec(
index 982e23a..13b7b1e 100644 (file)
@@ -12,6 +12,7 @@ import subprocess
 import time
 import tempfile
 
+# TODO: Add retries to rcopy!! rcopy is not being retried!
 
 logger = logging.getLogger("sshfuncs")
 
@@ -56,6 +57,8 @@ class NOT_STARTED:
 hostbyname_cache = dict()
 
 def gethostbyname(host):
+    global hostbyname_cache
+    
     hostbyname = hostbyname_cache.get(host)
     if not hostbyname:
         hostbyname = socket.gethostbyname(host)
@@ -191,7 +194,8 @@ def rexec(command, host, user,
         err_on_timeout = True,
         connect_timeout = 30,
         persistent = True,
-        forward_x11 = False):
+        forward_x11 = False,
+        strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
     """
@@ -214,6 +218,10 @@ def rexec(command, host, user,
             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
             '-o', 'ControlPersist=60' ])
 
+    if not strict_host_checking:
+        # Do not check for Host key. Unsafe.
+        args.extend(['-o', 'StrictHostKeyChecking=no'])
+
     if agent:
         args.append('-A')
 
@@ -288,7 +296,8 @@ def rcopy(source, dest,
         agent = True, 
         recursive = False,
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
     Copies from/to remote sites.
     
@@ -500,6 +509,10 @@ def rcopy(source, dest,
             tmp_known_hosts = make_server_key_args(server_key, host, port)
             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
+        if not strict_host_checking:
+            # Do not check for Host key. Unsafe.
+            args.extend(['-o', 'StrictHostKeyChecking=no'])
+
         if isinstance(source,list):
             args.extend(source)
         else: