--- /dev/null
+from pexpect import *
+#import os, sys, getopt, shutil
+
+class pxssh (spawn):
+ """This class extends pexpect.spawn to specialize setting up SSH connections.
+ This adds methods for login, logout, and expecting the prompt.
+ It does various hacky things to handle any situation in the SSH login process.
+ For example, if the session is your first login, then it automatically
+ accepts the certificate; or if you have public key authentication setup
+ and you don't need a password then this is OK too.
+
+ Example usage that runs 'ls -l' on server and prints the result:
+ import pxssh
+ s = pxssh.pxssh()
+ if not s.login ('localhost', 'myusername', 'mypassword'):
+ print "SSH session failed on login."
+ print str(s)
+ else:
+ print "SSH session login successful"
+ s.sendline ('ls -l')
+ s.prompt() # match the prompt
+ print s.before # print everything before the prompt.
+ s.logout()
+ """
+
+ def __init__ (self):
+ # SUBTLE HACK ALERT!
+ # Note that the command to set the prompt uses a slightly different string
+ # than the regular expression to match it. This is because when you set the
+ # prompt the command will echo back, but we don't want to match the echoed
+ # command. So if we make the set command slightly different than the regex
+ # we eliminate the problem. To make the set command different we add a
+ # backslash in front of $. The $ doesn't need to be escaped, but it doesn't
+ # hurt and serves to make the set command different than the regex.
+ self.PROMPT = "\[PEXPECT\][\$\#] " # used to match the command-line prompt.
+ # used to set shell command-line prompt to something more unique.
+ self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
+ self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
+
+ ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
+ ### TODO: I need to draw a better flow chart for this.
+ def login (self,server,username,password='',ssh_options="",terminal_type='ansi',original_prompts=r"][#$]|~[#$]|bash.*?[#$]|[#$] ",login_timeout=10):
+ """This logs the user into the given server. By default the prompt is
+ rather optimistic and should be considered more of an example. It's
+ better to try to match the prompt as exactly as possible to prevent
+ any false matches by server strings such as a "Message Of The Day" or
+ something. The closer you can make the original_prompt match your real prompt
+ then the better. A timeout will not necessarily cause the login to fail.
+ In the case of a timeout we assume that the prompt was so weird that
+ we could not match it. We still try to reset the prompt to something
+ more unique. If that still fails then we return False.
+ """
+ cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
+ print cmd
+ spawn.__init__(self, cmd, timeout=login_timeout)
+ #, "(?i)no route to host"])
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"])
+ if i==0: # New certificate -- always accept it. This is what you if SSH does not have the remote host's public key stored in the cache.
+ self.sendline("yes")
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
+ if i==2: # password
+ self.sendline(password)
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
+ if i==4:
+ self.sendline(terminal_type)
+ i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
+
+ if i==0:
+ # This is weird. This should not happen twice in a row.
+ self.close()
+ return False
+ elif i==1: # can occur if you have a public key pair set to authenticate.
+ ### TODO: May NOT be OK if expect() matched a false prompt.
+ pass
+ elif i==2: # password prompt again
+ # For incorrect passwords, some ssh servers will
+ # ask for the password again, others return 'denied' right away.
+ # If we get the password prompt again then this means
+ # we didn't get the password right the first time.
+ self.close()
+ return False
+ elif i==3: # permission denied -- password was bad.
+ self.close()
+ return False
+ elif i==4: # terminal type again? WTF?
+ self.close()
+ return False
+ elif i==5: # Timeout
+ # This is tricky... presume that we are at the command-line prompt.
+ # It may be that the prompt was so weird that we couldn't match it.
+ pass
+ elif i==6: # Connection closed by remote host
+ self.close()
+ return False
+ else: # Unexpected
+ self.close()
+ return False
+ # We appear to be in -- reset prompt to something more unique.
+ #if not self.set_unique_prompt():
+ # self.close()
+ # return False
+ return True
+
+ def logout (self):
+ """This sends exit. If there are stopped jobs then this sends exit twice.
+ """
+ self.sendline("exit")
+ index = self.expect([EOF, "(?i)there are stopped jobs"])
+ if index==1:
+ self.sendline("exit")
+ self.expect(EOF)
+
+ def prompt (self, timeout=20):
+ """This expects the prompt. This returns True if the prompt was matched.
+ This returns False if there was a timeout.
+ """
+ i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
+ if i==1:
+ return False
+ return True
+
+ def set_unique_prompt (self, optional_prompt=None):
+ """This attempts to reset the shell prompt to something more unique.
+ This makes it easier to match unambiguously.
+ """
+ if optional_prompt is not None:
+ self.prompt = optional_prompt
+ self.sendline (self.PROMPT_SET_SH) # sh-style
+ i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
+ if i == 0: # csh-style
+ self.sendline (self.PROMPT_SET_CSH)
+ i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
+ if i == 0:
+ return 0
+ return 1
+