"""A SSH Interface class. An interface to ssh on posix systems, and plink (part of the Putty suite) on Win32 systems. By Rasjid Wilcox. Copyright (c) 2002. Version: 0.2 Last modified 4 September 2002. Drawing on ideas from work by Julian Schaefer-Jasinski, Guido's telnetlib and version 0.1 of pyssh (http://pyssh.sourceforge.net) by Chuck Esterbrook. Licenced under a Python 2.2 style license. See License.txt. """ DEBUG_LEVEL = 1 import os, getpass import signal # should cause all KeyboardInterrupts to go to the main thread # try for Linux, does not seem to be try under Cygwin import nbpipe import time # Constants SSH_PORT=806 SSH_PATH='' CTRL_C=chr(3) READ_LAZY=0 READ_SOME=1 READ_ALL=2 READ_UNTIL=3 # set the path to ssh / plink, and chose the popen2 funciton to use if os.name=='posix': import fssa # we can look for ssh-agent on posix # XXX Can we on Win32/others? import ptyext # if my patch gets accepted, change this to check for a # sufficiently high version of python, and assign ptyext=pty # if sufficient. sshpopen2=ptyext.popen2 CLOSE_STR='~.' tp=os.popen('/usr/bin/which ssh') SSH_PATH=tp.read().strip() try: tp.close() except IOError: # probably no child process pass if SSH_PATH == '': tp=os.popen('command -v ssh') # works in bash, ash etc, not csh etc. SSH_PATH=tp.read().strip() tp.close() if SSH_PATH == '': check = ['/usr/bin/ssh', '/usr/local/bin/ssh', '/bin/ssh'] for item in check: if os.path.isfile(item): SSH_PATH=item break PORT_STR='-p ' else: sshpopen2=os.popen2 CLOSE_STR=CTRL_C # FIX-ME: This does not work. # I think I need to implement a 'kill' component # to the close function using win32api. SSH_PATH='' PORT_STR='-P ' class mysshError(Exception): """Error class for myssh.""" pass # Helper functions def _prompt(prompt): """Print the message as the prompt for input. Return the text entered.""" noecho = (prompt.lower().find('password:') >= 0) or \ (prompt.lower().find('passphrase:') >=0) print """User input required for ssh connection. (Type Ctrl-C to abort connection.)""" abort = 0 try: if noecho: response = getpass.getpass(prompt) else: response = raw_input(prompt) except KeyboardInterrupt: response = '' abort = 1 return response, abort class Ssh: """A SSH connection class.""" def __init__(self, username=None, host='localhost', port=None): """Constructor. This does not try to connect.""" self.debuglevel = DEBUG_LEVEL self.sshpath = SSH_PATH self.username = username self.host = host self.port = port self.isopen = 0 self.sshpid = 0 # perhaps merge this with isopen #self.old_handler = signal.getsignal(signal.SIGCHLD) #sig_handler = signal.signal(signal.SIGCHLD, self.sig_handler) def __del__(self): """Destructor -- close the connection.""" if self.isopen: self.close() def sig_handler(self, signum, stack): """ Handle SIGCHLD signal """ if signum == signal.SIGCHLD: try: os.waitpid(self.sshpid, 0) except: pass if self.old_handler != signal.SIG_DFL: self.old_handler(signum, stack) def attach_agent(self, key=None): if os.name != 'posix': # only posix support at this time return if 'SSH_AUTH_SOCK' not in os.environ.keys(): fssa.fssa(key) def set_debuglevel(self, debuglevel): """Set the debug level.""" self.debuglevel = debuglevel def set_sshpath(self, sshpath): """Set the ssh path.""" self.sshpath=sshpath # Low level functions def open(self, cmd=None): """Opens a ssh connection. Raises an mysshError if myssh.sshpath is not a file. Raises an error if attempting to open an already open connection. """ #self.attach_agent() if not os.path.isfile(self.sshpath): raise mysshError, \ "Path to ssh or plink is not defined or invalid.\nsshpath='%s'" \ % self.sshpath if self.isopen: raise mysshError, "Connection already open." sshargs = '' if self.sshpath.lower().find('plink') != -1: sshargs = '-ssh ' if self.port and self.port != '': sshargs += PORT_STR + self.port + ' ' sshargs += " -o StrictHostKeyChecking=no -o PasswordAuthentication=yes -o PubkeyAuthentication=no " if self.username and self.username !='': sshargs += self.username + '@' sshargs += self.host if cmd: sshargs += ' ' + cmd if self.debuglevel: print ">> Running %s %s." % (self.sshpath, sshargs) # temporary workaround until I get pid's working under win32 #print sshargs if os.name == 'posix': self.sshin, self.sshoutblocking, self.sshpid = \ sshpopen2(self.sshpath + ' ' + sshargs) else: self.sshin, self.sshoutblocking = \ sshpopen2(self.sshpath + ' ' + sshargs) self.sshout = nbpipe.nbpipe(self.sshoutblocking) self.isopen = 1 if self.debuglevel: print ">> ssh pid is %s." % self.sshpid def close(self, addnewline=1): """Close the ssh connection by closing the input and output pipes. Returns the closing messages. On Posix systems, by default it adds a newline before sending the disconnect escape sequence. Turn this off by setting addnewline=0. """ if os.name == 'posix': try: if addnewline: self.write('\n') self.write(CLOSE_STR) except (OSError, IOError, mysshError): pass output = self.read_lazy() try: self.sshin.close() self.sshoutblocking.close() except: pass if os.name == 'posix': try: os.kill(self.sshpid, signal.SIGHUP) os.waitpid(self.sshpid, 0) except: pass self.isopen = 0 if self.debuglevel: print ">> Connection closed." return output def write(self, text): """Send text to the ssh process.""" # May block?? Probably not in practice, as ssh has a large input buffer. if self.debuglevel: print ">> Sending %s" % text if self.isopen: while len(text): numtaken = os.write(self.sshin.fileno(),text) if self.debuglevel: print ">> %s characters taken" % numtaken text = text[numtaken:] else: raise mysshError, "Attempted to write to closed connection." # There is a question about what to do with connections closed by the other # end. Should write and read check for this, and force proper close? def read_lazy(self): """Lazy read from sshout. Waits a little, but does not block.""" return self.sshout.read_lazy() def read_some(self): """Always read at least one block, unless the connection is closed. My block.""" if self.isopen: return self.sshout.read_some() else: return self.sshout.read_lazy() def read_until(self, match, timeout=None): """Read until a given string is encountered or until timeout. When no match is found, return whatever is available instead, possibly the empty string. Raise EOFError if the connection is closed and no cooked data is available. """ if self.isopen: return self.sshout.read_until(match, timeout) else: return self.sshout.read_lazy() def read_all(self): """Reads until end of file hit. May block.""" if self.isopen: return self.sshout.read_all() else: return self.sshout.read_lazy() # High level funcitons def login(self, logintext='Last login:', prompt_callback=_prompt): """Logs in to the ssh host. Checks for standard prompts, and calls the function passed as promptcb to process them. Returns the login banner, or 'None' if login process aborted. """ self.open() banner = self.read_some() if self.debuglevel: print ">> 1st banner read is: %s" % banner while banner.find(logintext) == -1: response, abort = prompt_callback(banner) if abort: return self.close() self.write(response + '\n') banner = self.read_some() return banner def logout(self): """Logs out the session.""" self.close() def sendcmd(self, cmd, readtype=READ_SOME, expected=None, timeout=None): """Sends the command 'cmd' over the ssh connection, and returns the result. By default it uses read_some, which may block. """ if cmd[-1] != '\n': cmd += '\n' self.write(cmd) if readtype == READ_ALL: return self.read_all() elif readtype == READ_LAZY: return self.read_lazy() elif readtype == READ_UNTIL and expected is not None: return self.read_until(expected, timeout) else: return self.read_some() def test(): """Test routine for myssh. Usage: python myssh.py [-d] [-sshp path-to-ssh] [username@host | host] [port] Default host is localhost, default port is 22. """ import sys debug = 0 if sys.argv[1:] and sys.argv[1] == '-d': debug = 1 del sys.argv[1] testsshpath = SSH_PATH if sys.argv[1:] and sys.argv[1] == '-sshp': testsshpath = sys.argv[2] del sys.argv[1] del sys.argv[1] testusername = None testhost = 'localhost' testport = '22' if sys.argv[1:]: testhost = sys.argv[1] if testhost.find('@') != -1: testusername, testhost = testhost.split('@') if sys.argv[2:]: testport = sys.argv[2] testcon = Ssh(testusername, testhost, testport) testcon.set_debuglevel(debug) testcon.set_sshpath(testsshpath) testcon.login() cmd = None while (cmd != 'exit') and testcon.isopen: cmd = raw_input("Enter command to send: ") print testcon.sendcmd(cmd) testcon.close() if __name__ == '__main__': test()