Add Linux Application scalability tests
author Alina Quereilhac <alina.quereilhac@inria.fr>
Sat, 11 May 2013 16:43:16 +0000 (18:43 +0200)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Sat, 11 May 2013 16:43:16 +0000 (18:43 +0200)
examples/linux/ccnx/simple_topo.py
examples/linux/scalability.py [new file with mode: 0644]
src/neco/execution/ec.py
src/neco/execution/resource.py
src/neco/resources/linux/application.py
src/neco/resources/linux/node.py
src/neco/util/sshfuncs.py

index f57ce20..031a529 100644 (file)
@@ -120,8 +120,9 @@ if __name__ == '__main__':
 
     # Search for available RMs
     populate_factory()
 
     # Search for available RMs
     populate_factory()
-
-    host1 = 'nepi2.pl.sophia.inria.fr'
+    
+    #host1 = 'nepi2.pl.sophia.inria.fr'
+    host1 = 'planetlab2.u-strasbg.fr'
     host2 = 'roseval.pl.sophia.inria.fr'
 
     ec = ExperimentController(exp_id = exp_id)
     host2 = 'roseval.pl.sophia.inria.fr'
 
     ec = ExperimentController(exp_id = exp_id)
diff --git a/examples/linux/scalability.py b/examples/linux/scalability.py
new file mode 100644 (file)
index 0000000..110405a
--- /dev/null
@@ -0,0 +1,146 @@
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController, ECState 
+from neco.execution.resource import ResourceState, ResourceAction, \
+        populate_factory
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+import os
+import time
+
+def add_node(ec, host, user):
+    node = ec.register_resource("LinuxNode")
+    ec.set(node, "hostname", host)
+    ec.set(node, "username", user)
+    ec.set(node, "cleanHome", True)
+    ec.set(node, "cleanProcesses", True)
+    return node
+
+def add_app(ec):
+    app = ec.register_resource("LinuxApplication")
+    ec.set(app, "command", "sleep 30; echo 'HOLA'")
+    return app
+
+def get_options():
+    slicename = os.environ.get("PL_SLICE")
+
+    usage = "usage: %prog -s <pl-slice>"
+
+    parser = OptionParser(usage=usage)
+    parser.add_option("-s", "--pl-slice", dest="pl_slice", 
+            help="PlanetLab slicename", default=slicename, type="str")
+    parser.add_option("-l", "--exp-id", dest="exp_id", 
+            help="Label to identify experiment", type="str")
+
+    (options, args) = parser.parse_args()
+
+    return (options.pl_slice, options.exp_id)
+
+if __name__ == '__main__':
+    ( pl_slice, exp_id ) = get_options()
+
+    # Search for available RMs
+    populate_factory()
+    
+    apps = []
+  
+    hostnames = [
+             "planetlab-2.research.netlab.hut.fi",
+             "planetlab2.willab.fi",
+             "planetlab3.hiit.fi",
+             "planetlab4.hiit.fi",
+             "planetlab1.willab.fi",
+             "planetlab1.s3.kth.se",
+             "itchy.comlab.bth.se",
+             "planetlab-1.ida.liu.se",
+             "planetlab2.s3.kth.se",
+             "planetlab1.sics.se",
+             "planetlab1.tlm.unavarra.es",
+             "planetlab2.uc3m.es",
+             "planetlab1.uc3m.es",
+             "planetlab2.um.es",
+             "planet1.servers.ua.pt",
+             "planetlab2.fct.ualg.pt",
+             "planetlab-1.tagus.ist.utl.pt",
+             "planetlab-2.tagus.ist.utl.pt",
+             "planetlab-um00.di.uminho.pt",
+             "planet2.servers.ua.pt",
+             "planetlab1.mini.pw.edu.pl",
+             "roti.mimuw.edu.pl",
+             "planetlab1.ci.pwr.wroc.pl",
+             "planetlab1.pjwstk.edu.pl",
+             "ple2.tu.koszalin.pl",
+             "planetlab2.ci.pwr.wroc.pl",
+             "planetlab2.cyfronet.pl",
+             "plab2.ple.silweb.pl",
+             "planetlab1.cyfronet.pl",
+             "plab4.ple.silweb.pl",
+             "ple2.dmcs.p.lodz.pl",
+             "planetlab2.pjwstk.edu.pl",
+             "ple1.dmcs.p.lodz.pl",
+             "gschembra3.diit.unict.it",
+             "planetlab1.science.unitn.it",
+             "planetlab-1.ing.unimo.it",
+             "gschembra4.diit.unict.it",
+             "iraplab1.iralab.uni-karlsruhe.de",
+             "planetlab-1.fokus.fraunhofer.de",
+             "iraplab2.iralab.uni-karlsruhe.de",
+             "planet2.zib.de",
+             "pl2.uni-rostock.de",
+             "onelab-1.fhi-fokus.de",
+             "planet2.l3s.uni-hannover.de",
+             "planetlab1.exp-math.uni-essen.de",
+             "planetlab-2.fokus.fraunhofer.de",
+             "planetlab02.tkn.tu-berlin.de",
+             "planetlab1.informatik.uni-goettingen.de",
+             "planetlab1.informatik.uni-erlangen.de",
+             "planetlab2.lkn.ei.tum.de",
+             "planetlab1.wiwi.hu-berlin.de",
+             "planet1.l3s.uni-hannover.de",
+             "planetlab1.informatik.uni-wuerzburg.de",
+             "host3-plb.loria.fr",
+             "inriarennes1.irisa.fr",
+             "inriarennes2.irisa.fr",
+             "peeramide.irisa.fr",
+             "planetlab-1.imag.fr",
+             "planetlab-2.imag.fr",
+             "ple2.ipv6.lip6.fr",
+             "planetlab1.u-strasbg.fr",
+             "planetlab1.ionio.gr",
+             "planetlab2.ionio.gr",
+             "stella.planetlab.ntua.gr",
+             "vicky.planetlab.ntua.gr",
+             "planetlab1.cs.uoi.gr",
+             "pl002.ece.upatras.gr",
+             "planetlab04.cnds.unibe.ch",
+             "lsirextpc01.epfl.ch",
+             "planetlab2.csg.uzh.ch",
+             "planetlab1.csg.uzh.ch",
+             "planetlab-2.cs.unibas.ch",
+             "planetlab-1.cs.unibas.ch",
+             "planetlab4.cs.st-andrews.ac.uk",
+             "planetlab3.xeno.cl.cam.ac.uk",
+             "planetlab1.xeno.cl.cam.ac.uk",
+             "planetlab2.xeno.cl.cam.ac.uk",
+             "planetlab3.cs.st-andrews.ac.uk",
+             "planetlab1.aston.ac.uk",
+             "planetlab1.nrl.eecs.qmul.ac.uk",
+             "chimay.infonet.fundp.ac.be",
+             "orval.infonet.fundp.ac.be",
+             "rochefort.infonet.fundp.ac.be",
+            ]
+    ec = ExperimentController(exp_id = exp_id)
+
+    for host in hostnames:
+        node = add_node(ec, host, pl_slice)
+        for i in xrange(20):
+            app = add_app(ec)
+            ec.register_connection(app, node)
+            apps.append(app)
+
+    ec.deploy()
+
+    ec.wait_finished(apps)
+
+    ec.shutdown()
index b793d0e..65ef175 100644 (file)
@@ -1,5 +1,6 @@
 import logging
 import os
 import logging
 import os
+import random
 import sys
 import time
 import threading
 import sys
 import time
 import threading
@@ -14,6 +15,7 @@ from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 # TODO: Improve speed. Too slow... !!
 
 # TODO: use multiprocessing instead of threading
 # TODO: Improve speed. Too slow... !!
+# TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!!
 
 class ECState(object):
     RUNNING = 1
 
 class ECState(object):
     RUNNING = 1
@@ -265,18 +267,41 @@ class ExperimentController(object):
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
+        stop = []
+
         def steps(rm):
         def steps(rm):
-            rm.deploy()
-            rm.start_with_conditions()
+            try:
+                rm.deploy()
+                rm.start_with_conditions()
+
+                # Only if the RM has STOP conditions we
+                # schedule a stop. Otherwise the RM will stop immediately
+                if rm.conditions.get(ResourceAction.STOP):
+                    rm.stop_with_conditions()
+            except:
+                import traceback
+                err = traceback.format_exc()
+                
+                self._logger.error("Error occurred while deploying resources: %s" % err)
 
 
-            # Only if the RM has STOP consitions we
-            # schedule a stop. Otherwise the RM will stop immediately
-            if rm.conditions.get(ResourceAction.STOP):
-                rm.stop_with_conditions()
+                # stop deployment
+                stop.append(None)
 
         if not group:
             group = self.resources
 
 
         if not group:
             group = self.resources
 
+        # Before starting deployment we shuffle the group list with the
+        # purpose of speeding up the whole deployment process.
+        # It is likely that the user inserted closely related resources
+        # into the 'group' list one after another (e.g. all applications
+        # connected to the same node can likely appear one after another).
+        # This can cause a slowdown in the deployment since the N 
+        # threads the parallel runner uses to process tasks may all
+        # be taken up by the same family of resources waiting for the 
+        # same conditions. 
+        # If we shuffle the group list, this problem can be mitigated
+        random.shuffle(group)
+
         threads = []
         for guid in group:
             rm = self.get_resource(guid)
         threads = []
         for guid in group:
             rm = self.get_resource(guid)
@@ -292,13 +317,22 @@ class ExperimentController(object):
             thread.setDaemon(True)
             thread.start()
 
             thread.setDaemon(True)
             thread.start()
 
-        while list(threads) and not self.finished:
+        while list(threads) and not self.finished and not stop:
             thread = threads[0]
             # Time out periodically to check the EC has not terminated
             thread = threads[0]
             # Time out periodically to check the EC has not terminated
-            thread.join(5)
+            thread.join(1)
             if not thread.is_alive():
                 threads.remove(thread)
 
             if not thread.is_alive():
                 threads.remove(thread)
 
+        if stop:
+            # stop the scheduler
+            self._stop_scheduler()
+
+            if self._thread.is_alive():
+               self._thread.join()
+
+            raise RuntimeError, "Error occurred, interrupting deployment " 
+
     def release(self, group = None):
         if not group:
             group = self.resources
     def release(self, group = None):
         if not group:
             group = self.resources
@@ -317,16 +351,12 @@ class ExperimentController(object):
             thread.join(5)
             if not thread.is_alive():
                 threads.remove(thread)
             thread.join(5)
             if not thread.is_alive():
                 threads.remove(thread)
-        
-        self._state = ECState.TERMINATED
 
     def shutdown(self):
         self.release()
 
     def shutdown(self):
         self.release()
-        
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
 
 
+        self._stop_scheduler()
+        
         if self._thread.is_alive():
            self._thread.join()
 
         if self._thread.is_alive():
            self._thread.join()
 
@@ -399,7 +429,8 @@ class ExperimentController(object):
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-            return
+        finally:
+            runner.sync()
    
         # Mark EC state as terminated
         if self.ecstate == ECState.RUNNING:
    
         # Mark EC state as terminated
         if self.ecstate == ECState.RUNNING:
@@ -419,14 +450,18 @@ class ExperimentController(object):
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
-            # Mark the EC as failed
-            self._state = ECState.FAILED
-
-            # Wake up the EC in case it was sleeping
-            self._cond.acquire()
-            self._cond.notify()
-            self._cond.release()
+            self._stop_scheduler()
 
             # Propagate the error to the ParallelRunner
             raise
 
 
             # Propagate the error to the ParallelRunner
             raise
 
+    def _stop_scheduler(self):
+        # Mark the EC as failed
+        self._state = ECState.FAILED
+
+        # Wake up the EC in case it was sleeping
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+
+
index 0eb0d64..762e80e 100644 (file)
@@ -464,12 +464,20 @@ class ResourceManager(object):
             reschedule = True
             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
         else:
             reschedule = True
             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
         else:
-            self.debug("---- START CONDITIONS ---- %s" % 
-                    self.conditions.get(ResourceAction.START))
+            start_conditions = self.conditions.get(ResourceAction.START, [])
+            
+            self.debug("---- START CONDITIONS ---- %s" % start_conditions) 
             
             # Verify all start conditions are met
             
             # Verify all start conditions are met
-            start_conditions = self.conditions.get(ResourceAction.START, [])
             for (group, state, time) in start_conditions:
             for (group, state, time) in start_conditions:
+                # Uncomment for debug
+                #unmet = []
+                #for guid in group:
+                #    rm = self.ec.get_resource(guid)
+                #    unmet.append((guid, rm._state))
+                #
+                #self.debug("---- WAITED STATES ---- %s" % unmet )
+
                 reschedule, delay = self._needs_reschedule(group, state, time)
                 if reschedule:
                     break
                 reschedule, delay = self._needs_reschedule(group, state, time)
                 if reschedule:
                     break
index 953c651..e7a34d5 100644 (file)
@@ -3,11 +3,13 @@ from neco.execution.trace import Trace, TraceAttr
 from neco.execution.resource import ResourceManager, clsinit, ResourceState
 from neco.resources.linux.node import LinuxNode
 from neco.util import sshfuncs 
 from neco.execution.resource import ResourceManager, clsinit, ResourceState
 from neco.resources.linux.node import LinuxNode
 from neco.util import sshfuncs 
+from neco.util.timefuncs import strfnow, strfdiff
 
 import logging
 import os
 
 reschedule_delay = "0.5s"
 
 import logging
 import os
 
 reschedule_delay = "0.5s"
+state_check_delay = 1
 
 # TODO: Resolve wildcards in commands!! 
 
 
 # TODO: Resolve wildcards in commands!! 
 
@@ -96,6 +98,9 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
         self._ppid = None
         self._home = "app-%s" % self.guid
 
+        # timestamp of last state check of the application
+        self._last_state_check = strfnow()
+
         self._logger = logging.getLogger("LinuxApplication")
     
     def log_message(self, msg):
         self._logger = logging.getLogger("LinuxApplication")
     
     def log_message(self, msg):
@@ -401,9 +406,11 @@ class LinuxApplication(ResourceManager):
             raise RuntimeError, msg
 
     def stop(self):
             raise RuntimeError, msg
 
     def stop(self):
+        command = self.get('command') or ''
         state = self.state
         state = self.state
+        
         if state == ResourceState.STARTED:
         if state == ResourceState.STARTED:
-            self.info("Stopping command %s" % command)
+            self.info("Stopping command '%s'" % command)
 
             (out, err), proc = self.node.kill(self.pid, self.ppid)
 
 
             (out, err), proc = self.node.kill(self.pid, self.ppid)
 
@@ -430,24 +437,31 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+            # To avoid overwhelming the remote hosts and the local processor
+            # with too many ssh queries, the state is only requested
+            # every 'state_check_delay' .
+            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                # check if execution errors occurred
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
 
 
-            if out or err:
-                if err.find("No such file or directory") >= 0 :
-                    # The resource is marked as started, but the
-                    # command was not yet executed
-                    return ResourceState.READY
+                if out or err:
+                    if err.find("No such file or directory") >= 0 :
+                        # The resource is marked as started, but the
+                        # command was not yet executed
+                        return ResourceState.READY
 
 
-                # check if execution errors occurred
-                msg = " Failed to execute command '%s'" % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                    msg = " Failed to execute command '%s'" % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+
+                elif self.pid and self.ppid:
+                    status = self.node.status(self.pid, self.ppid)
+
+                    if status == sshfuncs.FINISHED:
+                        self._state = ResourceState.FINISHED
 
 
-            elif self.pid and self.ppid:
-                status = self.node.status(self.pid, self.ppid)
 
 
-                if status == sshfuncs.FINISHED:
-                    self._state = ResourceState.FINISHED
+                self._last_state_check = strfnow()
 
         return self._state
 
 
         return self._state
 
index 39107cb..38c3865 100644 (file)
@@ -15,6 +15,7 @@ import threading
 # TODO: Verify files and dirs exists already
 # TODO: Blacklist nodes!
 # TODO: Unify delays!!
 # TODO: Verify files and dirs exists already
 # TODO: Blacklist nodes!
 # TODO: Unify delays!!
+# TODO: Validate outcome of uploads!! 
 
 reschedule_delay = "0.5s"
 
 
 reschedule_delay = "0.5s"
 
@@ -130,8 +131,9 @@ class LinuxNode(ResourceManager):
     def provision(self, filters = None):
         if not self.is_alive():
             self._state = ResourceState.FAILED
     def provision(self, filters = None):
         if not self.is_alive():
             self._state = ResourceState.FAILED
-            self.error("Deploy failed. Unresponsive node")
-            return
+            msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
+            self.error(msg)
+            raise RuntimeError, msg
 
         if self.get("cleanProcesses"):
             self.clean_processes()
 
         if self.get("cleanProcesses"):
             self.clean_processes()
@@ -411,29 +413,27 @@ class LinuxNode(ResourceManager):
 
         out = err = ""
         try:
 
         out = err = ""
         try:
-            (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
+            (out, err), proc = self.execute("echo 'ALIVE'", retry = 5, 
+                    with_lock = True)
         except:
             import traceback
             trace = traceback.format_exc()
             msg = "Unresponsive host "
         except:
             import traceback
             trace = traceback.format_exc()
             msg = "Unresponsive host "
-            self.warn(msg, out, trace)
+            self.error(msg, out, trace)
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
             msg = "Unresponsive host "
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
             msg = "Unresponsive host "
-            self.warn(msg, out, err)
+            self.error(msg, out, err)
             return False
 
             return False
 
-            # TODO!
-            #if self.check_bad_host(out,err):
-            #    self.blacklist()
-
     def copy(self, src, dst):
         if self.localhost:
             (out, err), proc =  execfuncs.lcopy(source, dest, 
     def copy(self, src, dst):
         if self.localhost:
             (out, err), proc =  execfuncs.lcopy(source, dest, 
-                    recursive = True)
+                    recursive = True,
+                    strict_host_checking = False)
         else:
             with self._lock:
                 (out, err), proc = sshfuncs.rcopy(
         else:
             with self._lock:
                 (out, err), proc = sshfuncs.rcopy(
@@ -441,7 +441,8 @@ class LinuxNode(ResourceManager):
                     port = self.get("port"),
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     port = self.get("port"),
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
-                    recursive = True)
+                    recursive = True,
+                    strict_host_checking = False)
 
         return (out, err), proc
 
 
         return (out, err), proc
 
@@ -455,6 +456,7 @@ class LinuxNode(ResourceManager):
             retry = 3,
             err_on_timeout = True,
             connect_timeout = 30,
             retry = 3,
             err_on_timeout = True,
             connect_timeout = 30,
+            strict_host_checking = False,
             persistent = True,
             with_lock = False
             ):
             persistent = True,
             with_lock = False
             ):
@@ -488,7 +490,8 @@ class LinuxNode(ResourceManager):
                         retry = retry,
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         retry = retry,
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
-                        persistent = persistent
+                        persistent = persistent,
+                        strict_host_checking = strict_host_checking
                         )
             else:
                 (out, err), proc = sshfuncs.rexec(
                         )
             else:
                 (out, err), proc = sshfuncs.rexec(
index 982e23a..13b7b1e 100644 (file)
@@ -12,6 +12,7 @@ import subprocess
 import time
 import tempfile
 
 import time
 import tempfile
 
+# TODO: Add retries to rcopy!! rcopy is not being retried!
 
 logger = logging.getLogger("sshfuncs")
 
 
 logger = logging.getLogger("sshfuncs")
 
@@ -56,6 +57,8 @@ class NOT_STARTED:
 hostbyname_cache = dict()
 
 def gethostbyname(host):
 hostbyname_cache = dict()
 
 def gethostbyname(host):
+    global hostbyname_cache
+    
     hostbyname = hostbyname_cache.get(host)
     if not hostbyname:
         hostbyname = socket.gethostbyname(host)
     hostbyname = hostbyname_cache.get(host)
     if not hostbyname:
         hostbyname = socket.gethostbyname(host)
@@ -191,7 +194,8 @@ def rexec(command, host, user,
         err_on_timeout = True,
         connect_timeout = 30,
         persistent = True,
         err_on_timeout = True,
         connect_timeout = 30,
         persistent = True,
-        forward_x11 = False):
+        forward_x11 = False,
+        strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
     """
     """
     Executes a remote command, returns ((stdout,stderr),process)
     """
@@ -214,6 +218,10 @@ def rexec(command, host, user,
             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
             '-o', 'ControlPersist=60' ])
 
             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
             '-o', 'ControlPersist=60' ])
 
+    if not strict_host_checking:
+        # Do not check for Host key. Unsafe.
+        args.extend(['-o', 'StrictHostKeyChecking=no'])
+
     if agent:
         args.append('-A')
 
     if agent:
         args.append('-A')
 
@@ -288,7 +296,8 @@ def rcopy(source, dest,
         agent = True, 
         recursive = False,
         identity = None,
         agent = True, 
         recursive = False,
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
     Copies from/to remote sites.
     
     """
     Copies from/to remote sites.
     
@@ -500,6 +509,10 @@ def rcopy(source, dest,
             tmp_known_hosts = make_server_key_args(server_key, host, port)
             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
             tmp_known_hosts = make_server_key_args(server_key, host, port)
             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
+        if not strict_host_checking:
+            # Do not check for Host key. Unsafe.
+            args.extend(['-o', 'StrictHostKeyChecking=no'])
+
         if isinstance(source,list):
             args.extend(source)
         else:
         if isinstance(source,list):
             args.extend(source)
         else: