2 from neco.util.sshfuncs import eintr_retry, rexec, rcopy, rspawn, \
3 rcheckpid, rstatus, rkill, RUNNING, FINISHED
12 def __init__(self, host, user, port, identity, agent, forward_x11):
16 self.identity = identity
21 self.forward_x11 = forward_x11
25 self._logger = logging.getLogger("neco.linux.SSHApi")
27 # TODO: Investigate using http://nixos.org/nix/
33 if (not self.host or not self.user):
34 msg = "Can't resolve package management system. Insufficient data."
35 self._logger.error(msg)
36 raise RuntimeError(msg)
38 out, err = self.execute("cat /etc/issue")
40 if out.find("Fedora") == 0:
42 elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
45 msg = "Can't resolve package management system. Unknown OS."
46 self._logger.error(msg)
47 raise RuntimeError(msg)
def is_localhost(self):
    """Tell whether ``self.host`` names the local machine.

    Returns True when ``self.host`` is exactly one of the well-known
    loopback names: ``localhost``, the IPv4 loopback ``127.0.0.1`` or
    the IPv6 loopback ``::1``. Any other value returns False.
    """
    # The IPv4 entry used to read '127.0.0.7', a typo that made the
    # standard loopback address be treated as a remote host.
    return self.host in ('localhost', '127.0.0.1', '::1')
55 # TODO: Investigate using http://nixos.org/nix/
56 def install(self, packages):
57 if not isinstance(packages, list):
61 self.execute("%s -y install %s" % (self.pm, p), sudo = True,
64 # TODO: Investigate using http://nixos.org/nix/
65 def uninstall(self, packages):
66 if not isinstance(packages, list):
70 self.execute("%s -y remove %s" % (self.pm, p), sudo = True,
73 def upload(self, src, dst):
74 """ Copy content to destination
76 src content to copy. Can be a local file, directory or text input
78 dst destination path on the remote host (remote is always self.host)
80 # If source is a string input
81 if not os.path.isfile(src):
82 # src is text input that should be uploaded as file
83 # create a temporal file with the content to upload
84 f = tempfile.NamedTemporaryFile(delete=False)
89 if not self.is_localhost:
90 # Build destination as <user>@<server>:<path>
91 dst = "%s@%s:%s" % (self.user, self.host, dst)
93 ret = self.copy(src, dst)
def download(self, src, dst):
    """Copy ``src`` to ``dst`` by delegating to ``self.copy``.

    When ``self.is_localhost`` is falsy, the source is first rewritten
    as ``<user>@<host>:<src>`` using ``self.user`` and ``self.host``;
    otherwise ``src`` is passed through untouched. ``dst`` is never
    modified. The return value is whatever ``self.copy`` returns.
    """
    if self.is_localhost:
        # Local node: the path can be used as given.
        source = src
    else:
        # Build the source as <user>@<server>:<path>
        source = "%s@%s:%s" % (self.user, self.host, src)
    return self.copy(source, dst)
103 def is_alive(self, verbose = False):
104 if self.is_localhost:
108 (out, err) = self.execute("echo 'ALIVE'",
110 err_on_timeout = False,
114 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
117 if out.strip().startswith('ALIVE'):
121 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
124 def mkdir(self, path, clean = True):
129 "mkdir -p %s" % path,
134 def rmdir(self, path):
141 def copy(self, src, dst):
142 if self.is_localhost:
143 command = ["cp", "-R", src, dst]
144 p = subprocess.Popen(command, stdout=subprocess.PIPE,
145 stderr=subprocess.PIPE)
146 out, err = p.communicate()
148 (out, err), proc = eintr_retry(rcopy)(
152 identity = self.identity)
155 msg = "Error uploading to %s got:\n%s%s" %\
156 (self.host, out, err)
157 self._logger.error(msg)
158 raise RuntimeError(msg)
162 def execute(self, command,
169 err_on_timeout = True,
170 connect_timeout = 30,
172 """ Notice that this invocation will block until the
173 execution finishes. If this is not the desired behavior,
174 use 'run' instead."""
176 if self.is_localhost:
179 for envkey, envval in env.iteritems():
180 export += '%s=%s ' % (envkey, envval)
181 command = export + command
184 command = "sudo " + command
186 p = subprocess.Popen(command, stdout=subprocess.PIPE,
187 stderr=subprocess.PIPE)
188 out, err = p.communicate()
190 (out, err), proc = eintr_retry(rexec)(
198 identity = self.identity,
200 x11 = self.forward_x11,
204 err_on_timeout = err_on_timeout,
205 connect_timeout = connect_timeout,
206 persistent = persistent)
209 msg = "Failed to execute command %s at node %s: %s %s" % \
210 (command, self.host, out, err,)
211 self._logger.warn(msg)
212 raise RuntimeError(msg)
215 def run(self, command, home,
220 self._logger.info("Running %s", command)
224 if self.is_localhost:
228 stderr = ' ' + stderr
230 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
239 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
240 'command' : daemon_command,
242 'sudo' : 'sudo -S' if sudo else '',
245 'gohome' : 'cd %s ; ' % home if home else '',
246 'create' : 'mkdir -p %s ; ' % home if create_home else '',
248 p = subprocess.Popen(command, stdout=subprocess.PIPE,
249 stderr=subprocess.PIPE)
250 out, err = p.communicate()
252 # Start process in a "daemonized" way, using nohup and heavy
253 # stdin/out redirection to avoid connection issues
254 (out,err), proc = rspawn(
258 stdin = stdin if stdin is not None else '/dev/null',
259 stdout = stdout if stdout else '/dev/null',
260 stderr = stderr if stderr else '/dev/null',
266 identity = self.identity
270 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
274 def checkpid(self, path):
276 # NOTE: wait a bit for the pidfile to be created
277 pidtuple = rcheckpid(
278 os.path.join(path, 'pid'),
283 identity = self.identity
288 def status(self, pid, ppid):
295 identity = self.identity
300 def kill(self, pid, ppid, sudo = False):
301 status = self.status(pid, ppid)
302 if status == RUNNING:
303 # kill by ppid+pid - SIGTERM first, then try SIGKILL
311 identity = self.identity
314 class SSHApiFactory(object):
318 def get_api(cls, host, user, port = 22, identity = None,
319 agent = True, forward_X11 = False):
321 key = cls.make_key(host, user, port, agent, forward_X11)
322 api = cls._apis.get(key)
325 api = SSHApi(host, user, port, identity, agent, forward_X11)
def make_key(cls, *args):
    """Return an MD5 hex digest that identifies the given arguments.

    Every argument is converted with ``str()`` and the resulting
    sequence is hashed. The same arguments in the same order always
    give the same 32-character hexadecimal key.
    """
    # repr() of a tuple quotes and escapes each field, so the field
    # boundaries stay unambiguous. Plain concatenation made distinct
    # tuples such as ("ab", "c") and ("a", "bc") share a single key.
    skey = repr(tuple(str(arg) for arg in args))
    if not isinstance(skey, bytes):
        # hashlib only accepts bytes on Python 3; on Python 2 the
        # repr() result is already a byte string and is used as is.
        skey = skey.encode("utf-8")
    return hashlib.md5(skey).hexdigest()