little changes
[nepi.git] / src / neco / resources / linux / ssh_api.py
1
2 from neco.util.sshfuncs import eintr_retry, rexec, rcopy, rspawn, \
3         rcheckpid, rstatus, rkill, RUNNING, FINISHED 
4
5 import hashlib
6 import logging
7 import os
8 import re
9 import tempfile
10
11 class SSHApi(object):
12     def __init__(self, host, user, port, identity, agent, forward_x11):
13         self.host = host
14         self.user = user
15         # ssh identity file
16         self.identity = identity
17         self.port = port
18         # use ssh agent
19         self.agent = agent
20         # forward X11 
21         self.forward_x11 = forward_x11
22
23         self._pm = None
24         
25         self._logger = logging.getLogger("neco.linux.SSHApi")
26
27     # TODO: Investigate using http://nixos.org/nix/
28     @property
29     def pm(self):
30         if self._pm:
31             return self._pm
32
33         if (not self.host or not self.user):
34             msg = "Can't resolve package management system. Insufficient data."
35             self._logger.error(msg)
36             raise RuntimeError(msg)
37
38         out, err = self.execute("cat /etc/issue")
39
40         if out.find("Fedora") == 0:
41             self._pm = "yum"
42         elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
43             self._pm = "apt-get"
44         else:
45             msg = "Can't resolve package management system. Unknown OS."
46             self._logger.error(msg)
47             raise RuntimeError(msg)
48
49         return self._pm
50
51     @property
52     def is_localhost(self):
53         return self.host in ['localhost', '127.0.0.7', '::1']
54
55     # TODO: Investigate using http://nixos.org/nix/
56     def install(self, packages):
57         if not isinstance(packages, list):
58             packages = [packages]
59
60         for p in packages:
61             self.execute("%s -y install %s" % (self.pm, p), sudo = True, 
62                     tty = True)
63
64     # TODO: Investigate using http://nixos.org/nix/
65     def uninstall(self, packages):
66         if not isinstance(packages, list):
67             packages = [packages]
68
69         for p in packages:
70             self.execute("%s -y remove %s" % (self.pm, p), sudo = True, 
71                     tty = True)
72
73     def upload(self, src, dst):
74         """ Copy content to destination
75
76            src  content to copy. Can be a local file, directory or text input
77
78            dst  destination path on the remote host (remote is always self.host)
79         """
80         # If source is a string input 
81         if not os.path.isfile(src):
82             # src is text input that should be uploaded as file
83             # create a temporal file with the content to upload
84             f = tempfile.NamedTemporaryFile(delete=False)
85             f.write(src)
86             f.close()
87             src = f.name
88
89         if not self.is_localhost:
90             # Build destination as <user>@<server>:<path>
91             dst = "%s@%s:%s" % (self.user, self.host, dst)
92
93         ret = self.copy(src, dst)
94
95         return ret
96
97     def download(self, src, dst):
98         if not self.is_localhost:
99             # Build destination as <user>@<server>:<path>
100             src = "%s@%s:%s" % (self.user, self.host, src)
101         return self.copy(src, dst)
102         
103     def is_alive(self, verbose = False):
104         if self.is_localhost:
105             return True
106
107         try:
108             (out, err) = self.execute("echo 'ALIVE'",
109                 timeout = 60,
110                 err_on_timeout = False,
111                 persistent = False)
112         except:
113             if verbose:
114                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
115             return False
116
117         if out.strip().startswith('ALIVE'):
118             return True
119         else:
120             if verbose:
121                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
122             return False
123
124     def mkdir(self, path, clean = True):
125         if clean:
126             self.rmdir(path)
127
128         return self.execute(
129             "mkdir -p %s" % path,
130             timeout = 120,
131             retry = 3
132             )
133
134     def rmdir(self, path):
135         return self.execute(
136             "rm -rf %s" % path,
137             timeout = 120,
138             retry = 3
139             )
140
141     def copy(self, src, dst):
142         if self.is_localhost:
143             command = ["cp", "-R", src, dst]
144             p = subprocess.Popen(command, stdout=subprocess.PIPE, 
145                     stderr=subprocess.PIPE)
146             out, err = p.communicate()
147         else:
148             (out, err), proc = eintr_retry(rcopy)(
149                 src, dst, 
150                 port = self.port,
151                 agent = self.agent,
152                 identity = self.identity)
153
154             if proc.wait():
155                 msg = "Error uploading to %s got:\n%s%s" %\
156                         (self.host, out, err)
157                 self._logger.error(msg)
158                 raise RuntimeError(msg)
159
160         return (out, err)
161
162     def execute(self, command,
163             sudo = False,
164             stdin = None, 
165             tty = False,
166             env = None,
167             timeout = None,
168             retry = 0,
169             err_on_timeout = True,
170             connect_timeout = 30,
171             persistent = True):
172         """ Notice that this invocation will block until the
173         execution finishes. If this is not the desired behavior,
174         use 'run' instead."""
175
176         if self.is_localhost:
177             if env:
178                 export = ''
179                 for envkey, envval in env.iteritems():
180                     export += '%s=%s ' % (envkey, envval)
181                 command = export + command
182
183             if sudo:
184                 command = "sudo " + command
185
186             p = subprocess.Popen(command, stdout=subprocess.PIPE, 
187                     stderr=subprocess.PIPE)
188             out, err = p.communicate()
189         else:
190             (out, err), proc = eintr_retry(rexec)(
191                     command, 
192                     self.host, 
193                     self.user,
194                     port = self.port, 
195                     agent = self.agent,
196                     sudo = sudo,
197                     stdin = stdin, 
198                     identity = self.identity,
199                     tty = tty,
200                     x11 = self.forward_x11,
201                     env = env,
202                     timeout = timeout,
203                     retry = retry,
204                     err_on_timeout = err_on_timeout,
205                     connect_timeout = connect_timeout,
206                     persistent = persistent)
207
208             if proc.wait():
209                 msg = "Failed to execute command %s at node %s: %s %s" % \
210                         (command, self.host, out, err,)
211                 self._logger.warn(msg)
212                 raise RuntimeError(msg)
213         return (out, err)
214
215     def run(self, command, home, 
216             stdin = None, 
217             stdout = 'stdout', 
218             stderr = 'stderr', 
219             sudo = False):
220         self._logger.info("Running %s", command)
221         
222         pidfile = './pid'
223
224         if self.is_localhost:
225             if stderr == stdout:
226                 stderr = '&1'
227             else:
228                 stderr = ' ' + stderr
229             
230             daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
231                 'command' : command,
232                 'pidfile' : pidfile,
233                 
234                 'stdout' : stdout,
235                 'stderr' : stderr,
236                 'stdin' : stdin,
237             }
238             
239             cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
240                     'command' : daemon_command,
241                     
242                     'sudo' : 'sudo -S' if sudo else '',
243                     
244                     'pidfile' : pidfile,
245                     'gohome' : 'cd %s ; ' % home if home else '',
246                     'create' : 'mkdir -p %s ; ' % home if create_home else '',
247                 }
248             p = subprocess.Popen(command, stdout=subprocess.PIPE, 
249                     stderr=subprocess.PIPE)
250             out, err = p.communicate()
251         else:
252             # Start process in a "daemonized" way, using nohup and heavy
253             # stdin/out redirection to avoid connection issues
254             (out,err), proc = rspawn(
255                 command,
256                 pidfile = pidfile,
257                 home = home,
258                 stdin = stdin if stdin is not None else '/dev/null',
259                 stdout = stdout if stdout else '/dev/null',
260                 stderr = stderr if stderr else '/dev/null',
261                 sudo = sudo,
262                 host = self.host,
263                 user = self.user,
264                 port = self.port,
265                 agent = self.agent,
266                 identity = self.identity
267                 )
268             
269             if proc.wait():
270                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
271
272         return (out, err)
273     
274     def checkpid(self, path):            
275         # Get PID/PPID
276         # NOTE: wait a bit for the pidfile to be created
277         pidtuple = rcheckpid(
278             os.path.join(path, 'pid'),
279             host = self.host,
280             user = self.user,
281             port = self.port,
282             agent = self.agent,
283             identity = self.identity
284             )
285         
286         return pidtuple
287     
288     def status(self, pid, ppid):
289         status = rstatus(
290                 pid, ppid,
291                 host = self.host,
292                 user = self.user,
293                 port = self.port,
294                 agent = self.agent,
295                 identity = self.identity
296                 )
297            
298         return status
299     
300     def kill(self, pid, ppid, sudo = False):
301         status = self.status(pid, ppid)
302         if status == RUNNING:
303             # kill by ppid+pid - SIGTERM first, then try SIGKILL
304             rkill(
305                 pid, ppid,
306                 host = self.host,
307                 user = self.user,
308                 port = self.port,
309                 agent = self.agent,
310                 sudo = sudo,
311                 identity = self.identity
312                 )
313
314 class SSHApiFactory(object):
315     _apis = dict()
316
317     @classmethod 
318     def get_api(cls, host, user, port = 22, identity = None, 
319             agent = True, forward_X11 = False):
320  
321         key = cls.make_key(host, user, port, agent, forward_X11)
322         api = cls._apis.get(key)
323
324         if not api:
325             api = SSHApi(host, user, port, identity, agent, forward_X11)
326             cls._apis[key] = api
327
328         return api
329
330     @classmethod 
331     def make_key(cls, *args):
332         skey = "".join(map(str, args))
333         return hashlib.md5(skey).hexdigest()
334