1 from neco.execution.resource import Resource
2 from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
3 rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING
10 class LinuxNode(Resource):
11 def __init__(self, ec, guif):
12 super(LinuxNode, self).__init__(ec, guid)
17 self.identity_file = None
18 self.enable_x11 = False
19 self.forward_agent = True
21 # packet management system - either yum or apt for now...
26 self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
28 self._logger.setLevel(getattr(logging, loglevel.upper()))
30 # For ssh connections we use the ControlMaster option which
31 # allows us to decrease the number of open ssh network connections.
32 # Subsequent ssh connections will reuse a same master connection.
33 # This might pose a problem when using X11 and ssh-agent, since
34 # display and agent forwarded will be those of the first connection,
35 # which created the master.
36 # To avoid reusing a master created by a previous LinuxNode instance,
37 # we explicitly erase the ControlPath socket.
38 control_path = make_control_path(self.user, self.host, self.port)
40 os.remove(control_path)
# NOTE(review): the ``def`` line of this definition is not visible in this
# excerpt.  install() and uninstall() read ``self.pm``, so this is presumably
# the body that resolves the package manager -- confirm.
# Without a host/ip and a user there is no node to query: log and fail.
49 if (not (self.host or self.ip) or not self.user):
50 msg = "Can't resolve package management system. Insufficient data."
51 self._logger.error(msg)
52 raise RuntimeError(msg)
# Read the distribution banner from the node.
54 out = self.execute("cat /etc/issue")
# find(...) == 0 means the banner must *start* with the distribution name.
# The branch bodies are not visible in this excerpt.
56 if out.find("Fedora") == 0:
58 elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
# Presumably the fallback branch (its ``else:`` line is not visible): the
# banner matched none of the known distributions, so log and fail.
61 msg = "Can't resolve package management system. Unknown OS."
62 self._logger.error(msg)
63 raise RuntimeError(msg)
def is_localhost(self):
    """Tell whether this node designates the local machine.

    The host name is examined first and the IP address is used only when
    no host is set.  The node is local when that value is ``localhost``,
    ``127.0.0.1`` or ``::1``.

    NOTE(review): the other methods of this class read this as an
    attribute (``self.is_localhost``), so it is presumably wrapped by a
    ``@property`` decorator that is not visible in this excerpt --
    confirm before relying on the call form.
    """
    # The original list contained '127.0.0.7', a typo for the IPv4
    # loopback address: a node configured with 127.0.0.1 was treated as
    # remote and reached through ssh instead of a local subprocess.
    return (self.host or self.ip) in ('localhost', '127.0.0.1', '::1')
# Install packages on the node by issuing "<pm> -y install <package>" through
# self.execute() with sudo, where <pm> is self.pm.
# NOTE(review): the lines between the isinstance() check and the execute()
# call, and the remaining execute() arguments, are not visible in this
# excerpt.  ``p`` is presumably the loop variable over ``packages`` and a
# non-list argument is presumably wrapped in a list -- confirm.
71 def install(self, packages):
72 if not isinstance(packages, list):
76 self.execute("%s -y install %s" % (self.pm, p), sudo = True,
# Remove packages from the node by issuing "<pm> -y remove <package>" through
# self.execute() with sudo, where <pm> is self.pm.
# NOTE(review): the lines between the isinstance() check and the execute()
# call, and the remaining execute() arguments, are not visible in this
# excerpt.  ``p`` is presumably the loop variable over ``packages`` and a
# non-list argument is presumably wrapped in a list -- confirm.
79 def uninstall(self, packages):
80 if not isinstance(packages, list):
84 self.execute("%s -y remove %s" % (self.pm, p), sudo = True,
def upload(self, src, dst):
    """Send ``src`` to ``dst`` on the node and return ``self.copy``'s result.

    ``src`` is handed over unchanged when it is the path of an existing
    regular file; any other value is wrapped in a ``cStringIO.StringIO``
    (presumably so that literal text can be uploaded as file content --
    confirm against ``copy``).

    For a node that is not the local machine, ``dst`` is rewritten as
    ``<user>@<server>:<path>``, the server being the host name or, when
    no host is set, the IP address.
    """
    source = src if os.path.isfile(src) else cStringIO.StringIO(src)

    if self.is_localhost:
        target = dst
    else:
        # Remote target, addressed as <user>@<server>:<path>
        target = "%s@%s:%s" % (self.user, self.host or self.ip, dst)

    return self.copy(source, target)
def download(self, src, dst):
    """Fetch ``src`` from the node into ``dst`` and return ``self.copy``'s result.

    For a node that is not the local machine, ``src`` is rewritten as
    ``<user>@<server>:<path>``, the server being the host name or, when
    no host is set, the IP address.  ``dst`` is always passed unchanged.
    """
    if self.is_localhost:
        origin = src
    else:
        # Remote source, addressed as <user>@<server>:<path>
        origin = "%s@%s:%s" % (self.user, self.host or self.ip, src)

    return self.copy(origin, dst)
# Liveness probe for the node.
# After the localhost check, the visible code runs "echo 'ALIVE'" through
# self.execute() with err_on_timeout = False and tests whether the stripped
# output starts with 'ALIVE'.  Two places log an "Unresponsive node" warning
# carrying the host name and the collected output.
# NOTE(review): the localhost branch body, the return statements, the
# assignment of ``err`` and any use of ``verbose`` are not visible in this
# excerpt.
102 def is_alive(self, verbose = False):
103 if self.is_localhost:
107 out = self.execute("echo 'ALIVE'",
109 err_on_timeout = False,
113 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
116 if out.strip().startswith('ALIVE'):
120 self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
# Create directory ``path`` on the node with "mkdir -p <path>".
# NOTE(review): how the command is dispatched and what ``clean`` controls are
# not visible in this excerpt.
123 def mkdir(self, path, clean = True):
128 "mkdir -p %s" % path,
# NOTE(review): only the signature is visible in this excerpt.  Going by the
# name, this presumably removes directory ``path`` on the node -- confirm.
133 def rmdir(self, path):
# Copy ``src`` to ``dst``; upload() and download() both delegate to it.
# On localhost the copy is done by running "cp -R src dst" in a subprocess
# and collecting its stdout/stderr.  The other visible path calls rcopy,
# wrapped in eintr_retry, passing self.identity_file.
# A failure is logged and raised as RuntimeError.
# NOTE(review): the rcopy positional arguments, the condition that triggers
# the error and the return value are not visible in this excerpt.
140 def copy(self, src, dst):
141 if self.is_localhost:
# Argument list form: each element reaches cp as one argument.
142 command = ["cp", "-R", src, dst]
143 p = subprocess.Popen(command, stdout=subprocess.PIPE,
144 stderr=subprocess.PIPE)
145 out, err = p.communicate()
147 (out, err), proc = eintr_retry(rcopy)(
151 identity_file = self.identity_file)
# The message says "uploading" although download() goes through here as well.
154 msg = "Error uploading to %s got:\n%s%s" %\
155 (self.host or self.ip, out, err)
156 self._logger.error(msg)
157 raise RuntimeError(msg)
# Run ``command`` on the node and wait for it to finish (see the docstring).
# On localhost the command is run in a subprocess after being prefixed with
# the environment assignments and with "sudo ".  The other visible path
# calls rexec, wrapped in eintr_retry, with the node's ssh settings
# (agent forwarding, identity file, X11) and the timeout/persistence options.
# A failure is logged as a warning and raised as RuntimeError.
# NOTE(review): most of the signature, the conditions guarding the env/sudo
# handling, the failure test and the return value are not visible in this
# excerpt.
161 def execute(self, command,
168 err_on_timeout = True,
169 connect_timeout = 30,
171 """ Notice that this invocation will block until the
172 execution finishes. If this is not the desired behavior,
173 use 'run' instead."""
175 if self.is_localhost:
# Prefix the command with "KEY=value " pairs taken from ``env``.
# iteritems() is Python 2 only.
178 for envkey, envval in env.iteritems():
179 export += '%s=%s ' % (envkey, envval)
180 command = export + command
183 command = "sudo " + command
# NOTE(review): ``command`` is a single string and no shell=True appears in
# this call; on POSIX, subprocess then treats the whole string as the name of
# the program to run -- confirm this branch works with arguments.
185 p = subprocess.Popen(command, stdout=subprocess.PIPE,
186 stderr=subprocess.PIPE)
187 out, err = p.communicate()
189 (out, err), proc = eintr_retry(rexec)(
191 self.host or self.ip,
194 agent = self.forward_agent,
197 identity_file = self.identity_file,
199 x11 = self.enable_x11,
203 err_on_timeout = err_on_timeout,
204 connect_timeout = connect_timeout,
205 persistent = persistent)
208 msg = "Failed to execute command %s at node %s: %s %s" % \
209 (command, self.host or self.ip, out, err,)
210 self._logger.warn(msg)
211 raise RuntimeError(msg)
# Start ``command`` in the background from directory ``home``; the
# non-blocking counterpart that execute()'s docstring points to.
# On localhost a shell line is assembled that optionally creates and enters
# ``home``, removes the pid file and launches the command under nohup with
# stdin/stdout/stderr redirected.  The other visible path hands the job to
# rspawn with the node's ssh settings.
# A failure is raised as RuntimeError ("Failed to set up application").
# NOTE(review): most of the signature, the dictionary entries not shown, the
# failure test and the return value are not visible in this excerpt.
215 def run(self, command, home,
220 self._logger.info("Running %s", command)
224 if self.is_localhost:
228 stderr = ' ' + stderr
# Background the command with its three streams redirected, then write "$!"
# (the pid of that background job) followed by a literal 1 to the pid file.
230 daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
239 cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
240 'command' : daemon_command,
242 'sudo' : 'sudo -S' if sudo else '',
245 'gohome' : 'cd %s ; ' % home if home else '',
246 'create' : 'mkdir -p %s ; ' % home if create_home else '',
# NOTE(review): Popen receives ``command`` while the wrapper built just above
# is stored in ``cmd``, which no visible line uses; as in execute(), no
# shell=True appears in this call either -- confirm which string is meant to
# be run.
248 p = subprocess.Popen(command, stdout=subprocess.PIPE,
249 stderr=subprocess.PIPE)
250 out, err = p.communicate()
252 # Start process in a "daemonized" way, using nohup and heavy
253 # stdin/out redirection to avoid connection issues
254 (out,err), proc = rspawn(
258 stdin = stdin if stdin is not None else '/dev/null',
259 stdout = stdout if stdout else '/dev/null',
260 stderr = stderr if stderr else '/dev/null',
265 agent = self.forward_agent,
266 identity_file = self.identity_file
# Python 2 raise syntax.
270 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
# Look up the process ids recorded in the file "<path>/pid" by calling
# rcheck_pid with the node's ssh settings.
# NOTE(review): the remaining rcheck_pid arguments, the waiting mentioned in
# the comment below and the return value are not visible in this excerpt.
274 def checkpid(self, path):
276 # NOTE: wait a bit for the pidfile to be created
277 pidtuple = rcheck_pid(
278 os.path.join(path, 'pid'),
282 agent = self.forward_agent,
283 identity_file = self.identity_file
# Report the state of the process identified by ``pid`` and ``ppid``;
# kill() compares the result with RUNNING.
# NOTE(review): the call that receives the ssh settings below (presumably
# rstatus, which this file imports) and the return statement are not visible
# in this excerpt.
288 def status(self, pid, ppid):
294 agent = self.forward_agent,
295 identity_file = self.identity_file
# Terminate the process identified by ``pid`` and ``ppid``, but only when
# status() reports it as RUNNING.
# NOTE(review): the call that receives the ssh settings below (presumably
# rkill, which this file imports), the use of ``sudo`` and the end of the
# method are not visible in this excerpt.
300 def kill(self, pid, ppid, sudo = False):
301 status = self.status(pid, ppid)
302 if status == RUNNING:
303 # kill by ppid+pid - SIGTERM first, then try SIGKILL
309 agent = self.forward_agent,
311 identity_file = self.identity_file