This is a better module for dealing with SSH logins using 'expect' like
[monitor.git] / ssh / pxssh.py
1 from pexpect import *
2 #import os, sys, getopt, shutil
3
4 class pxssh (spawn):
5     """This class extends pexpect.spawn to specialize setting up SSH connections.
6     This adds methods for login, logout, and expecting the prompt.
7     It does various hacky things to handle any situation in the SSH login process.
8     For example, if the session is your first login, then it automatically
9     accepts the certificate; or if you have public key authentication setup
10     and you don't need a password then this is OK too.
11
12     Example usage that runs 'ls -l' on server and prints the result:
13         import pxssh
14         s = pxssh.pxssh()
15         if not s.login ('localhost', 'myusername', 'mypassword'):
16             print "SSH session failed on login."
17             print str(s)
18         else:
19             print "SSH session login successful"
20             s.sendline ('ls -l')
21             s.prompt()           # match the prompt
22             print s.before     # print everything before the prompt.
23             s.logout()
24     """
25
26     def __init__ (self):
27         # SUBTLE HACK ALERT!
28         # Note that the command to set the prompt uses a slightly different string
29         # than the regular expression to match it. This is because when you set the
30         # prompt the command will echo back, but we don't want to match the echoed
31         # command. So if we make the set command slightly different than the regex
32         # we eliminate the problem. To make the set command different we add a
33         # backslash in front of $. The $ doesn't need to be escaped, but it doesn't
34         # hurt and serves to make the set command different than the regex.
35         self.PROMPT = "\[PEXPECT\][\$\#] " # used to match the command-line prompt.
36         # used to set shell command-line prompt to something more unique.
37         self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
38         self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
39
40     ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
41     ### TODO: I need to draw a better flow chart for this.
42     def login (self,server,username,password='',ssh_options="",terminal_type='ansi',original_prompts=r"][#$]|~[#$]|bash.*?[#$]|[#$] ",login_timeout=10):
43         """This logs the user into the given server. By default the prompt is
44         rather optimistic and should be considered more of an example. It's
45         better to try to match the prompt as exactly as possible to prevent
46         any false matches by server strings such as a "Message Of The Day" or
47         something. The closer you can make the original_prompt match your real prompt
48         then the better. A timeout will not necessarily cause the login to fail.
49         In the case of a timeout we assume that the prompt was so weird that
50         we could not match it. We still try to reset the prompt to something
51         more unique. If that still fails then we return False.
52         """
53         cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
54         print cmd
55         spawn.__init__(self, cmd, timeout=login_timeout)
56         #, "(?i)no route to host"])
57         i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"])
58         if i==0: # New certificate -- always accept it. This is what you if SSH does not have the remote host's public key stored in the cache.
59             self.sendline("yes")
60             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
61         if i==2: # password
62             self.sendline(password)
63             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
64         if i==4:
65             self.sendline(terminal_type)
66             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompts, "(?i)password", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
67
68         if i==0:
69             # This is weird. This should not happen twice in a row.
70             self.close()
71             return False
72         elif i==1: # can occur if you have a public key pair set to authenticate. 
73             ### TODO: May NOT be OK if expect() matched a false prompt.
74             pass
75         elif i==2: # password prompt again
76             # For incorrect passwords, some ssh servers will
77             # ask for the password again, others return 'denied' right away.
78             # If we get the password prompt again then this means
79             # we didn't get the password right the first time. 
80             self.close()
81             return False
82         elif i==3: # permission denied -- password was bad.
83             self.close()
84             return False
85         elif i==4: # terminal type again? WTF?
86             self.close()
87             return False
88         elif i==5: # Timeout
89             # This is tricky... presume that we are at the command-line prompt.
90             # It may be that the prompt was so weird that we couldn't match it.
91             pass
92         elif i==6: # Connection closed by remote host
93             self.close()
94             return False
95         else: # Unexpected 
96             self.close()
97             return False
98         # We appear to be in -- reset prompt to something more unique.
99         #if not self.set_unique_prompt():
100         #    self.close()
101         #    return False
102         return True
103
104     def logout (self):
105         """This sends exit. If there are stopped jobs then this sends exit twice.
106         """
107         self.sendline("exit")
108         index = self.expect([EOF, "(?i)there are stopped jobs"])
109         if index==1:
110             self.sendline("exit")
111             self.expect(EOF)
112
113     def prompt (self, timeout=20):
114         """This expects the prompt. This returns True if the prompt was matched.
115         This returns False if there was a timeout.
116         """
117         i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
118         if i==1:
119             return False
120         return True
121         
122     def set_unique_prompt (self, optional_prompt=None):
123         """This attempts to reset the shell prompt to something more unique.
124             This makes it easier to match unambiguously.
125         """
126         if optional_prompt is not None:
127             self.prompt = optional_prompt
128         self.sendline (self.PROMPT_SET_SH) # sh-style
129         i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
130         if i == 0: # csh-style
131             self.sendline (self.PROMPT_SET_CSH)
132             i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
133             if i == 0:
134                 return 0
135         return 1
136