6 def __init__(self, host, user, identity, port, agent, forward_x11):
10 self.identity = identity
15 self.forward_x11 = forward_x11
17 # TODO: Investigate using http://nixos.org/nix/
23 if (not self.host or not self.user):
24 msg = "Can't resolve package management system. Insufficient data."
25 self._logger.error(msg)
26 raise RuntimeError(msg)
28 out = self.execute("cat /etc/issue")
30 if out.find("Fedora") == 0:
32 elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
35 msg = "Can't resolve package management system. Unknown OS."
36 self._logger.error(msg)
37 raise RuntimeError(msg)
def is_localhost(self):
    """ Tell whether this node refers to the local machine

    The host name is used when it is set; otherwise the IP address is
    used. The selected value is compared against the well-known loopback
    names ('localhost', '127.0.0.1' and '::1').

    Returns True when the selected value is one of those names.
    """
    # The previous list contained '127.0.0.7', a typo for the IPv4
    # loopback address, so '127.0.0.1' was never recognised as local.
    address = self.host or self.ip
    return address in ('localhost', '127.0.0.1', '::1')
45 # TODO: Investigate using http://nixos.org/nix/
46 def install(self, packages):
47 if not isinstance(packages, list):
51 self.execute("%s -y install %s" % (self.pm, p), sudo = True,
54 # TODO: Investigate using http://nixos.org/nix/
55 def uninstall(self, packages):
56 if not isinstance(packages, list):
60 self.execute("%s -y remove %s" % (self.pm, p), sudo = True,
63 def upload(self, src, dst):
64 """ Copy content to destination
66 src content to copy. Can be a local file, directory or text input
68 dst destination path on the remote host (remote is always self.host)
70 # If src is a string input
71 if not os.path.isfile(src) and not isdir:
72 # src is text input that should be uploaded as file
73 src = cStringIO.StringIO(src)
75 if not self.is_localhost:
76 # Build destination as <user>@<server>:<path>
77 dst = "%s@%s:%s" % (self.user, self.host, dst)
78 return self.copy(src, dst)
def download(self, src, dst):
    """ Copy content from this node to a destination path

    src     path of the content to fetch from the node
    dst     destination path, handed to self.copy unchanged

    Returns whatever self.copy returns.
    """
    if self.is_localhost:
        # Nothing to qualify: both paths live on the same machine
        return self.copy(src, dst)

    # Build source as <user>@<server>:<path>
    server = self.host or self.ip
    remote_src = "%s@%s:%s" % (self.user, server, src)
    return self.copy(remote_src, dst)
86 def is_alive(self, verbose = False):
91 out = self.execute("echo 'ALIVE'",
93 err_on_timeout = False,
97 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
100 if out.strip().startswith('ALIVE'):
104 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
107 def mkdir(self, path, clean = True):
112 "mkdir -p %s" % path,
117 def rmdir(self, path):
124 def copy(self, src, dst):
125 if self.is_localhost:
126 command = ["cp", "-R", src, dst]
127 p = subprocess.Popen(command, stdout=subprocess.PIPE,
128 stderr=subprocess.PIPE)
129 out, err = p.communicate()
131 (out, err), proc = eintr_retry(rcopy)(
135 identity_file = self.identity_file)
138 msg = "Error uploading to %s got:\n%s%s" %\
139 (self.host or self.ip, out, err)
140 self._logger.error(msg)
141 raise RuntimeError(msg)
145 def execute(self, command,
152 err_on_timeout = True,
153 connect_timeout = 30,
155 """ Notice that this invocation will block until the
156 execution finishes. If this is not the desired behavior,
157 use 'run' instead."""
159 if self.is_localhost:
162 for envkey, envval in env.iteritems():
163 export += '%s=%s ' % (envkey, envval)
164 command = export + command
167 command = "sudo " + command
169 p = subprocess.Popen(command, stdout=subprocess.PIPE,
170 stderr=subprocess.PIPE)
171 out, err = p.communicate()
173 (out, err), proc = eintr_retry(rexec)(
175 self.host or self.ip,
178 agent = self.forward_agent,
181 identity_file = self.identity_file,
183 x11 = self.enable_x11,
187 err_on_timeout = err_on_timeout,
188 connect_timeout = connect_timeout,
189 persistent = persistent)
192 msg = "Failed to execute command %s at node %s: %s %s" % \
193 (command, self.host or self.ip, out, err,)
194 self._logger.warn(msg)
195 raise RuntimeError(msg)
199 def run(self, command, home,
204 self._logger.info("Running %s", command)
208 if self.is_localhost:
212 stderr = ' ' + stderr
214 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
223 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
224 'command' : daemon_command,
226 'sudo' : 'sudo -S' if sudo else '',
229 'gohome' : 'cd %s ; ' % home if home else '',
230 'create' : 'mkdir -p %s ; ' % home if create_home else '',
232 p = subprocess.Popen(command, stdout=subprocess.PIPE,
233 stderr=subprocess.PIPE)
234 out, err = p.communicate()
236 # Start process in a "daemonized" way, using nohup and heavy
237 # stdin/out redirection to avoid connection issues
238 (out,err), proc = rspawn(
242 stdin = stdin if stdin is not None else '/dev/null',
243 stdout = stdout if stdout else '/dev/null',
244 stderr = stderr if stderr else '/dev/null',
250 identity_file = self.file
254 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
258 def checkpid(self, path):
260 # NOTE: wait a bit for the pidfile to be created
261 pidtuple = rcheck_pid(
262 os.path.join(path, 'pid'),
267 identity_file = self.identity
272 def status(self, pid, ppid):
279 identity_file = self.identity
284 def kill(self, pid, ppid, sudo = False):
285 status = self.status(pid, ppid)
286 if status == RUNNING:
287 # kill by ppid+pid - SIGTERM first, then try SIGKILL
295 identity_file = self.identity
298 class SSHAPIFactory(object):
302 def get_api(cls, attributes):
303 host = attributes.get("hostname")
304 user = attributes.get("username")
305 identity = attributes.get("identity", "%s/.ssh/id_rsa" % os.environ['HOME'])
306 port = attributes.get("port", 22)
307 agent = attributes.get("agent", True)
308 forward_X11 = attributes.get("forwardX11", False)
310 key = cls.make_key(host, user, identity, port, agent, forward_X11)
311 api = self._apis.get(key)
314 api = SSHAPI(host, user, identity, port, agent, forward_X11)
315 self._apis[key] = api
def make_key(cls, *args):
    """ Build a cache key from the given connection attributes

    Every argument is converted with str() and the results are
    concatenated in order; the key is the hexadecimal MD5 digest of that
    string.

    NOTE: the arguments are joined without a separator, so different
    argument tuples that concatenate to the same text (e.g. ("ab", "c")
    and ("a", "bc")) produce the same key.
    """
    skey = "".join(map(str, args))
    # hashlib only accepts bytes. On Python 2 the joined str already is
    # bytes and is hashed untouched; on Python 3 it must be encoded first,
    # otherwise md5() raises TypeError.
    if not isinstance(skey, bytes):
        skey = skey.encode("utf-8")
    return hashlib.md5(skey).hexdigest()