93e895ec4987409ed34a60997c184f77b00ffe55
[nepi.git] / src / neco / resources / linux / ssh_api.py
1 import hashlib
2 import os
3 import re
4
5 class SSHAPI(object):
6     def __init__(self, host, user, identity, port, agent, forward_x11):
7         self.host = host
8         self.user = user
9         # ssh identity file
10         self.identity = identity
11         self.port = port
12         # use ssh agent
13         self.agent = agent
14         # forward X11 
15         self.forward_x11 = forward_x11
16
17     # TODO: Investigate using http://nixos.org/nix/
18     @property
19     def pm(self):
20         if self._pm:
21             return self._pm
22
23         if (not self.host or not self.user):
24             msg = "Can't resolve package management system. Insufficient data."
25             self._logger.error(msg)
26             raise RuntimeError(msg)
27
28         out = self.execute("cat /etc/issue")
29
30         if out.find("Fedora") == 0:
31             self._pm = "yum"
32         elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
33             self._pm = "apt-get"
34         else:
35             msg = "Can't resolve package management system. Unknown OS."
36             self._logger.error(msg)
37             raise RuntimeError(msg)
38
39         return self._pm
40
41     @property
42     def is_localhost(self):
43         return ( self.host or self.ip ) in ['localhost', '127.0.0.7', '::1']
44
45     # TODO: Investigate using http://nixos.org/nix/
46     def install(self, packages):
47         if not isinstance(packages, list):
48             packages = [packages]
49
50         for p in packages:
51             self.execute("%s -y install %s" % (self.pm, p), sudo = True, 
52                     tty = True)
53
54     # TODO: Investigate using http://nixos.org/nix/
55     def uninstall(self, packages):
56         if not isinstance(packages, list):
57             packages = [packages]
58
59         for p in packages:
60             self.execute("%s -y remove %s" % (self.pm, p), sudo = True, 
61                     tty = True)
62
63     def upload(self, src, dst):
64         """ Copy content to destination
65
66            src  content to copy. Can be a local file, directory or text input
67
68            dst  destination path on the remote host (remote is always self.host)
69         """
70         # If src is a string input 
71         if not os.path.isfile(src) and not isdir:
72             # src is text input that should be uploaded as file           
73             src = cStringIO.StringIO(src)
74
75         if not self.is_localhost:
76             # Build destination as <user>@<server>:<path>
77             dst = "%s@%s:%s" % (self.user, self.host, dst)
78         return self.copy(src, dst)
79
80     def download(self, src, dst):
81         if not self.is_localhost:
82             # Build destination as <user>@<server>:<path>
83             src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
84         return self.copy(src, dst)
85         
86     def is_alive(self, verbose = False):
87         if self.is_localhost:
88             return True
89
90         try:
91             out = self.execute("echo 'ALIVE'",
92                 timeout = 60,
93                 err_on_timeout = False,
94                 persistent = False)
95         except:
96             if verbose:
97                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
98             return False
99
100         if out.strip().startswith('ALIVE'):
101             return True
102         else:
103             if verbose:
104                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
105             return False
106
107     def mkdir(self, path, clean = True):
108         if clean:
109             self.rmdir(path)
110
111         return self.execute(
112             "mkdir -p %s" % path,
113             timeout = 120,
114             retry = 3
115             )
116
117     def rmdir(self, path):
118         return self.execute(
119             "rm -rf %s" % path,
120             timeout = 120,
121             retry = 3
122             )
123
124     def copy(self, src, dst):
125         if self.is_localhost:
126             command = ["cp", "-R", src, dst]
127             p = subprocess.Popen(command, stdout=subprocess.PIPE, 
128                     stderr=subprocess.PIPE)
129             out, err = p.communicate()
130         else:
131             (out, err), proc = eintr_retry(rcopy)(
132                 src, dst, 
133                 port = self.port,
134                 agent = self.agent,
135                 identity_file = self.identity_file)
136
137             if proc.wait():
138                 msg = "Error uploading to %s got:\n%s%s" %\
139                         (self.host or self.ip, out, err)
140                 self._logger.error(msg)
141                 raise RuntimeError(msg)
142
143         return (out, err)
144
145     def execute(self, command,
146             sudo = False,
147             stdin = None, 
148             tty = False,
149             env = None,
150             timeout = None,
151             retry = 0,
152             err_on_timeout = True,
153             connect_timeout = 30,
154             persistent = True):
155         """ Notice that this invocation will block until the
156         execution finishes. If this is not the desired behavior,
157         use 'run' instead."""
158
159         if self.is_localhost:
160             if env:
161                 export = ''
162                 for envkey, envval in env.iteritems():
163                     export += '%s=%s ' % (envkey, envval)
164                 command = export + command
165
166             if sudo:
167                 command = "sudo " + command
168
169             p = subprocess.Popen(command, stdout=subprocess.PIPE, 
170                     stderr=subprocess.PIPE)
171             out, err = p.communicate()
172         else:
173             (out, err), proc = eintr_retry(rexec)(
174                     command, 
175                     self.host or self.ip, 
176                     self.user,
177                     port = self.port, 
178                     agent = self.forward_agent,
179                     sudo = sudo,
180                     stdin = stdin, 
181                     identity_file = self.identity_file,
182                     tty = tty,
183                     x11 = self.enable_x11,
184                     env = env,
185                     timeout = timeout,
186                     retry = retry,
187                     err_on_timeout = err_on_timeout,
188                     connect_timeout = connect_timeout,
189                     persistent = persistent)
190
191             if proc.wait():
192                 msg = "Failed to execute command %s at node %s: %s %s" % \
193                         (command, self.host or self.ip, out, err,)
194                 self._logger.warn(msg)
195                 raise RuntimeError(msg)
196
197         return (out, err)
198
199     def run(self, command, home, 
200             stdin = None, 
201             stdout = 'stdout', 
202             stderr = 'stderr', 
203             sudo = False):
204         self._logger.info("Running %s", command)
205         
206         pidfile = './pid',
207
208         if self.is_localhost:
209             if stderr == stdout:
210                 stderr = '&1'
211             else:
212                 stderr = ' ' + stderr
213             
214             daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
215                 'command' : command,
216                 'pidfile' : pidfile,
217                 
218                 'stdout' : stdout,
219                 'stderr' : stderr,
220                 'stdin' : stdin,
221             }
222             
223             cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
224                     'command' : daemon_command,
225                     
226                     'sudo' : 'sudo -S' if sudo else '',
227                     
228                     'pidfile' : pidfile,
229                     'gohome' : 'cd %s ; ' % home if home else '',
230                     'create' : 'mkdir -p %s ; ' % home if create_home else '',
231                 }
232             p = subprocess.Popen(command, stdout=subprocess.PIPE, 
233                     stderr=subprocess.PIPE)
234             out, err = p.communicate()
235         else:
236             # Start process in a "daemonized" way, using nohup and heavy
237             # stdin/out redirection to avoid connection issues
238             (out,err), proc = rspawn(
239                 command,
240                 pidfile = pidfile,
241                 home = home,
242                 stdin = stdin if stdin is not None else '/dev/null',
243                 stdout = stdout if stdout else '/dev/null',
244                 stderr = stderr if stderr else '/dev/null',
245                 sudo = sudo,
246                 host = self.host,
247                 user = self.user,
248                 port = self.port,
249                 agent = self.agent,
250                 identity_file = self.file
251                 )
252             
253             if proc.wait():
254                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
255
256         return (out, err)
257     
258     def checkpid(self, path):            
259         # Get PID/PPID
260         # NOTE: wait a bit for the pidfile to be created
261         pidtuple = rcheck_pid(
262             os.path.join(path, 'pid'),
263             host = self.host,
264             user = self.user,
265             port = self.port,
266             agent = self.agent,
267             identity_file = self.identity
268             )
269         
270         return pidtuple
271     
272     def status(self, pid, ppid):
273         status = rstatus(
274                 pid, ppid,
275                 host = self.host,
276                 user = self.user,
277                 port = self.port,
278                 agent = self.agent,
279                 identity_file = self.identity
280                 )
281            
282         return status
283     
284     def kill(self, pid, ppid, sudo = False):
285         status = self.status(pid, ppid)
286         if status == RUNNING:
287             # kill by ppid+pid - SIGTERM first, then try SIGKILL
288             rkill(
289                 pid, ppid,
290                 host = self.host,
291                 user = self.user,
292                 port = self.port,
293                 agent = self.agent,
294                 sudo = sudo,
295                 identity_file = self.identity
296                 )
297
298 class SSHAPIFactory(object):
299     _apis = dict()
300
301     @classmethod 
302     def get_api(cls, attributes):
303         host = attributes.get("hostname")
304         user = attributes.get("username")
305         identity = attributes.get("identity", "%s/.ssh/id_rsa" % os.environ['HOME'])
306         port = attributes.get("port", 22)
307         agent = attributes.get("agent", True)
308         forward_X11 = attributes.get("forwardX11", False)
309
310         key = cls.make_key(host, user, identity, port, agent, forward_X11)
311         api = self._apis.get(key)
312
313         if no api:
314             api = SSHAPI(host, user, identity, port, agent, forward_X11)
315             self._apis[key] = api
316
317         return api
318
319     @classmethod 
320     def make_key(cls, *args):
321         skey = "".join(map(str, args))
322         return hashlib.md5(skey).hexdigest()
323