2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23 from nepi.resources.linux import rpmfuncs, debfuncs
24 from nepi.util import sshfuncs, execfuncs
25 from nepi.util.sshfuncs import ProcStatus
# TODO: Verify that files and dirs already exist
36 # TODO: Blacklist nodes!
37 # TODO: Unify delays!!
38 # TODO: Validate outcome of uploads!!
43 Error codes that the rexitcode function can return if unable to
44 check the exit code of a spawned process
53 Supported flavors of Linux OS
63 class LinuxNode(ResourceManager):
65 .. class:: Class Args :
67 :param ec: The Experiment controller
68 :type ec: ExperimentController
69 :param guid: guid of the RM
74 There are different ways in which commands can be executed using the
75 LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
80 * 'execute' (blocking mode) :
HOW IT WORKS: 'execute' forks a process and runs the
83 command, synchronously, attached to the terminal, in
85 The execute method will block until the command returns
86 the result on 'out', 'err' (so until it finishes executing).
88 USAGE: short-lived commands that must be executed attached
89 to a terminal and in foreground, for which it IS necessary
90 to block until the command has finished (e.g. if you want
91 to run 'ls' or 'cat').
93 * 'execute' (NON blocking mode - blocking = False) :
95 HOW IT WORKS: Same as before, except that execute method
96 will return immediately (even if command still running).
98 USAGE: long-lived commands that must be executed attached
99 to a terminal and in foreground, but for which it is not
100 necessary to block until the command has finished. (e.g.
101 start an application using X11 forwarding)
105 HOW IT WORKS: Connects to the host ( using SSH if remote)
106 and launches the command in background, detached from any
107 terminal (daemonized), and returns. The command continues to
108 run remotely, but since it is detached from the terminal,
109 its pipes (stdin, stdout, stderr) can't be redirected to the
110 console (as normal non detached processes would), and so they
111 are explicitly redirected to files. The pidfile is created as
112 part of the process of launching the command. The pidfile
113 holds the pid and ppid of the process forked in background,
114 so later on it is possible to check whether the command is still
117 USAGE: long-lived commands that can run detached in background,
118 for which it is NOT necessary to block (wait) until the command
119 has finished. (e.g. start an application that is not using X11
120 forwarding. It can run detached and remotely in background)
124 HOW IT WORKS: Similar to 'run' except that it 'blocks' until
125 the command has finished execution. It also checks whether
126 errors occurred during runtime by reading the exitcode file,
127 which contains the exit code of the command that was run
128 (checking stderr only is not always reliable since many
129 commands throw debugging info to stderr and the only way to
130 automatically know whether an error really happened is to
131 check the process exit code).
133 Another difference with respect to 'run', is that instead
134 of directly executing the command as a bash command line,
135 it uploads the command to a bash script and runs the script.
136 This allows to use the bash script to debug errors, since
137 it remains at the remote host and can be run manually to
140 USAGE: medium-lived commands that can run detached in
141 background, for which it IS necessary to block (wait) until
142 the command has finished. (e.g. Package installation,
143 source compilation, file download, etc)
def _register_attributes(cls):
    """Declare the configuration attributes of a Linux node.

    All attribute objects are created first and then registered on
    ``cls`` one by one, in the order in which they are listed here.
    """
    attributes = (
        Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly),
        Attribute("username", "Local account username",
            flags = Flags.Credential),
        Attribute("port", "SSH port",
            flags = Flags.ExecReadOnly),
        Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly),
        Attribute("identity", "SSH identity file",
            flags = Flags.Credential),
        Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly),
        # The two adjacent literals are joined as-is, double space included.
        Attribute("cleanHome",
            "Remove all files and directories "
            " from home folder before starting experiment",
            flags = Flags.ExecReadOnly),
        Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly),
        Attribute("tearDown",
            "Bash script to be executed before "
            "releasing the resource",
            flags = Flags.ExecReadOnly),
    )

    for attribute in attributes:
        cls._register_attribute(attribute)
190 def __init__(self, ec, guid):
191 super(LinuxNode, self).__init__(ec, guid)
194 # lock to avoid concurrency issues on methods used by applications
195 self._lock = threading.Lock()
def log_message(self, msg):
    """Return ``msg`` prefixed with the guid and hostname of this node."""
    prefix = " guid %d - host %s - " % (self.guid, self.get("hostname"))
    return "%s%s " % (prefix, msg)
203 return self.get("home") or ""
207 return os.path.join(self.home, self.ec.exp_id)
211 node_home = "node-%d" % self.guid
212 return os.path.join(self.exp_home, node_home)
219 if (not self.get("hostname") or not self.get("username")):
220 msg = "Can't resolve OS, insufficient data "
222 raise RuntimeError, msg
224 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
226 if err and proc.poll():
227 msg = "Error detecting OS "
228 self.error(msg, out, err)
229 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
231 if out.find("Fedora release 8") == 0:
232 self._os = OSType.FEDORA_8
233 elif out.find("Fedora release 12") == 0:
234 self._os = OSType.FEDORA_12
235 elif out.find("Fedora release 14") == 0:
236 self._os = OSType.FEDORA_14
237 elif out.find("Debian") == 0:
238 self._os = OSType.DEBIAN
239 elif out.find("Ubuntu") ==0:
240 self._os = OSType.UBUNTU
242 msg = "Unsupported OS"
244 raise RuntimeError, "%s - %s " %( msg, out )
250 return self.os in [OSType.DEBIAN, OSType.UBUNTU]
254 return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
259 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
262 if not self.is_alive():
263 self._state = ResourceState.FAILED
264 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
266 raise RuntimeError, msg
268 if self.get("cleanProcesses"):
269 self.clean_processes()
271 if self.get("cleanHome"):
274 self.mkdir(self.node_home)
276 super(LinuxNode, self).provision()
279 if self.state == ResourceState.NEW:
284 self._state = ResourceState.FAILED
287 # Node needs to wait until all associated interfaces are
288 # ready before it can finalize deployment
289 from nepi.resources.linux.interface import LinuxInterface
290 ifaces = self.get_connected(LinuxInterface.rtype())
292 if iface.state < ResourceState.READY:
293 self.ec.schedule(reschedule_delay, self.deploy)
296 super(LinuxNode, self).deploy()
299 tear_down = self.get("tearDown")
301 self.execute(tear_down)
303 super(LinuxNode, self).release()
305 def valid_connection(self, guid):
309 def clean_processes(self, killer = False):
310 self.info("Cleaning up processes")
314 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
315 "sudo -S killall python tcpdump || /bin/true ; " +
316 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
317 "sudo -S killall -u root || /bin/true ; " +
318 "sudo -S killall -u root || /bin/true ; ")
321 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
322 "sudo -S killall tcpdump || /bin/true ; " +
323 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
324 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
327 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
329 def clean_home(self):
330 self.info("Cleaning up home")
333 # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
334 "find . -maxdepth 1 -name 'nepi-*' " +
335 " -execdir rm -rf {} + "
339 cmd = "cd %s ; " % self.home + cmd
342 (out, err), proc = self.execute(cmd, with_lock = True)
344 def upload(self, src, dst, text = False):
345 """ Copy content to destination
347 src content to copy. Can be a local file, directory or a list of files
349 dst destination path on the remote host (remote is always self.host)
351 text src is text input, it must be stored into a temp file before uploading
353 # If source is a string input
355 if text and not os.path.isfile(src):
356 # src is text input that should be uploaded as file
357 # create a temporal file with the content to upload
358 f = tempfile.NamedTemporaryFile(delete=False)
363 if not self.localhost:
364 # Build destination as <user>@<server>:<path>
365 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
366 result = self.copy(src, dst)
def download(self, src, dst):
    """Copy ``src`` from the node to ``dst``.

    For the local machine ``src`` is passed to ``copy`` untouched;
    otherwise it is rewritten as ``<user>@<server>:<path>`` first.
    Returns whatever ``self.copy`` returns.
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build the remote source as <user>@<server>:<path>
    remote_src = "%s@%s:%s" % (
        self.get("username"), self.get("hostname"), src)
    return self.copy(remote_src, dst)
380 def install_packages(self, packages, home):
383 command = rpmfuncs.install_packages_command(self.os, packages)
385 command = debfuncs.install_packages_command(self.os, packages)
387 msg = "Error installing packages ( OS not known ) "
388 self.error(msg, self.os)
389 raise RuntimeError, msg
392 (out, err), proc = self.run_and_wait(command, home,
393 shfile = "instpkg.sh",
394 pidfile = "instpkg_pidfile",
395 ecodefile = "instpkg_exitcode",
396 stdout = "instpkg_stdout",
397 stderr = "instpkg_stderr",
398 raise_on_error = True)
400 return (out, err), proc
402 def remove_packages(self, packages, home):
405 command = rpmfuncs.remove_packages_command(self.os, packages)
407 command = debfuncs.remove_packages_command(self.os, packages)
409 msg = "Error removing packages ( OS not known ) "
411 raise RuntimeError, msg
414 (out, err), proc = self.run_and_wait(command, home,
416 pidfile = "rmpkg_pidfile",
417 ecodefile = "rmpkg_exitcode",
418 stdout = "rmpkg_stdout",
419 stderr = "rmpkg_stderr",
420 raise_on_error = True)
422 return (out, err), proc
424 def mkdir(self, path, clean = False):
428 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Remove ``path`` on the node with ``rm -rf``.

    The command goes through ``self.execute`` with ``with_lock = True``
    and the result of that call is returned unchanged.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
433 def run_and_wait(self, command, home,
437 ecodefile = "exitcode",
443 raise_on_error = False):
445 Uploads the 'command' to a bash script in the host.
446 Then runs the script detached in background in the host, and
447 busy-waites until the script finishes executing.
449 self.upload_command(command, home,
451 ecodefile = ecodefile,
454 command = "bash ./%s" % shfile
455 # run command in background in remote host
456 (out, err), proc = self.run(command, home,
464 # check no errors occurred
466 msg = " Failed to run command '%s' " % command
467 self.error(msg, out, err)
469 raise RuntimeError, msg
471 # Wait for pid file to be generated
472 pid, ppid = self.wait_pid(
475 raise_on_error = raise_on_error)
477 # wait until command finishes to execute
478 self.wait_run(pid, ppid)
480 (out, err), proc = self.check_errors(home,
481 ecodefile = ecodefile,
485 # Out is what was written in the stderr file
487 msg = " Failed to run command '%s' " % command
488 self.error(msg, out, err)
491 raise RuntimeError, msg
493 return (out, err), proc
495 def exitcode(self, home, ecodefile = "exitcode"):
497 Get the exit code of an application.
498 Returns an integer value with the exit code
500 (out, err), proc = self.check_output(home, ecodefile)
502 # Succeeded to open file, return exit code in the file
505 return int(out.strip())
507 # Error in the content of the file!
508 return ExitCode.CORRUPTFILE
510 # No such file or directory
511 if proc.returncode == 1:
512 return ExitCode.FILENOTFOUND
514 # Other error from 'cat'
515 return ExitCode.ERROR
517 def upload_command(self, command, home,
519 ecodefile = "exitcode",
521 """ Saves the command as a bash script file in the remote host, and
522 forces to save the exit code of the command execution to the ecodefile
525 if not (command.strip().endswith(";") or command.strip().endswith("&")):
528 # The exit code of the command will be stored in ecodefile
529 command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
531 'ecodefile': ecodefile,
535 environ = self.format_environment(env)
537 # Add environ to command
538 command = environ + command
540 dst = os.path.join(home, shfile)
541 return self.upload(command, dst, text = True)
def format_environment(self, env, inline = False):
    """Format environment variables as shell ``export`` statements.

    :param env: whitespace separated ``NAME=value`` assignments
        (e.g. ``"PYTHONPATH=src/ DEBUG=1"``). May be empty or None.
    :param inline: when True the statements are terminated by ';' so
        they can be prepended to a one-line command; otherwise each
        statement ends with a newline, as needed in a bash script.
    :returns: the export statements, each one followed by the separator,
        or an empty string when there is nothing to export.
    """
    # split() without arguments discards leading/trailing blanks and
    # collapses runs of whitespace, so a blank-only string yields no
    # assignments instead of a dangling " export " statement.
    assignments = env.split() if env else []
    if not assignments:
        return ""

    sep = ";" if inline else "\n"
    return sep.join([" export %s" % item for item in assignments]) + sep
557 def check_errors(self, home,
558 ecodefile = "exitcode",
562 Checks whether errors occurred while running a command.
563 It first checks the exit code for the command, and only if the
564 exit code is an error one it returns the error output.
569 # retrive standard output from the file
570 (out, oerr), oproc = self.check_output(home, stdout)
572 # get exit code saved in the 'exitcode' file
573 ecode = self.exitcode(home, ecodefile)
575 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
576 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
577 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
578 # The process returned an error code or didn't exist.
579 # Check standard error.
580 (err, eerr), proc = self.check_output(home, stderr)
582 # If the stderr file was not found, assume nothing bad happened,
583 # and just ignore the error.
584 # (cat returns 1 for error "No such file or directory")
585 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
588 return (out, err), proc
590 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
591 """ Waits until the pid file for the command is generated,
592 and returns the pid and ppid of the process """
597 pidtuple = self.getpid(home = home, pidfile = pidfile)
606 msg = " Failed to get pid for pidfile %s/%s " % (
611 raise RuntimeError, msg
615 def wait_run(self, pid, ppid, trial = 0):
616 """ wait for a remote process to finish execution """
620 status = self.status(pid, ppid)
622 if status is ProcStatus.FINISHED:
624 elif status is not ProcStatus.RUNNING:
627 # If it takes more than 20 seconds to start, then
628 # asume something went wrong
632 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file ``filename`` located in ``home``.

    The file is read with ``cat`` through ``self.execute``; the return
    value is ``((out, err), proc)`` exactly as produced by that call.
    """
    path = os.path.join(home, filename)
    result, proc = self.execute("cat %s" % path, retry = 1, with_lock = True)
    out, err = result
    return (out, err), proc
647 # TODO: FIX NOT ALIVE!!!!
648 (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
652 trace = traceback.format_exc()
653 msg = "Unresponsive host %s " % err
654 self.error(msg, out, trace)
657 if out.strip().startswith('ALIVE'):
660 msg = "Unresponsive host "
661 self.error(msg, out, err)
664 def copy(self, src, dst):
666 (out, err), proc = execfuncs.lcopy(source, dest,
668 strict_host_checking = False)
671 (out, err), proc = sshfuncs.rcopy(
673 port = self.get("port"),
674 identity = self.get("identity"),
675 server_key = self.get("serverKey"),
677 strict_host_checking = False)
679 return (out, err), proc
681 def execute(self, command,
689 err_on_timeout = True,
690 connect_timeout = 30,
691 strict_host_checking = False,
696 """ Notice that this invocation will block until the
697 execution finishes. If this is not the desired behavior,
698 use 'run' instead."""
701 (out, err), proc = execfuncs.lexec(command,
709 (out, err), proc = sshfuncs.rexec(
711 host = self.get("hostname"),
712 user = self.get("username"),
713 port = self.get("port"),
717 identity = self.get("identity"),
718 server_key = self.get("serverKey"),
721 forward_x11 = forward_x11,
724 err_on_timeout = err_on_timeout,
725 connect_timeout = connect_timeout,
726 persistent = persistent,
728 strict_host_checking = strict_host_checking
731 (out, err), proc = sshfuncs.rexec(
733 host = self.get("hostname"),
734 user = self.get("username"),
735 port = self.get("port"),
739 identity = self.get("identity"),
740 server_key = self.get("serverKey"),
743 forward_x11 = forward_x11,
746 err_on_timeout = err_on_timeout,
747 connect_timeout = connect_timeout,
748 persistent = persistent,
750 strict_host_checking = strict_host_checking
753 return (out, err), proc
755 def run(self, command, home,
764 self.debug("Running command '%s'" % command)
767 (out, err), proc = execfuncs.lspawn(command, pidfile,
772 create_home = create_home,
777 (out, err), proc = sshfuncs.rspawn(
781 create_home = create_home,
782 stdin = stdin if stdin is not None else '/dev/null',
783 stdout = stdout if stdout else '/dev/null',
784 stderr = stderr if stderr else '/dev/null',
786 host = self.get("hostname"),
787 user = self.get("username"),
788 port = self.get("port"),
790 identity = self.get("identity"),
791 server_key = self.get("serverKey"),
795 return (out, err), proc
797 def getpid(self, home, pidfile = "pidfile"):
799 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
802 pidtuple = sshfuncs.rgetpid(
803 os.path.join(home, pidfile),
804 host = self.get("hostname"),
805 user = self.get("username"),
806 port = self.get("port"),
808 identity = self.get("identity"),
809 server_key = self.get("serverKey")
814 def status(self, pid, ppid):
816 status = execfuncs.lstatus(pid, ppid)
819 status = sshfuncs.rstatus(
821 host = self.get("hostname"),
822 user = self.get("username"),
823 port = self.get("port"),
825 identity = self.get("identity"),
826 server_key = self.get("serverKey")
831 def kill(self, pid, ppid, sudo = False):
834 status = self.status(pid, ppid)
836 if status == sshfuncs.ProcStatus.RUNNING:
838 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
841 (out, err), proc = sshfuncs.rkill(
843 host = self.get("hostname"),
844 user = self.get("username"),
845 port = self.get("port"),
848 identity = self.get("identity"),
849 server_key = self.get("serverKey")
852 return (out, err), proc