- remote_kill can be told not to wait, for increase parallelism if you'll be waiting later with remote_status
- somehow TunProto.checkpid gets called before the PID is ready (maybe before the tun_connect script is spawned).
In those cases, old pids get fetched, instead of simply retrying later.
To solve this, TunProto._make_home removes old pidfiles
- tests that didn't perform shutdown on exception conditions now do
raise NotImplementedError
def shutdown(self):
- for trace in self._traces.values():
+ for trace in self._traces.itervalues():
trace.close()
- for element in self._elements.values():
+ for element in self._elements.itervalues():
# invoke cleanup hooks
if hasattr(element, 'cleanup'):
element.cleanup()
+ for element in self._elements.itervalues():
+ # invoke destroy hooks
+ if hasattr(element, 'destroy'):
+ element.destroy()
+ self._elements.clear()
+ self._traces.clear()
def trace(self, guid, trace_id, attribute='value'):
app = self._elements[guid]
def cleanup(self):
if self.peer_proto_impl:
self.peer_proto_impl.shutdown()
+
+ def destroy(self):
+ if self.peer_proto_impl:
+ self.peer_proto_impl.destroy()
self.peer_proto_impl = None
def async_launch_wait(self):
def remote_kill(pid, ppid, sudo = False,
host = None, port = None, user = None, agent = None,
- ident_key = None, server_key = None):
+ ident_key = None, server_key = None,
+ nowait = False):
"""
Kill a process spawned with remote_spawn.
Nothing, should have killed the process
"""
-
- (out,err),proc = server.popen_ssh_command(
- """
+
+ cmd = """
%(sudo)s kill %(pid)d
-for x in 1 2 3 4 5 6 7 8 9 0 ; do
- sleep 0.1
+for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
+ sleep 0.2
if [ `ps --ppid %(ppid)d -o pid | grep -c %(pid)d` == '0' ]; then
break
fi
- sleep 0.9
+ sleep 1.8
done
if [ `ps --ppid %(ppid)d -o pid | grep -c %(pid)d` != '0' ]; then
%(sudo)s kill -9 %(pid)d
fi
-""" % {
+"""
+ if nowait:
+ cmd = "{ %s } >/dev/null 2>/dev/null </dev/null &" % (cmd,)
+
+ (out,err),proc = server.popen_ssh_command(
+ cmd % {
'ppid' : ppid,
'pid' : pid,
'sudo' : 'sudo -S' if sudo else ''
import passfd
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ retrydelay = 1.0
for i in xrange(30):
try:
sock.connect(options.pass_fd)
break
except socket.error:
# wait a while, retry
- print >>sys.stderr, "Could not connect. Retrying in a sec..."
- time.sleep(1)
+ print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
else:
sock.connect(options.pass_fd)
passfd.sendfd(sock, tun.fileno(), '0')
print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+ retrydelay = 1.0
for i in xrange(30):
try:
rsock.bind((hostaddr,options.udp))
break
except socket.error:
# wait a while, retry
- print >>sys.stderr, "Could not bind. Retrying in a sec..."
- time.sleep(1)
+ print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
else:
rsock.bind((hostaddr,options.udp))
rsock.connect((remaining_args[0],options.port))
if remaining_args and not remaining_args[0].startswith('-'):
print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ retrydelay = 1.0
for i in xrange(30):
try:
rsock.connect((remaining_args[0],options.port))
break
except socket.error:
# wait a while, retry
- print >>sys.stderr, "Could not connect. Retrying in a sec..."
- time.sleep(1)
+ print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
else:
rsock.connect((remaining_args[0],options.port))
else:
print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ retrydelay = 1.0
for i in xrange(30):
try:
lsock.bind((hostaddr,options.port))
break
except socket.error:
# wait a while, retry
- print >>sys.stderr, "Could not bind. Retrying in a sec..."
- time.sleep(1)
+ print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
else:
lsock.bind((hostaddr,options.port))
lsock.listen(1)
self._launcher = None
self._started = False
+ self._starting = False
self._pid = None
self._ppid = None
self._if_name = None
# Make sure all the paths are created where
# they have to be created for deployment
- cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
+ # Also remove pidfile, if there is one.
+ # Old pidfiles from previous runs can be troublesome.
+ cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
+ 'home' : server.shell_escape(self.home_path)
+ }
(out,err),proc = server.popen_ssh_command(
cmd,
host = local.node.hostname,
raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
def launch(self, check_proto, listen, extra_args=[]):
+ if self._starting:
+ raise AssertionError, "Double start"
+
+ self._starting = True
+
peer = self.peer()
local = self.local()
agent = None,
ident_key = local.node.ident_path,
server_key = local.node.server_key,
- sudo = True
+ sudo = True,
+ nowait = True
)
+
+ def waitkill(self):
+ interval = 1.0
+ for i in xrange(30):
+ status = self.status()
+ if status != rspawn.RUNNING:
+ break
+ time.sleep(interval)
+ interval = min(30.0, interval * 1.1)
def sync_trace(self, local_dir, whichtrace):
if whichtrace != 'packets':
Cleanup
"""
raise NotImplementedError
+
+ def destroy(self):
+ """
+ Second-phase cleanup
+ """
+ pass
class TunProtoUDP(TunProtoBase):
def shutdown(self):
self.kill()
+ def destroy(self):
+ self.waitkill()
+
def launch(self, check_proto='udp', listen=False, extra_args=None):
if extra_args is None:
extra_args = ("-u",str(self.port))
def shutdown(self):
self.kill()
+ def destroy(self):
+ self.waitkill()
+
def launch(self, check_proto='fd', listen=False, extra_args=[]):
super(TunProtoFD, self).launch(check_proto, listen, extra_args)
peer.peer_proto_impl.async_launch_wait()
if not self._started:
- self.launch('tcp', False)
- else:
- # make sure WE are ready
- self.async_launch_wait()
+ self.async_launch('tcp', False)
self.checkpid()
def shutdown(self):
self.kill()
+ def destroy(self):
+ self.waitkill()
+
def launch(self, check_proto='tcp', listen=None, extra_args=[]):
if listen is None:
listen = self.listening
import struct
import socket
import threading
+import errno
def ipfmt(ip):
ipbytes = map(ord,ip.decode("hex"))
wset.append(tun)
if packetReady(fwbuf, ether_mode):
wset.append(remote)
- rdrdy, wrdy, errs = select.select((tun,remote),wset,(tun,remote),1)
+
+ try:
+ rdrdy, wrdy, errs = select.select((tun,remote),wset,(tun,remote),1)
+ except select.error, e:
+ if e.args[0] == errno.EINTR:
+ # just retry
+ continue
# check for errors
if errs:
xml = exp.to_xml()
- controller = ExperimentController(xml, self.root_dir)
- controller.start()
- # just test that it starts...
- controller.stop()
- controller.shutdown()
+ try:
+ controller = ExperimentController(xml, self.root_dir)
+ controller.start()
+ # just test that it starts...
+ finally:
+ controller.stop()
+ controller.shutdown()
@test_util.skipUnless(test_util.pl_auth() is not None,
"Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
xml = exp.to_xml()
- controller = ExperimentController(xml, self.root_dir)
- controller.start()
+ try:
+ controller = ExperimentController(xml, self.root_dir)
+ controller.start()
- while not controller.is_finished(ping.guid):
- time.sleep(0.5)
-
- ping_result = controller.trace(ping.guid, "stdout")
- tap_trace = controller.trace(tap1.guid, "packets")
+ while not controller.is_finished(ping.guid):
+ time.sleep(0.5)
+
+ ping_result = controller.trace(ping.guid, "stdout")
+ tap_trace = controller.trace(tap1.guid, "packets")
- controller.stop()
- controller.shutdown()
+ finally:
+ controller.stop()
+ controller.shutdown()
# asserts at the end, to make sure there's proper cleanup
self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
xml = exp.to_xml()
- controller = ExperimentController(xml, self.root_dir)
- controller.start()
+ try:
+ controller = ExperimentController(xml, self.root_dir)
+ controller.start()
- while not controller.is_finished(ping.guid):
- time.sleep(0.5)
-
- tap_trace = controller.trace(tap1.guid, "packets")
+ while not controller.is_finished(ping.guid):
+ time.sleep(0.5)
+
+ tap_trace = controller.trace(tap1.guid, "packets")
- controller.stop()
- controller.shutdown()
+ finally:
+ controller.stop()
+ controller.shutdown()
# asserts at the end, to make sure there's proper cleanup
sent = 0