# Search for available RMs
populate_factory()
-
- host1 = 'nepi2.pl.sophia.inria.fr'
+
+ #host1 = 'nepi2.pl.sophia.inria.fr'
+ host1 = 'planetlab2.u-strasbg.fr'
host2 = 'roseval.pl.sophia.inria.fr'
ec = ExperimentController(exp_id = exp_id)
--- /dev/null
+#!/usr/bin/env python
+from neco.execution.ec import ExperimentController, ECState
+from neco.execution.resource import ResourceState, ResourceAction, \
+ populate_factory
+
+from optparse import OptionParser, SUPPRESS_HELP
+
+import os
+import time
+
+def add_node(ec, host, user):
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+ return node
+
+def add_app(ec):
+ app = ec.register_resource("LinuxApplication")
+ ec.set(app, "command", "sleep 30; echo 'HOLA'")
+ return app
+
+def get_options():
+ slicename = os.environ.get("PL_SLICE")
+
+ usage = "usage: %prog -s <pl-slice>"
+
+ parser = OptionParser(usage=usage)
+ parser.add_option("-s", "--pl-slice", dest="pl_slice",
+ help="PlanetLab slicename", default=slicename, type="str")
+ parser.add_option("-l", "--exp-id", dest="exp_id",
+ help="Label to identify experiment", type="str")
+
+ (options, args) = parser.parse_args()
+
+ return (options.pl_slice, options.exp_id)
+
+if __name__ == '__main__':
+ ( pl_slice, exp_id ) = get_options()
+
+ # Search for available RMs
+ populate_factory()
+
+ apps = []
+
+ hostnames = [
+ "planetlab-2.research.netlab.hut.fi",
+ "planetlab2.willab.fi",
+ "planetlab3.hiit.fi",
+ "planetlab4.hiit.fi",
+ "planetlab1.willab.fi",
+ "planetlab1.s3.kth.se",
+ "itchy.comlab.bth.se",
+ "planetlab-1.ida.liu.se",
+ "planetlab2.s3.kth.se",
+ "planetlab1.sics.se",
+ "planetlab1.tlm.unavarra.es",
+ "planetlab2.uc3m.es",
+ "planetlab1.uc3m.es",
+ "planetlab2.um.es",
+ "planet1.servers.ua.pt",
+ "planetlab2.fct.ualg.pt",
+ "planetlab-1.tagus.ist.utl.pt",
+ "planetlab-2.tagus.ist.utl.pt",
+ "planetlab-um00.di.uminho.pt",
+ "planet2.servers.ua.pt",
+ "planetlab1.mini.pw.edu.pl",
+ "roti.mimuw.edu.pl",
+ "planetlab1.ci.pwr.wroc.pl",
+ "planetlab1.pjwstk.edu.pl",
+ "ple2.tu.koszalin.pl",
+ "planetlab2.ci.pwr.wroc.pl",
+ "planetlab2.cyfronet.pl",
+ "plab2.ple.silweb.pl",
+ "planetlab1.cyfronet.pl",
+ "plab4.ple.silweb.pl",
+ "ple2.dmcs.p.lodz.pl",
+ "planetlab2.pjwstk.edu.pl",
+ "ple1.dmcs.p.lodz.pl",
+ "gschembra3.diit.unict.it",
+ "planetlab1.science.unitn.it",
+ "planetlab-1.ing.unimo.it",
+ "gschembra4.diit.unict.it",
+ "iraplab1.iralab.uni-karlsruhe.de",
+ "planetlab-1.fokus.fraunhofer.de",
+ "iraplab2.iralab.uni-karlsruhe.de",
+ "planet2.zib.de",
+ "pl2.uni-rostock.de",
+ "onelab-1.fhi-fokus.de",
+ "planet2.l3s.uni-hannover.de",
+ "planetlab1.exp-math.uni-essen.de",
+ "planetlab-2.fokus.fraunhofer.de",
+ "planetlab02.tkn.tu-berlin.de",
+ "planetlab1.informatik.uni-goettingen.de",
+ "planetlab1.informatik.uni-erlangen.de",
+ "planetlab2.lkn.ei.tum.de",
+ "planetlab1.wiwi.hu-berlin.de",
+ "planet1.l3s.uni-hannover.de",
+ "planetlab1.informatik.uni-wuerzburg.de",
+ "host3-plb.loria.fr",
+ "inriarennes1.irisa.fr",
+ "inriarennes2.irisa.fr",
+ "peeramide.irisa.fr",
+ "planetlab-1.imag.fr",
+ "planetlab-2.imag.fr",
+ "ple2.ipv6.lip6.fr",
+ "planetlab1.u-strasbg.fr",
+ "planetlab1.ionio.gr",
+ "planetlab2.ionio.gr",
+ "stella.planetlab.ntua.gr",
+ "vicky.planetlab.ntua.gr",
+ "planetlab1.cs.uoi.gr",
+ "pl002.ece.upatras.gr",
+ "planetlab04.cnds.unibe.ch",
+ "lsirextpc01.epfl.ch",
+ "planetlab2.csg.uzh.ch",
+ "planetlab1.csg.uzh.ch",
+ "planetlab-2.cs.unibas.ch",
+ "planetlab-1.cs.unibas.ch",
+ "planetlab4.cs.st-andrews.ac.uk",
+ "planetlab3.xeno.cl.cam.ac.uk",
+ "planetlab1.xeno.cl.cam.ac.uk",
+ "planetlab2.xeno.cl.cam.ac.uk",
+ "planetlab3.cs.st-andrews.ac.uk",
+ "planetlab1.aston.ac.uk",
+ "planetlab1.nrl.eecs.qmul.ac.uk",
+ "chimay.infonet.fundp.ac.be",
+ "orval.infonet.fundp.ac.be",
+ "rochefort.infonet.fundp.ac.be",
+ ]
+
+ ec = ExperimentController(exp_id = exp_id)
+
+ for host in hostnames:
+ node = add_node(ec, host, pl_slice)
+ for i in xrange(20):
+ app = add_app(ec)
+ ec.register_connection(app, node)
+ apps.append(app)
+
+ ec.deploy()
+
+ ec.wait_finished(apps)
+
+ ec.shutdown()
import logging
import os
+import random
import sys
import time
import threading
# TODO: use multiprocessing instead of threading
# TODO: Improve speed. Too slow... !!
+# TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!!
class ECState(object):
RUNNING = 1
"""
self.logger.debug(" ------- DEPLOY START ------ ")
+ stop = []
+
def steps(rm):
- rm.deploy()
- rm.start_with_conditions()
+ try:
+ rm.deploy()
+ rm.start_with_conditions()
+
+            # Only if the RM has STOP conditions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ if rm.conditions.get(ResourceAction.STOP):
+ rm.stop_with_conditions()
+ except:
+ import traceback
+ err = traceback.format_exc()
+
+ self._logger.error("Error occurred while deploying resources: %s" % err)
- # Only if the RM has STOP consitions we
- # schedule a stop. Otherwise the RM will stop immediately
- if rm.conditions.get(ResourceAction.STOP):
- rm.stop_with_conditions()
+ # stop deployment
+ stop.append(None)
if not group:
group = self.resources
+        # Before starting deployment we shuffle the group list in order
+        # to speed up the whole deployment process.
+        # It is likely that the user inserted closely related resources
+        # in the 'group' list one after another (e.g. all applications
+        # connected to the same node can likely appear one after another).
+        # This can slow down the deployment, since the N threads the
+        # parallel runner uses to process tasks may all be taken up by
+        # the same family of resources waiting for the same conditions.
+        # Shuffling the group list mitigates this problem.
+ random.shuffle(group)
+
threads = []
for guid in group:
rm = self.get_resource(guid)
thread.setDaemon(True)
thread.start()
- while list(threads) and not self.finished:
+ while list(threads) and not self.finished and not stop:
thread = threads[0]
# Time out after 5 seconds to check EC not terminated
- thread.join(5)
+ thread.join(1)
if not thread.is_alive():
threads.remove(thread)
+ if stop:
+ # stop the scheduler
+ self._stop_scheduler()
+
+ if self._thread.is_alive():
+ self._thread.join()
+
+ raise RuntimeError, "Error occurred, interrupting deployment "
+
def release(self, group = None):
if not group:
group = self.resources
thread.join(5)
if not thread.is_alive():
threads.remove(thread)
-
- self._state = ECState.TERMINATED
def shutdown(self):
self.release()
-
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
+ self._stop_scheduler()
+
if self._thread.is_alive():
self._thread.join()
self._logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
- return
+ finally:
+ runner.sync()
# Mark EC state as terminated
if self.ecstate == ECState.RUNNING:
self._logger.error("Error occurred while executing task: %s" % err)
- # Mark the EC as failed
- self._state = ECState.FAILED
-
- # Wake up the EC in case it was sleeping
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
+ self._stop_scheduler()
# Propage error to the ParallelRunner
raise
+ def _stop_scheduler(self):
+ # Mark the EC as failed
+ self._state = ECState.FAILED
+
+ # Wake up the EC in case it was sleeping
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+
+
reschedule = True
self.debug("---- RESCHEDULING START ---- state %s " % self.state )
else:
- self.debug("---- START CONDITIONS ---- %s" %
- self.conditions.get(ResourceAction.START))
+ start_conditions = self.conditions.get(ResourceAction.START, [])
+
+ self.debug("---- START CONDITIONS ---- %s" % start_conditions)
# Verify all start conditions are met
- start_conditions = self.conditions.get(ResourceAction.START, [])
for (group, state, time) in start_conditions:
+ # Uncomment for debug
+ #unmet = []
+ #for guid in group:
+ # rm = self.ec.get_resource(guid)
+ # unmet.append((guid, rm._state))
+ #
+ #self.debug("---- WAITED STATES ---- %s" % unmet )
+
reschedule, delay = self._needs_reschedule(group, state, time)
if reschedule:
break
from neco.execution.resource import ResourceManager, clsinit, ResourceState
from neco.resources.linux.node import LinuxNode
from neco.util import sshfuncs
+from neco.util.timefuncs import strfnow, strfdiff
import logging
import os
reschedule_delay = "0.5s"
+state_check_delay = 1
# TODO: Resolve wildcards in commands!!
self._ppid = None
self._home = "app-%s" % self.guid
+ # timestamp of last state check of the application
+ self._last_state_check = strfnow()
+
self._logger = logging.getLogger("LinuxApplication")
def log_message(self, msg):
raise RuntimeError, msg
def stop(self):
+ command = self.get('command') or ''
state = self.state
+
if state == ResourceState.STARTED:
- self.info("Stopping command %s" % command)
+ self.info("Stopping command '%s'" % command)
(out, err), proc = self.node.kill(self.pid, self.ppid)
@property
def state(self):
if self._state == ResourceState.STARTED:
- (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+ # To avoid overwhelming the remote hosts and the local processor
+ # with too many ssh queries, the state is only requested
+            # every 'state_check_delay'.
+ if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ # check if execution errors occurred
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
- if out or err:
- if err.find("No such file or directory") >= 0 :
- # The resource is marked as started, but the
- # command was not yet executed
- return ResourceState.READY
+ if out or err:
+ if err.find("No such file or directory") >= 0 :
+ # The resource is marked as started, but the
+ # command was not yet executed
+ return ResourceState.READY
- # check if execution errors occurred
- msg = " Failed to execute command '%s'" % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+
+ elif self.pid and self.ppid:
+ status = self.node.status(self.pid, self.ppid)
+
+ if status == sshfuncs.FINISHED:
+ self._state = ResourceState.FINISHED
- elif self.pid and self.ppid:
- status = self.node.status(self.pid, self.ppid)
- if status == sshfuncs.FINISHED:
- self._state = ResourceState.FINISHED
+ self._last_state_check = strfnow()
return self._state
# TODO: Verify files and dirs exists already
# TODO: Blacklist nodes!
# TODO: Unify delays!!
+# TODO: Validate outcome of uploads!!
reschedule_delay = "0.5s"
def provision(self, filters = None):
if not self.is_alive():
self._state = ResourceState.FAILED
- self.error("Deploy failed. Unresponsive node")
- return
+ msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
+ self.error(msg)
+ raise RuntimeError, msg
if self.get("cleanProcesses"):
self.clean_processes()
out = err = ""
try:
- (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
+ (out, err), proc = self.execute("echo 'ALIVE'", retry = 5,
+ with_lock = True)
except:
import traceback
trace = traceback.format_exc()
msg = "Unresponsive host "
- self.warn(msg, out, trace)
+ self.error(msg, out, trace)
return False
if out.strip().startswith('ALIVE'):
return True
else:
msg = "Unresponsive host "
- self.warn(msg, out, err)
+ self.error(msg, out, err)
return False
- # TODO!
- #if self.check_bad_host(out,err):
- # self.blacklist()
-
def copy(self, src, dst):
if self.localhost:
(out, err), proc = execfuncs.lcopy(source, dest,
- recursive = True)
+ recursive = True,
+ strict_host_checking = False)
else:
with self._lock:
(out, err), proc = sshfuncs.rcopy(
port = self.get("port"),
identity = self.get("identity"),
server_key = self.get("serverKey"),
- recursive = True)
+ recursive = True,
+ strict_host_checking = False)
return (out, err), proc
retry = 3,
err_on_timeout = True,
connect_timeout = 30,
+ strict_host_checking = False,
persistent = True,
with_lock = False
):
retry = retry,
err_on_timeout = err_on_timeout,
connect_timeout = connect_timeout,
- persistent = persistent
+ persistent = persistent,
+ strict_host_checking = strict_host_checking
)
else:
(out, err), proc = sshfuncs.rexec(
import time
import tempfile
+# TODO: Add retries to rcopy!! rcopy is not being retried!
logger = logging.getLogger("sshfuncs")
hostbyname_cache = dict()
def gethostbyname(host):
+ global hostbyname_cache
+
hostbyname = hostbyname_cache.get(host)
if not hostbyname:
hostbyname = socket.gethostbyname(host)
err_on_timeout = True,
connect_timeout = 30,
persistent = True,
- forward_x11 = False):
+ forward_x11 = False,
+ strict_host_checking = True):
"""
Executes a remote command, returns ((stdout,stderr),process)
"""
'-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
'-o', 'ControlPersist=60' ])
+ if not strict_host_checking:
+ # Do not check for Host key. Unsafe.
+ args.extend(['-o', 'StrictHostKeyChecking=no'])
+
if agent:
args.append('-A')
agent = True,
recursive = False,
identity = None,
- server_key = None):
+ server_key = None,
+ strict_host_checking = True):
"""
Copies from/to remote sites.
tmp_known_hosts = make_server_key_args(server_key, host, port)
args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+ if not strict_host_checking:
+ # Do not check for Host key. Unsafe.
+ args.extend(['-o', 'StrictHostKeyChecking=no'])
+
if isinstance(source,list):
args.extend(source)
else: