1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs
4 from neco.util import sshfuncs, execfuncs
# TODO: Verify that files and dirs already exist
16 # TODO: Blacklist node!
21 class LinuxNode(ResourceManager):
def _register_attributes(cls):
    """Declare the configuration attributes of a Linux node.

    One ``Attribute`` is created per setting (host and SSH connection
    data, experiment home directory, clean-up switches and the tear-down
    script). Once all of them exist, each one is handed to
    ``cls._register_attribute`` in the order in which it is listed.
    """
    read_only = Flags.ExecReadOnly
    credential = Flags.Credential

    attributes = (
        Attribute("hostname", "Hostname of the machine",
                  flags=read_only),
        Attribute("username", "Local account username",
                  flags=credential),
        Attribute("port", "SSH port",
                  flags=read_only),
        Attribute("home",
                  "Experiment home directory to store all experiment related files",
                  flags=read_only),
        Attribute("identity", "SSH identity file",
                  flags=credential),
        Attribute("serverKey", "Server public key",
                  flags=read_only),
        Attribute("cleanHome",
                  "Remove all files and directories " +
                  " from home folder before starting experiment",
                  flags=read_only),
        Attribute("cleanProcesses",
                  "Kill all running processes before starting experiment",
                  flags=read_only),
        Attribute("tearDown",
                  "Bash script to be executed before " +
                  "releasing the resource",
                  flags=read_only),
    )

    # Registration order matches the declaration order above.
    for attribute in attributes:
        cls._register_attribute(attribute)
66 def __init__(self, ec, guid):
67 super(LinuxNode, self).__init__(ec, guid)
70 # lock to avoid concurrency issues on methods used by applications
71 self._lock = threading.Lock()
73 self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
77 return self.get("home") or "/tmp"
81 exp_dir = os.path.join(self.home, self.ec.exp_id)
82 return exp_dir if exp_dir.startswith('/') else "${HOME}/"
86 node_dir = "node-%d" % self.guid
87 return os.path.join(self.exp_dir, node_dir)
94 if (not self.get("hostname") or not self.get("username")):
95 msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
96 self.logger.error(msg)
97 raise RuntimeError, msg
99 (out, err), proc = self.execute("cat /etc/issue")
101 if err and proc.poll():
102 msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
103 self.logger.error(msg)
104 raise RuntimeError, msg
106 if out.find("Fedora release 12") == 0:
108 elif out.find("Fedora release 14") == 0:
110 elif out.find("Debian") == 0:
112 elif out.find("Ubuntu") ==0:
115 msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
116 self.logger.error(msg)
117 raise RuntimeError, msg
123 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
125 def provision(self, filters = None):
126 if not self.is_alive():
127 self._state = ResourceState.FAILED
128 self.logger.error("Deploy failed. Unresponsive node")
131 if self.get("cleanProcesses"):
132 self.clean_processes()
134 if self.get("cleanHome"):
137 self.mkdir(self.node_dir)
139 super(LinuxNode, self).provision()
142 if self.state == ResourceState.NEW:
146 # Node needs to wait until all associated interfaces are
147 # ready before it can finalize deployment
148 from neco.resources.linux.interface import LinuxInterface
149 ifaces = self.get_connected(LinuxInterface.rtype())
151 if iface.state < ResourceState.READY:
152 self.ec.schedule(DELAY, self.deploy)
155 super(LinuxNode, self).deploy()
158 tear_down = self.get("tearDown")
160 self.execute(tear_down)
162 super(LinuxNode, self).release()
164 def valid_connection(self, guid):
168 def clean_processes(self):
169 self.logger.info("Cleaning up processes")
171 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
172 "sudo -S killall python tcpdump || /bin/true ; " +
173 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
174 "sudo -S killall -u root || /bin/true ; " +
175 "sudo -S killall -u root || /bin/true ; ")
179 (out, err), proc = self.execute(cmd)
181 def clean_home(self):
182 self.logger.info("Cleaning up home")
184 cmd = ("cd %s ; " % self.home +
185 "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
186 " -execdir rm -rf {} + ")
190 (out, err), proc = self.execute(cmd)
192 def upload(self, src, dst, text = False):
193 """ Copy content to destination
195 src content to copy. Can be a local file, directory or a list of files
197 dst destination path on the remote host (remote is always self.host)
199 text src is text input, it must be stored into a temp file before uploading
201 # If source is a string input
203 if text and not os.path.isfile(src):
204 # src is text input that should be uploaded as file
205 # create a temporal file with the content to upload
206 f = tempfile.NamedTemporaryFile(delete=False)
211 if not self.localhost:
212 # Build destination as <user>@<server>:<path>
213 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
215 result = self.copy(src, dst)
def download(self, src, dst):
    """Copy ``src`` to ``dst`` by delegating to ``self.copy``.

    When ``self.localhost`` is false, ``src`` is first rewritten as
    ``<user>@<server>:<path>`` from the "username" and "hostname"
    attributes; otherwise it is passed through untouched.
    Returns whatever ``self.copy`` returns.
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build the source location as <user>@<server>:<path>
    user = self.get("username")
    host = self.get("hostname")
    remote_src = "%s@%s:%s" % (user, host, src)
    return self.copy(remote_src, dst)
229 def install_packages(self, packages, home = None):
230 home = home or self.node_dir
233 if self.os in ["f12", "f14"]:
234 cmd = rpmfuncs.install_packages_command(self.os, packages)
235 elif self.os in ["debian", "ubuntu"]:
236 cmd = debfuncs.install_packages_command(self.os, packages)
238 msg = "Error installing packages. OS not known for host %s " % (
239 self.get("hostname"))
240 self.logger.error(msg)
241 raise RuntimeError, msg
245 (out, err), proc = self.run_and_wait(cmd, home,
246 pidfile = "instpkg_pid",
247 stdout = "instpkg_log",
248 stderr = "instpkg_err",
249 raise_on_error = True)
251 return (out, err), proc
253 def remove_packages(self, packages, home = None):
254 home = home or self.node_dir
257 if self.os in ["f12", "f14"]:
258 cmd = rpmfuncs.remove_packages_command(self.os, packages)
259 elif self.os in ["debian", "ubuntu"]:
260 cmd = debfuncs.remove_packages_command(self.os, packages)
262 msg = "Error removing packages. OS not known for host %s " % (
263 self.get("hostname"))
264 self.logger.error(msg)
265 raise RuntimeError, msg
269 (out, err), proc = self.run_and_wait(cmd, home,
270 pidfile = "rmpkg_pid",
271 stdout = "rmpkg_log",
272 stderr = "rmpkg_err",
273 raise_on_error = True)
275 return (out, err), proc
277 def mkdir(self, path, clean = False):
281 return self.execute("mkdir -p %s" % path)
def rmdir(self, path):
    """Recursively remove ``path`` by running ``rm -rf`` on it.

    ``path`` is interpolated into the command verbatim, without quoting.
    Returns whatever ``self.execute`` returns.
    """
    command = "rm -rf %s" % path
    return self.execute(command)
286 def run_and_wait(self, command,
293 raise_on_error = False):
294 """ runs a command in background on the remote host, but waits
295 until the command finishes execution.
296 This is more robust than doing a simple synchronized 'execute',
297 since in the remote host the command can continue to run detached
298 even if network disconnections occur
300 # run command in background in remote host
301 (out, err), proc = self.run(command, home,
308 # check no errors occurred
309 if proc.poll() and err:
310 msg = " Failed to run command %s on host %s" % (
311 command, self.get("hostname"))
312 self.logger.error(msg)
314 raise RuntimeError, msg
316 # Wait for pid file to be generated
317 pid, ppid = self.wait_pid(
320 raise_on_error = raise_on_error)
322 # wait until command finishes to execute
323 self.wait_run(pid, ppid)
325 # check if execution errors occurred
326 (out, err), proc = self.check_output(home, stderr)
329 msg = "Error while running command %s on host %s. error output: %s" % (
330 command, self.get("hostname"), out)
332 msg += " . err: %s" % err
334 self.logger.error(msg)
336 raise RuntimeError, msg
338 return (out, err), proc
340 def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
341 """ Waits until the pid file for the command is generated,
342 and returns the pid and ppid of the process """
346 pidtuple = self.checkpid(home = home, pidfile = pidfile)
353 delay = min(30,delay*1.2)
355 msg = " Failed to get pid for pidfile %s/%s on host %s" % (
356 home, pidfile, self.get("hostname"))
357 self.logger.error(msg)
359 raise RuntimeError, msg
363 def wait_run(self, pid, ppid, trial = 0):
364 """ wait for a remote process to finish execution """
370 status = self.status(pid, ppid)
372 if status is sshfuncs.FINISHED:
374 elif status is not sshfuncs.RUNNING:
376 time.sleep(delay*(5.5+random.random()))
383 time.sleep(delay*(0.5+random.random()))
384 delay = min(30,delay*1.2)
def check_output(self, home, filename):
    """Read the content of ``filename`` located under ``home``.

    Joins both parts into a single path, runs ``cat`` on it through
    ``self.execute`` and returns the resulting ``((out, err), proc)``.
    """
    target = os.path.join(home, filename)
    result = self.execute("cat %s" % target)
    # Unpack and rebuild so the returned shape is always ((out, err), proc).
    (out, err), proc = result
    return (out, err), proc
399 (out, err), proc = self.execute("echo 'ALIVE'")
402 trace = traceback.format_exc()
403 self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s",
404 self.get("hostname"), out, err, trace)
407 if out.strip().startswith('ALIVE'):
410 self.logger.warn("Unresponsive host %s. got:\n%s%s",
411 self.get("hostname"), out, err)
415 #if self.check_bad_host(out,err):
418 def copy(self, src, dst):
420 (out, err), proc = execfuncs.lcopy(source, dest,
423 (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
425 port = self.get("port"),
426 identity = self.get("identity"),
427 server_key = self.get("serverKey"),
430 return (out, err), proc
432 def execute(self, command,
440 err_on_timeout = True,
441 connect_timeout = 30,
444 """ Notice that this invocation will block until the
445 execution finishes. If this is not the desired behavior,
446 use 'run' instead."""
449 (out, err), proc = execfuncs.lexec(command,
455 (out, err), proc = self.safe_retry(sshfuncs.rexec)(
457 host = self.get("hostname"),
458 user = self.get("username"),
459 port = self.get("port"),
463 identity = self.get("identity"),
464 server_key = self.get("serverKey"),
467 forward_x11 = forward_x11,
470 err_on_timeout = err_on_timeout,
471 connect_timeout = connect_timeout,
472 persistent = persistent
475 return (out, err), proc
477 def run(self, command,
486 self.logger.info("Running %s", command)
489 (out, err), proc = execfuncs.lspawn(command, pidfile,
494 create_home = create_home,
498 # Start process in a "daemonized" way, using nohup and heavy
499 # stdin/out redirection to avoid connection issues
500 (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
504 create_home = create_home,
505 stdin = stdin if stdin is not None else '/dev/null',
506 stdout = stdout if stdout else '/dev/null',
507 stderr = stderr if stderr else '/dev/null',
509 host = self.get("hostname"),
510 user = self.get("username"),
511 port = self.get("port"),
513 identity = self.get("identity"),
514 server_key = self.get("serverKey")
517 return (out, err), proc
519 def checkpid(self, home = ".", pidfile = "pid"):
521 pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
523 pidtuple = sshfuncs.rcheckpid(
524 os.path.join(home, pidfile),
525 host = self.get("hostname"),
526 user = self.get("username"),
527 port = self.get("port"),
529 identity = self.get("identity"),
530 server_key = self.get("serverKey")
535 def status(self, pid, ppid):
537 status = execfuncs.lstatus(pid, ppid)
539 status = sshfuncs.rstatus(
541 host = self.get("hostname"),
542 user = self.get("username"),
543 port = self.get("port"),
545 identity = self.get("identity"),
546 server_key = self.get("serverKey")
551 def kill(self, pid, ppid, sudo = False):
554 status = self.status(pid, ppid)
556 if status == sshfuncs.RUNNING:
558 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
560 (out, err), proc = self.safe_retry(sshfuncs.rkill)(
562 host = self.get("hostname"),
563 user = self.get("username"),
564 port = self.get("port"),
567 identity = self.get("identity"),
568 server_key = self.get("serverKey")
570 return (out, err), proc
572 def check_bad_host(self, out, err):
573 badre = re.compile(r'(?:'
574 r'|Error: disk I/O error'
577 return badre.search(out) or badre.search(err)
581 self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
583 #util.appendBlacklist(self.hostname)
585 def safe_retry(self, func):
586 """Retries a function invocation using a lock"""
588 @functools.wraps(func)
590 fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
591 func.__name__, p, kw, self.get("hostname"))
592 retry = kw.pop("_retry", False)
593 wlock = kw.pop("_with_lock", False)
597 for i in xrange(0 if retry else 4):
601 (out, err), proc = func(*p, **kw)
603 (out, err), proc = func(*p, **kw)
610 self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
612 except RuntimeError, e:
614 self.logger.error("%s. error: %s", fail_msg, e.args)
615 return (out, err), proc