2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
34 # TODO: Verify files and dirs already exist
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
# Delay handed to ec.schedule() whenever deploy() must be retried later
# (e.g. while connected interfaces are not yet READY).
reschedule_delay = "0.5s"
43 Error codes that the rexitcode function can return if unable to
44 check the exit code of a spawned process
53 Supported flavors of Linux OS
# Resource manager for a Linux host. Its methods dispatch either to local
# helpers (execfuncs.*) or to SSH helpers (sshfuncs.*); upload()/download()
# use the localhost property to decide which form of path to build.
62 class LinuxNode(ResourceManager):
64 .. class:: Class Args :
66 :param ec: The Experiment controller
67 :type ec: ExperimentController
68 :param guid: guid of the RM
73 There are different ways in which commands can be executed using the
74 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
79 * 'execute' (blocking mode) :
81 HOW IT WORKS: 'execute' forks a process and runs the
82 command, synchronously, attached to the terminal, in
84 The execute method will block until the command returns
85 the result on 'out', 'err' (so until it finishes executing).
87 USAGE: short-lived commands that must be executed attached
88 to a terminal and in foreground, for which it IS necessary
89 to block until the command has finished (e.g. if you want
90 to run 'ls' or 'cat').
92 * 'execute' (NON blocking mode - blocking = False) :
94 HOW IT WORKS: Same as before, except that execute method
95 will return immediately (even if command still running).
97 USAGE: long-lived commands that must be executed attached
98 to a terminal and in foreground, but for which it is not
99 necessary to block until the command has finished. (e.g.
100 start an application using X11 forwarding)
104 HOW IT WORKS: Connects to the host (using SSH if remote)
105 and launches the command in background, detached from any
106 terminal (daemonized), and returns. The command continues to
107 run remotely, but since it is detached from the terminal,
108 its pipes (stdin, stdout, stderr) can't be redirected to the
109 console (as normal non detached processes would), and so they
110 are explicitly redirected to files. The pidfile is created as
111 part of the process of launching the command. The pidfile
112 holds the pid and ppid of the process forked in background,
113 so later on it is possible to check whether the command is still
116 USAGE: long-lived commands that can run detached in background,
117 for which it is NOT necessary to block (wait) until the command
118 has finished. (e.g. start an application that is not using X11
119 forwarding. It can run detached and remotely in background)
123 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
124 the command has finished execution. It also checks whether
125 errors occurred during runtime by reading the exitcode file,
126 which contains the exit code of the command that was run
127 (checking stderr only is not always reliable since many
128 commands throw debugging info to stderr and the only way to
129 automatically know whether an error really happened is to
130 check the process exit code).
132 Another difference with respect to 'run' is that, instead
133 of directly executing the command as a bash command line,
134 it uploads the command to a bash script and runs the script.
135 This allows the bash script to be used to debug errors, since
136 it remains at the remote host and can be run manually to
139 USAGE: medium-lived commands that can run detached in
140 background, for which it IS necessary to block (wait) until
141 the command has finished. (e.g. Package installation,
142 source compilation, file download, etc)
def _register_attributes(cls):
    """Declare the configurable attributes of a LinuxNode.

    All attributes are constructed first and then passed, in declaration
    order, to ``cls._register_attribute``.
    """
    declared = (
        Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly),
        Attribute("username", "Local account username",
            flags = Flags.Credential),
        Attribute("port", "SSH port", flags = Flags.ExecReadOnly),
        Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly),
        Attribute("identity", "SSH identity file",
            flags = Flags.Credential),
        Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly),
        Attribute("cleanHome", "Remove all files and directories " +
            " from home folder before starting experiment",
            flags = Flags.ExecReadOnly),
        Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly),
        Attribute("tearDown", "Bash script to be executed before " +
            "releasing the resource",
            flags = Flags.ExecReadOnly),
    )

    for attribute in declared:
        cls._register_attribute(attribute)
# Initialise the RM. NOTE(review): self._os (assigned by the OS-detection
# code) is presumably initialised on a line not shown here -- confirm.
189 def __init__(self, ec, guid):
190 super(LinuxNode, self).__init__(ec, guid)
193 # lock to avoid concurrency issues on methods used by applications
194 self._lock = threading.Lock()
def log_message(self, msg):
    """Decorate *msg* with this node's guid and hostname for logging.

    :param msg: text to annotate
    :returns: `` guid <guid> - host <hostname> - <msg> ``
    """
    node_guid = self.guid
    hostname = self.get("hostname")
    template = " guid %d - host %s - %s "
    return template % (node_guid, hostname, msg)
# Home directory: the 'home' attribute, or "" when it is unset.
202 return self.get("home") or ""
# Experiment directory: <home>/<experiment id>.
206 return os.path.join(self.home, self.ec.exp_id)
# Per-node directory: <exp_home>/node-<guid>.
210 node_home = "node-%d" % self.guid
211 return os.path.join(self.exp_home, node_home)
# OS detection: needs hostname and username, reads /etc/issue on the node
# and maps the start of its content to an OSType stored in self._os.
218 if (not self.get("hostname") or not self.get("username")):
219 msg = "Can't resolve OS, insufficient data "
221 raise RuntimeError, msg
223 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
# Fail only when there is stderr output AND a non-zero poll() result
# (presumably the exit status); stderr alone may just be noise.
225 if err and proc.poll():
226 msg = "Error detecting OS "
227 self.error(msg, out, err)
228 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
# find(...) == 0 means "/etc/issue starts with this string".
230 if out.find("Fedora release 12") == 0:
231 self._os = OSType.FEDORA_12
232 elif out.find("Fedora release 14") == 0:
233 self._os = OSType.FEDORA_14
234 elif out.find("Debian") == 0:
235 self._os = OSType.DEBIAN
236 elif out.find("Ubuntu") ==0:
237 self._os = OSType.UBUNTU
# Any other distribution is rejected.
239 msg = "Unsupported OS"
241 raise RuntimeError, "%s - %s " %( msg, out )
@property
def localhost(self):
    """True when this node's hostname denotes the local machine.

    upload() and download() use it to decide whether paths must be
    prefixed with ``<user>@<host>:`` for a remote copy.
    """
    # The IPv4 loopback address is 127.0.0.1 (the list previously had the
    # typo '127.0.0.7', so '127.0.0.1' was treated as a remote host).
    return self.get("hostname") in ('localhost', '127.0.0.1', '::1')
# Provisioning: fail fast (state FAILED) if the node does not answer, then
# optionally kill processes / clean the home dir, and create node_home.
250 if not self.is_alive():
251 self._state = ResourceState.FAILED
252 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
254 raise RuntimeError, msg
256 if self.get("cleanProcesses"):
257 self.clean_processes()
# NOTE(review): the body of this 'if' is not visible; presumably it calls
# self.clean_home() -- confirm.
259 if self.get("cleanHome"):
262 self.mkdir(self.node_home)
264 super(LinuxNode, self).provision()
# NOTE(review): the statements run for the NEW state are not visible; the
# FAILED assignment below is presumably reached from an exception handler.
267 if self.state == ResourceState.NEW:
272 self._state = ResourceState.FAILED
275 # Node needs to wait until all associated interfaces are
276 # ready before it can finalize deployment
# Imported locally -- presumably to avoid a circular import (confirm).
277 from nepi.resources.linux.interface import LinuxInterface
278 ifaces = self.get_connected(LinuxInterface.rtype())
# Not ready yet: re-schedule deploy() instead of blocking.
280 if iface.state < ResourceState.READY:
281 self.ec.schedule(reschedule_delay, self.deploy)
284 super(LinuxNode, self).deploy()
# Release: execute the user-supplied 'tearDown' script on the node
# (presumably only when it is set -- confirm), then release the RM.
287 tear_down = self.get("tearDown")
289 self.execute(tear_down)
291 super(LinuxNode, self).release()
# NOTE(review): body not visible; presumably decides whether the RM with
# the given guid may be connected to this node -- confirm.
293 def valid_connection(self, guid):
# Kill leftover processes on the node. Two alternative scripts follow; the
# condition choosing between them (presumably 'killer') is not visible.
# The first kills python/tcpdump, the pids listed by 'ps -N -T' except
# $PPID, and every root-owned process; the second kills tcpdump and the
# processes of the configured username. Each kill is issued twice and
# suffixed with '|| /bin/true' so a failing kill does not stop the chain.
297 def clean_processes(self, killer = False):
298 self.info("Cleaning up processes")
302 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
303 "sudo -S killall python tcpdump || /bin/true ; " +
304 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
305 "sudo -S killall -u root || /bin/true ; " +
306 "sudo -S killall -u root || /bin/true ; ")
309 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
310 "sudo -S killall tcpdump || /bin/true ; " +
311 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
312 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
315 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
# Remove every top-level 'nepi-*' entry of the working directory (the home
# directory when the 'cd' prefix below is applied). The broader pattern
# is kept commented out.
# NOTE(review): the condition guarding the 'cd' prefix is not visible;
# presumably it applies only when a home directory is set -- confirm.
317 def clean_home(self):
318 self.info("Cleaning up home")
321 # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
322 "find . -maxdepth 1 -name 'nepi-*' " +
323 " -execdir rm -rf {} + "
327 cmd = "cd %s ; " % self.home + cmd
330 (out, err), proc = self.execute(cmd, with_lock = True)
332 def upload(self, src, dst, text = False):
333 """ Copy content to destination
335 src content to copy. Can be a local file, directory or a list of files
337 dst destination path on the node (the host named by the 'hostname' attribute)
339 text src is text input, it must be stored into a temp file before uploading
341 # If source is a string input
343 if text and not os.path.isfile(src):
344 # src is text input that should be uploaded as file
345 # create a temporary file with the content to upload
346 f = tempfile.NamedTemporaryFile(delete=False)
# NOTE(review): delete=False keeps the temp file after it is closed; the
# lines that write/close it and (presumably) remove it after the copy are
# not visible -- confirm it is cleaned up.
351 if not self.localhost:
352 # Build destination as <user>@<server>:<path>
353 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
354 result = self.copy(src, dst)
def download(self, src, dst):
    """Copy *src* from this node to the local path *dst*.

    For a remote node the source is rewritten as ``<user>@<host>:<src>``
    before delegating to copy(); its result is returned unchanged.
    """
    if self.localhost:
        return self.copy(src, dst)
    remote_src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
    return self.copy(remote_src, dst)
# Install packages with the package manager matching the detected OS
# (rpmfuncs for Fedora, debfuncs for Debian/Ubuntu; anything else raises),
# running the command detached and waiting for it via run_and_wait().
368 def install_packages(self, packages, home):
370 if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
371 command = rpmfuncs.install_packages_command(self.os, packages)
372 elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
373 command = debfuncs.install_packages_command(self.os, packages)
375 msg = "Error installing packages ( OS not known ) "
376 self.error(msg, self.os)
377 raise RuntimeError, msg
380 (out, err), proc = self.run_and_wait(command, home,
381 shfile = "instpkg.sh",
382 pidfile = "instpkg_pidfile",
383 ecodefile = "instpkg_exitcode",
384 stdout = "instpkg_stdout",
385 stderr = "instpkg_stderr",
386 raise_on_error = True)
388 return (out, err), proc
# Remove packages with the package manager matching the detected OS
# (rpmfuncs for Fedora, debfuncs for Debian/Ubuntu; anything else raises),
# running the command detached and waiting for it via run_and_wait().
390 def remove_packages(self, packages, home):
392 if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
393 command = rpmfuncs.remove_packages_command(self.os, packages)
394 elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
395 command = debfuncs.remove_packages_command(self.os, packages)
397 msg = "Error removing packages ( OS not known ) "
399 raise RuntimeError, msg
402 (out, err), proc = self.run_and_wait(command, home,
404 pidfile = "rmpkg_pidfile",
405 ecodefile = "rmpkg_exitcode",
406 stdout = "rmpkg_stdout",
407 stderr = "rmpkg_stderr",
408 raise_on_error = True)
410 return (out, err), proc
# Create 'path' (with parents) on the node via 'mkdir -p'.
# NOTE(review): the handling of 'clean' is not visible; presumably it
# removes 'path' first -- confirm.
412 def mkdir(self, path, clean = False):
416 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Forcibly and recursively delete *path* on this node (``rm -rf``).

    Returns whatever execute() returns.
    """
    # NOTE: path is interpolated unquoted into a shell command; callers
    # must pass shell-safe paths.
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
421 def run_and_wait(self, command, home,
425 ecodefile = "exitcode",
431 raise_on_error = False):
433 runs a command in background on the remote host, busy-waiting
434 until the command finishes execution.
435 This is more robust than doing a simple synchronized 'execute',
436 since in the remote host the command can continue to run detached
437 even if network disconnections occur
439 self.upload_command(command, home,
441 ecodefile = ecodefile,
# From here on 'command' is the wrapper script; the script itself writes
# the original command's exit status to ecodefile (see upload_command).
444 command = "bash ./%s" % shfile
445 # run command in background in remote host
446 (out, err), proc = self.run(command, home,
454 # check no errors occurred
456 msg = " Failed to run command '%s' " % command
457 self.error(msg, out, err)
459 raise RuntimeError, msg
461 # Wait for pid file to be generated
462 pid, ppid = self.wait_pid(
465 raise_on_error = raise_on_error)
467 # wait until command finishes to execute
468 self.wait_run(pid, ppid)
470 (out, err), proc = self.check_errors(home,
471 ecodefile = ecodefile,
475 # err holds what check_errors read from the stderr file (or an exit-code retrieval error message)
477 msg = " Failed to run command '%s' " % command
478 self.error(msg, out, err)
481 raise RuntimeError, msg
483 return (out, err), proc
# Returns the integer stored in <home>/<ecodefile>, or an ExitCode value:
# CORRUPTFILE when the content cannot be parsed (presumably int() failing),
# FILENOTFOUND when 'cat' exited with status 1, ERROR otherwise.
485 def exitcode(self, home, ecodefile = "exitcode"):
487 Get the exit code of an application.
488 Returns an integer value with the exit code
490 (out, err), proc = self.check_output(home, ecodefile)
492 # Succeeded to open file, return exit code in the file
495 return int(out.strip())
497 # Error in the content of the file!
498 return ExitCode.CORRUPTFILE
500 # No such file or directory
501 if proc.returncode == 1:
502 return ExitCode.FILENOTFOUND
504 # Other error from 'cat'
505 return ExitCode.ERROR
# Wraps 'command' as " { command } ; echo $? > ecodefile ;" so the script
# records the exit status, prepends exported env variables and uploads
# the result as text to <home>/<shfile>.
# NOTE(review): bash needs ';' (or '&'/newline) before the closing '}' of a
# group; the hidden body of the endswith() check presumably appends it.
507 def upload_command(self, command, home,
509 ecodefile = "exitcode",
511 """ Saves the command as a bash script file in the remote host, and
512 forces to save the exit code of the command execution to the ecodefile
515 if not (command.strip().endswith(";") or command.strip().endswith("&")):
518 # The exit code of the command will be stored in ecodefile
519 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
521 'ecodefile': ecodefile,
525 environ = self.format_environment(env)
527 # Add environ to command
528 command = environ + command
530 dst = os.path.join(home, shfile)
531 return self.upload(command, dst, text = True)
533 def format_environment(self, env, inline = False):
534 """Format environmental variables for command to be executed either
536 (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
537 as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
539 if not env: return ""
# Each space-separated item of env becomes " export <item>", joined (and
# terminated) by ';' when inline, else by '\n'.
# NOTE(review): consecutive spaces would yield empty " export " entries
# unless lines not shown normalise env first -- confirm.
542 sep = ";" if inline else "\n"
543 return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep
# NOTE(review): ExitCode sentinels are presumably negative, so 'ecode > 0'
# means a real non-zero exit status -- confirm. 'proc' is only assigned in
# the elif branch visible below; its initialisation for the other paths is
# on lines not shown.
545 def check_errors(self, home,
546 ecodefile = "exitcode",
550 Checks whether errors occurred while running a command.
551 It first checks the exit code for the command, and only if the
552 exit code is an error one it returns the error output.
557 # retrieve standard output from the file
558 (out, oerr), oproc = self.check_output(home, stdout)
560 # get exit code saved in the 'exitcode' file
561 ecode = self.exitcode(home, ecodefile)
563 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
564 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
565 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
566 # The process returned an error code or didn't exist.
567 # Check standard error.
568 (err, eerr), proc = self.check_output(home, stderr)
570 # If the stderr file was not found, assume nothing bad happened,
571 # and just ignore the error.
572 # (cat returns 1 for error "No such file or directory")
573 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
576 return (out, err), proc
578 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
579 """ Waits until the pid file for the command is generated,
580 and returns the pid and ppid of the process """
# Polls getpid() for <home>/<pidfile>; the retry loop and its limits are
# not visible. When no pid is obtained an error message is built and,
# presumably only if raise_on_error is set, RuntimeError is raised.
585 pidtuple = self.getpid(home = home, pidfile = pidfile)
594 msg = " Failed to get pid for pidfile %s/%s " % (
599 raise RuntimeError, msg
603 def wait_run(self, pid, ppid, trial = 0):
604 """ wait for a remote process to finish execution """
# Polls status(pid, ppid): returns on FINISHED; a state other than RUNNING
# is treated as "not started yet" (bounded by the 20 s limit below);
# while RUNNING it keeps waiting.
608 status = self.status(pid, ppid)
610 if status is ProcStatus.FINISHED:
612 elif status is not ProcStatus.RUNNING:
615 # If it takes more than 20 seconds to start, then
616 # assume something went wrong
620 # The app is running, just wait...
def check_output(self, home, filename):
    """Return the content of <home>/<filename> on this node.

    The file is read with ``cat`` through execute() (retry=1,
    with_lock=True) and the ``(out, err), proc`` result is returned.
    """
    target = os.path.join(home, filename)
    (stdout, stderr), process = self.execute("cat %s" % target,
            retry = 1, with_lock = True)
    return (stdout, stderr), process
635 # TODO: FIX NOT ALIVE!!!!
# NOTE(review): 'echo ALIVE' essentially never fails, so the
# '|| (echo NOTALIVE)' branch is effectively dead; liveness really depends
# on execute() succeeding and stdout starting with 'ALIVE'.
636 (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
# NOTE(review): if execute() raised, 'out'/'err' are unbound here unless
# initialised on lines not shown -- confirm.
640 trace = traceback.format_exc()
641 msg = "Unresponsive host %s " % err
642 self.error(msg, out, trace)
645 if out.strip().startswith('ALIVE'):
648 msg = "Unresponsive host "
649 self.error(msg, out, err)
# Copy src to dst, locally (execfuncs.lcopy) or via sshfuncs.rcopy with the
# node's port/identity/serverKey attributes.
652 def copy(self, src, dst):
# NOTE(review): 'source' and 'dest' are not parameters of copy() (which
# takes src, dst); unless they are bound on a line not shown, this branch
# raises NameError. Likely intended: lcopy(src, dst, ...).
654 (out, err), proc = execfuncs.lcopy(source, dest,
656 strict_host_checking = False)
659 (out, err), proc = sshfuncs.rcopy(
661 port = self.get("port"),
662 identity = self.get("identity"),
663 server_key = self.get("serverKey"),
665 strict_host_checking = False)
667 return (out, err), proc
669 def execute(self, command,
677 err_on_timeout = True,
678 connect_timeout = 30,
679 strict_host_checking = False,
684 """ Notice that this invocation will block until the
685 execution finishes. If this is not the desired behavior,
686 use 'run' instead."""
# Three paths are visible: execfuncs.lexec (local) and two identical-looking
# sshfuncs.rexec calls. The conditions selecting among them are not shown;
# presumably one rexec call runs while holding self._lock (with_lock=True).
689 (out, err), proc = execfuncs.lexec(command,
697 (out, err), proc = sshfuncs.rexec(
699 host = self.get("hostname"),
700 user = self.get("username"),
701 port = self.get("port"),
705 identity = self.get("identity"),
706 server_key = self.get("serverKey"),
709 forward_x11 = forward_x11,
712 err_on_timeout = err_on_timeout,
713 connect_timeout = connect_timeout,
714 persistent = persistent,
716 strict_host_checking = strict_host_checking
719 (out, err), proc = sshfuncs.rexec(
721 host = self.get("hostname"),
722 user = self.get("username"),
723 port = self.get("port"),
727 identity = self.get("identity"),
728 server_key = self.get("serverKey"),
731 forward_x11 = forward_x11,
734 err_on_timeout = err_on_timeout,
735 connect_timeout = connect_timeout,
736 persistent = persistent,
738 strict_host_checking = strict_host_checking
741 return (out, err), proc
# Spawn the command detached: execfuncs.lspawn locally, sshfuncs.rspawn
# remotely. On the remote path stdin defaults to /dev/null and unset
# stdout/stderr are redirected to /dev/null.
743 def run(self, command, home,
752 self.debug("Running command '%s'" % command)
755 (out, err), proc = execfuncs.lspawn(command, pidfile,
760 create_home = create_home,
765 (out, err), proc = sshfuncs.rspawn(
769 create_home = create_home,
770 stdin = stdin if stdin is not None else '/dev/null',
771 stdout = stdout if stdout else '/dev/null',
772 stderr = stderr if stderr else '/dev/null',
774 host = self.get("hostname"),
775 user = self.get("username"),
776 port = self.get("port"),
778 identity = self.get("identity"),
779 server_key = self.get("serverKey"),
783 return (out, err), proc
# Read <home>/<pidfile> locally (lgetpid) or over SSH (rgetpid). The
# return statement is not visible; wait_pid() consumes the result as the
# pid tuple -- confirm its exact shape.
785 def getpid(self, home, pidfile = "pidfile"):
787 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
790 pidtuple = sshfuncs.rgetpid(
791 os.path.join(home, pidfile),
792 host = self.get("hostname"),
793 user = self.get("username"),
794 port = self.get("port"),
796 identity = self.get("identity"),
797 server_key = self.get("serverKey")
# Query the state of process (pid, ppid) locally (lstatus) or over SSH
# (rstatus); callers compare the result against ProcStatus values.
802 def status(self, pid, ppid):
804 status = execfuncs.lstatus(pid, ppid)
807 status = sshfuncs.rstatus(
809 host = self.get("hostname"),
810 user = self.get("username"),
811 port = self.get("port"),
813 identity = self.get("identity"),
814 server_key = self.get("serverKey")
# Kill process (pid, ppid) only when status() reports RUNNING, locally
# (lkill, which receives the sudo flag) or over SSH (rkill).
# NOTE(review): when the process is not RUNNING, 'out', 'err' and 'proc'
# are defined only if initialised on lines not shown -- confirm.
819 def kill(self, pid, ppid, sudo = False):
822 status = self.status(pid, ppid)
824 if status == sshfuncs.ProcStatus.RUNNING:
826 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
829 (out, err), proc = sshfuncs.rkill(
831 host = self.get("hostname"),
832 user = self.get("username"),
833 port = self.get("port"),
836 identity = self.get("identity"),
837 server_key = self.get("serverKey")
840 return (out, err), proc