from neco.execution.resource import Resource
from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
- rspawn, rcheck_pid, rstatus, rkill, RUNNING
+ rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING
import cStringIO
import logging
self.user = None
self.port = None
self.identity_file = None
+ self.enable_x11 = False
+ self.forward_agent = True
+
# packet management system - either yum or apt for now...
self._pm = None
self.box.guid)
self._logger.setLevel(getattr(logging, loglevel.upper()))
+ # For ssh connections we use the ControlMaster option which
+ # allows us to decrease the number of open ssh network connections.
+ # Subsequent ssh connections will reuse a same master connection.
+ # This might pose a problem when using X11 and ssh-agent, since
+ # display and agent forwarded will be those of the first connection,
+ # which created the master.
+ # To avoid reusing a master created by a previous LinuxNode instance,
+ # we explicitly erase the ControlPath socket.
+ control_path = make_control_path(self.user, self.host, self.port)
+ try:
+ os.remove(control_path)
+ except:
+ pass
+
@property
def pm(self):
if self._pm:
if not os.path.isfile(src):
src = cStringIO.StringIO(src)
+ # Build destination as <user>@<server>:<path>
+ dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
+
+ (out, err), proc = eintr_retry(rcopy)(
+ src, dst,
+ port = self.port,
+ identity_file = self.identity_file)
+
+ if proc.wait():
+ msg = "Error uploading to %s got:\n%s%s" %\
+ (self.host or self.ip, out, err)
+ self._logger.error(msg)
+ raise RuntimeError(msg)
+
+ def download(self, src, dst):
+ # Build destination as <user>@<server>:<path>
+ src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
+
(out, err), proc = eintr_retry(rcopy)(
src, dst,
- self.host or self.ip,
- self.user,
port = self.port,
identity_file = self.identity_file)
self.host or self.ip,
self.user,
port = self.port,
+ agent = self.forward_agent,
identity_file = self.identity_file,
+ x11 = self.enable_x11,
timeout = 60,
err_on_timeout = False,
persistent = False)
)
def execute(self, command,
- agent = True,
sudo = False,
stdin = "",
tty = False,
+ env = None,
timeout = None,
retry = 0,
err_on_timeout = True,
self.host or self.ip,
self.user,
port = self.port,
- agent = agent,
+ agent = self.forward_agent,
sudo = sudo,
stdin = stdin,
identity_file = self.identity_file,
tty = tty,
+ x11 = self.enable_x11,
+ env = env,
timeout = timeout,
retry = retry,
err_on_timeout = err_on_timeout,
host = self.host,
user = self.user,
port = self.port,
+ agent = self.forward_agent,
identity_file = self.identity_file
)
host = self.host,
user = self.user,
port = self.port,
+ agent = self.forward_agent,
identity_file = self.identity_file
)
host = self.host,
user = self.user,
port = self.port,
+ agent = self.forward_agent,
identity_file = self.identity_file
)
host = self.host,
user = self.user,
port = self.port,
+ agent = self.forward_agent,
sudo = sudo,
identity_file = self.identity_file
)
import hashlib
OPENSSH_HAS_PERSIST = None
-CONTROL_PATH = "yyyyy_ssh_control_path"
+CONTROL_PATH = "yyy_ssh_ctrl_path"
if hasattr(os, "devnull"):
DEV_NULL = os.devnull
connkey = hashlib.sha1(connkey).hexdigest()
return connkey
+def make_control_path(user, host, port):
+ connkey = make_connkey(user, host, port)
+ return '/tmp/%s_%s' % ( CONTROL_PATH, connkey, )
+
def rexec(command, host, user,
port = None,
agent = True,
sudo = False,
- stdin = "",
+ stdin = "",
identity_file = None,
+ env = None,
tty = False,
+ x11 = False,
timeout = None,
retry = 0,
err_on_timeout = True,
"""
Executes a remote command, returns ((stdout,stderr),process)
"""
- connkey = make_connkey(user, host, port)
args = ['ssh', '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
'-l', user, host]
if persistent and openssh_has_persist():
+ control_path = make_control_path(user, host, port)
args.extend([
'-o', 'ControlMaster=auto',
- '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
+ '-o', 'ControlPath=%s' % control_path,
'-o', 'ControlPersist=60' ])
if agent:
args.append('-A')
args.append('-t')
if sudo:
args.append('-t')
+ if x11:
+ args.append('-X')
+
+ if env:
+ export = ''
+ for envkey, envval in env.iteritems():
+ export += '%s=%s ' % (envkey, envval)
+ command = export + command
if sudo:
command = "sudo " + command
+
args.append(command)
for x in xrange(retry or 3):
return ((out, err), proc)
-def rcopy(source, dest, host, user,
+def rcopy(source, dest,
port = None,
agent = True,
recursive = False,
if isinstance(source, file) and source.tell() == 0:
source = source.name
-
elif hasattr(source, 'read'):
tmp = tempfile.NamedTemporaryFile()
while True:
if isinstance(source, file) or isinstance(dest, file) \
or hasattr(source, 'read') or hasattr(dest, 'write'):
assert not recursive
+
+ # Parse source/destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
+ tmp_known_hosts = None
- connkey = make_connkey(user,host,port)
args = ['ssh', '-l', user, '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
'-o', 'ServerAliveInterval=30',
'-o', 'TCPKeepAlive=yes',
host ]
+
if openssh_has_persist():
+ control_path = make_control_path(user, host, port)
args.extend([
'-o', 'ControlMaster=auto',
- '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, ),
+ '-o', 'ControlPath=%s' % control_path,
'-o', 'ControlPersist=60' ])
if port:
args.append('-P%d' % port)
args.extend(('-i', identity_file))
if isinstance(source, file) or hasattr(source, 'read'):
- args.append('cat > %s' % dest)
+ args.append('cat > %s' % (shell_escape(path),))
elif isinstance(dest, file) or hasattr(dest, 'write'):
- args.append('cat %s' % dest)
+ args.append('cat %s' % (shell_escape(path),))
else:
raise AssertionError, "Unreachable code reached! :-Q"
-
+
# connects to the remote host and starts a remote connection
if isinstance(source, file):
proc = subprocess.Popen(args,
else:
raise AssertionError, "Unreachable code reached! :-Q"
else:
+ # Parse destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
+
# plain scp
args = ['scp', '-q', '-p', '-C',
# Don't bother with localhost. Makes test easier
args.extend(source)
else:
if openssh_has_persist():
- connkey = make_connkey(user,host,port)
+ control_path = make_control_path(user, host, port)
args.extend([
'-o', 'ControlMaster=no',
- '-o', 'ControlPath=/tmp/%s_%s' % ( CONTROL_PATH, connkey, )])
+ '-o', 'ControlPath=%s' % control_path ])
args.append(source)
- args.append("%s@%s:%s" %(user, host, dest))
+
+ args.append(dest)
# connects to the remote host and starts a remote connection
proc = subprocess.Popen(args,
return node
+ def t_xterm(self, node, target):
+ if not node.is_alive():
+ print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
+ return
+
+ node.enable_x11 = True
+
+ node.install('xterm')
+
+ out = node.execute('xterm')
+
+ node.uninstall('xterm')
+
+ self.assertEquals(out, "")
+
def t_execute(self, node, target):
if not node.is_alive():
print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
command = "%s/hello" % self.home
out = node.execute(command)
- self.assertEquals(out, "Hello, world!\n")
-
node.uninstall('gcc')
node.rmdir(self.home)
+ self.assertEquals(out, "Hello, world!\n")
+
def test_execute_fedora(self):
self.t_execute(self.node_fedora, self.target)
def test_install_ubuntu(self):
self.t_install(self.node_ubuntu, self.target)
+ def xtest_xterm_fedora(self):
+ """ PlanetLab doesn't currently support X11 forwarding.
+ Interactive test. Should not run automatically """
+ self.t_xterm(self.node_fedora, self.target)
+
+ def xtest_xterm_ubuntu(self):
+ """ Interactive test. Should not run automatically """
+ self.t_xterm(self.node_ubuntu, self.target)
+
+
if __name__ == '__main__':
unittest.main()