2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
# TODO: Verify files and dirs exist already
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
39 reschedule_delay = "0.5s"
43 Error codes that the rexitcode function can return if unable to
44 check the exit code of a spawned process
53 Supported flavors of Linux OS
62 class LinuxNode(ResourceManager):
def _register_attributes(cls):
    """ Registers the attributes that configure a Linux node

    The username and the SSH identity file are flagged as credentials,
    every other attribute is flagged as ExecReadOnly.
    """
    read_only = Flags.ExecReadOnly
    credential = Flags.Credential

    # Attributes are registered in the order in which they are listed
    attributes = (
        Attribute("hostname", "Hostname of the machine",
            flags = read_only),
        Attribute("username", "Local account username",
            flags = credential),
        Attribute("port", "SSH port", flags = read_only),
        Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = read_only),
        Attribute("identity", "SSH identity file",
            flags = credential),
        Attribute("serverKey", "Server public key",
            flags = read_only),
        Attribute("cleanHome",
            "Remove all files and directories "
            " from home folder before starting experiment",
            flags = read_only),
        Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = read_only),
        Attribute("tearDown",
            "Bash script to be executed before "
            "releasing the resource",
            flags = read_only),
    )

    for attribute in attributes:
        cls._register_attribute(attribute)
107 def __init__(self, ec, guid):
108 super(LinuxNode, self).__init__(ec, guid)
111 # lock to avoid concurrency issues on methods used by applications
112 self._lock = threading.Lock()
def log_message(self, msg):
    """ Returns msg prefixed with the guid and the hostname of this node """
    guid = self.guid
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
120 return self.get("home") or ""
124 return os.path.join(self.home, self.ec.exp_id)
128 node_home = "node-%d" % self.guid
129 return os.path.join(self.exp_home, node_home)
136 if (not self.get("hostname") or not self.get("username")):
137 msg = "Can't resolve OS, insufficient data "
139 raise RuntimeError, msg
141 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
143 if err and proc.poll():
144 msg = "Error detecting OS "
145 self.error(msg, out, err)
146 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
148 if out.find("Fedora release 12") == 0:
149 self._os = OSType.FEDORA_12
150 elif out.find("Fedora release 14") == 0:
151 self._os = OSType.FEDORA_14
152 elif out.find("Debian") == 0:
153 self._os = OSType.DEBIAN
154 elif out.find("Ubuntu") ==0:
155 self._os = OSType.UBUNTU
157 msg = "Unsupported OS"
159 raise RuntimeError, "%s - %s " %( msg, out )
165 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
168 if not self.is_alive():
169 self._state = ResourceState.FAILED
170 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
172 raise RuntimeError, msg
174 if self.get("cleanProcesses"):
175 self.clean_processes()
177 if self.get("cleanHome"):
180 self.mkdir(self.node_home)
182 super(LinuxNode, self).provision()
185 if self.state == ResourceState.NEW:
190 self._state = ResourceState.FAILED
193 # Node needs to wait until all associated interfaces are
194 # ready before it can finalize deployment
195 from nepi.resources.linux.interface import LinuxInterface
196 ifaces = self.get_connected(LinuxInterface.rtype())
198 if iface.state < ResourceState.READY:
199 self.ec.schedule(reschedule_delay, self.deploy)
202 super(LinuxNode, self).deploy()
205 tear_down = self.get("tearDown")
207 self.execute(tear_down)
209 super(LinuxNode, self).release()
211 def valid_connection(self, guid):
215 def clean_processes(self, killer = False):
216 self.info("Cleaning up processes")
220 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
221 "sudo -S killall python tcpdump || /bin/true ; " +
222 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
223 "sudo -S killall -u root || /bin/true ; " +
224 "sudo -S killall -u root || /bin/true ; ")
227 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
228 "sudo -S killall tcpdump || /bin/true ; " +
229 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
230 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
233 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
235 def clean_home(self):
236 self.info("Cleaning up home")
239 # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
240 "find . -maxdepth 1 -name 'nepi-*' " +
241 " -execdir rm -rf {} + "
245 cmd = "cd %s ; " % self.home + cmd
248 (out, err), proc = self.execute(cmd, with_lock = True)
250 def upload(self, src, dst, text = False):
251 """ Copy content to destination
253 src content to copy. Can be a local file, directory or a list of files
255 dst destination path on the remote host (remote is always self.host)
257 text src is text input, it must be stored into a temp file before uploading
259 # If source is a string input
261 if text and not os.path.isfile(src):
262 # src is text input that should be uploaded as file
# create a temporary file with the content to upload
264 f = tempfile.NamedTemporaryFile(delete=False)
269 if not self.localhost:
270 # Build destination as <user>@<server>:<path>
271 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
272 result = self.copy(src, dst)
def download(self, src, dst):
    """ Copy src from the node to dst

    src     path of the content to copy, on the node

    dst     destination path, handed unchanged to self.copy

    Returns whatever self.copy returns.
    """
    if self.localhost:
        # The node is the local machine, the path is used as it is
        origin = src
    else:
        # Build the source as <user>@<server>:<path>
        username = self.get("username")
        hostname = self.get("hostname")
        origin = "%s@%s:%s" % (username, hostname, src)

    return self.copy(origin, dst)
286 def install_packages(self, packages, home):
288 if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
289 command = rpmfuncs.install_packages_command(self.os, packages)
290 elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
291 command = debfuncs.install_packages_command(self.os, packages)
293 msg = "Error installing packages ( OS not known ) "
294 self.error(msg, self.os)
295 raise RuntimeError, msg
298 (out, err), proc = self.run_and_wait(command, home,
299 shfile = "instpkg.sh",
300 pidfile = "instpkg_pidfile",
301 ecodefile = "instpkg_exitcode",
302 stdout = "instpkg_stdout",
303 stderr = "instpkg_stderr",
304 raise_on_error = True)
306 return (out, err), proc
308 def remove_packages(self, packages, home):
310 if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]:
311 command = rpmfuncs.remove_packages_command(self.os, packages)
312 elif self.os in [OSType.DEBIAN, OSType.UBUNTU]:
313 command = debfuncs.remove_packages_command(self.os, packages)
315 msg = "Error removing packages ( OS not known ) "
317 raise RuntimeError, msg
320 (out, err), proc = self.run_and_wait(command, home,
322 pidfile = "rmpkg_pidfile",
323 ecodefile = "rmpkg_exitcode",
324 stdout = "rmpkg_stdout",
325 stderr = "rmpkg_stderr",
326 raise_on_error = True)
328 return (out, err), proc
330 def mkdir(self, path, clean = False):
334 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """ Removes path recursively by executing 'rm -rf' through
    self.execute, and returns the result of that call """
    # The path is interpolated as it is, without any quoting
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
339 def run_and_wait(self, command, home,
343 ecodefile = "exitcode",
349 raise_on_error = False):
351 runs a command in background on the remote host, busy-waiting
352 until the command finishes execution.
353 This is more robust than doing a simple synchronized 'execute',
354 since in the remote host the command can continue to run detached
355 even if network disconnections occur
357 self.upload_command(command, home,
359 ecodefile = ecodefile,
362 command = "bash ./%s" % shfile
363 # run command in background in remote host
364 (out, err), proc = self.run(command, home,
372 # check no errors occurred
373 if proc.poll() and err:
374 msg = " Failed to run command '%s' " % command
375 self.error(msg, out, err)
377 raise RuntimeError, msg
378 # Wait for pid file to be generated
379 pid, ppid = self.wait_pid(
382 raise_on_error = raise_on_error)
384 # wait until command finishes to execute
385 self.wait_run(pid, ppid)
387 (out, err), proc = self.check_errors(home, ecodefile, stderr)
389 # Out is what was written in the stderr file
391 msg = " Failed to run command '%s' " % command
392 self.error(msg, out, err)
395 raise RuntimeError, msg
397 return (out, err), proc
399 def exitcode(self, home, ecodefile = "exitcode"):
401 Get the exit code of an application.
402 Returns an integer value with the exit code
404 (out, err), proc = self.check_output(home, ecodefile)
406 # Succeeded to open file, return exit code in the file
409 return int(out.strip())
411 # Error in the content of the file!
412 return ExitCode.CORRUPTFILE
414 # No such file or directory
415 if proc.returncode == 1:
416 return ExitCode.FILENOTFOUND
418 # Other error from 'cat'
419 return ExitCode.ERROR
421 def upload_command(self, command, home,
423 ecodefile = "exitcode",
425 """ Saves the command as a bash script file in the remote host, and
426 forces to save the exit code of the command execution to the ecodefile
429 # The exit code of the command will be stored in ecodefile
430 command = " %(command)s ; echo $? > %(ecodefile)s ;" % {
432 'ecodefile': ecodefile,
436 environ = self.format_environment(env)
438 # Add environ to command
439 command = environ + command
441 dst = os.path.join(home, shfile)
442 return self.upload(command, dst, text = True)
444 def format_environment(self, env, inline = False):
445 """Format environmental variables for command to be executed either
446 as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
447 as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
449 sep = " " if inline else "\n"
450 export = " " if inline else "export"
451 return sep.join(map(lambda e: "%s %s" % (export, e), env.split(" "))) \
454 def check_errors(self, home,
455 ecodefile = "exitcode",
459 Checks whether errors occurred while running a command.
460 It first checks the exit code for the command, and only if the
461 exit code is an error one it returns the error output.
467 # get exit code saved in the 'exitcode' file
468 ecode = self.exitcode(home, ecodefile)
470 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
471 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
472 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
473 # The process returned an error code or didn't exist.
474 # Check standard error.
475 (err, eerr), proc = self.check_output(home, stderr)
# Also retrieve standard output for information
478 (out, oerr), oproc = self.check_output(home, stdout)
480 # If the stderr file was not found, assume nothing bad happened,
481 # and just ignore the error.
482 # (cat returns 1 for error "No such file or directory")
483 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
486 return (out, err), proc
488 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
489 """ Waits until the pid file for the command is generated,
490 and returns the pid and ppid of the process """
495 pidtuple = self.getpid(home = home, pidfile = pidfile)
504 msg = " Failed to get pid for pidfile %s/%s " % (
509 raise RuntimeError, msg
513 def wait_run(self, pid, ppid, trial = 0):
514 """ wait for a remote process to finish execution """
518 status = self.status(pid, ppid)
520 if status is ProcStatus.FINISHED:
522 elif status is not ProcStatus.RUNNING:
# If it takes more than 20 seconds to start, then
# assume something went wrong
530 # The app is running, just wait...
def check_output(self, home, filename):
    """ Retrieves the content of the file filename located in home

    The file is read by executing 'cat' through self.execute.
    Returns ((out, err), proc) as unpacked from that call.
    """
    path = os.path.join(home, filename)
    result = self.execute("cat %s" % path, retry = 1, with_lock = True)
    (out, err), proc = result
    return (out, err), proc
545 # TODO: FIX NOT ALIVE!!!!
546 (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
550 trace = traceback.format_exc()
551 msg = "Unresponsive host %s " % err
552 self.error(msg, out, trace)
555 if out.strip().startswith('ALIVE'):
558 msg = "Unresponsive host "
559 self.error(msg, out, err)
562 def copy(self, src, dst):
564 (out, err), proc = execfuncs.lcopy(source, dest,
566 strict_host_checking = False)
569 (out, err), proc = sshfuncs.rcopy(
571 port = self.get("port"),
572 identity = self.get("identity"),
573 server_key = self.get("serverKey"),
575 strict_host_checking = False)
577 return (out, err), proc
579 def execute(self, command,
587 err_on_timeout = True,
588 connect_timeout = 30,
589 strict_host_checking = False,
594 """ Notice that this invocation will block until the
595 execution finishes. If this is not the desired behavior,
596 use 'run' instead."""
599 (out, err), proc = execfuncs.lexec(command,
607 (out, err), proc = sshfuncs.rexec(
609 host = self.get("hostname"),
610 user = self.get("username"),
611 port = self.get("port"),
615 identity = self.get("identity"),
616 server_key = self.get("serverKey"),
619 forward_x11 = forward_x11,
622 err_on_timeout = err_on_timeout,
623 connect_timeout = connect_timeout,
624 persistent = persistent,
626 strict_host_checking = strict_host_checking
629 (out, err), proc = sshfuncs.rexec(
631 host = self.get("hostname"),
632 user = self.get("username"),
633 port = self.get("port"),
637 identity = self.get("identity"),
638 server_key = self.get("serverKey"),
641 forward_x11 = forward_x11,
644 err_on_timeout = err_on_timeout,
645 connect_timeout = connect_timeout,
646 persistent = persistent,
648 strict_host_checking = strict_host_checking
651 return (out, err), proc
653 def run(self, command, home,
662 self.debug("Running command '%s'" % command)
665 (out, err), proc = execfuncs.lspawn(command, pidfile,
670 create_home = create_home,
675 (out, err), proc = sshfuncs.rspawn(
679 create_home = create_home,
680 stdin = stdin if stdin is not None else '/dev/null',
681 stdout = stdout if stdout else '/dev/null',
682 stderr = stderr if stderr else '/dev/null',
684 host = self.get("hostname"),
685 user = self.get("username"),
686 port = self.get("port"),
688 identity = self.get("identity"),
689 server_key = self.get("serverKey"),
693 return (out, err), proc
695 def getpid(self, home, pidfile = "pidfile"):
697 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
700 pidtuple = sshfuncs.rgetpid(
701 os.path.join(home, pidfile),
702 host = self.get("hostname"),
703 user = self.get("username"),
704 port = self.get("port"),
706 identity = self.get("identity"),
707 server_key = self.get("serverKey")
712 def status(self, pid, ppid):
714 status = execfuncs.lstatus(pid, ppid)
717 status = sshfuncs.rstatus(
719 host = self.get("hostname"),
720 user = self.get("username"),
721 port = self.get("port"),
723 identity = self.get("identity"),
724 server_key = self.get("serverKey")
729 def kill(self, pid, ppid, sudo = False):
732 status = self.status(pid, ppid)
734 if status == sshfuncs.ProcStatus.RUNNING:
736 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
739 (out, err), proc = sshfuncs.rkill(
741 host = self.get("hostname"),
742 user = self.get("username"),
743 port = self.get("port"),
746 identity = self.get("identity"),
747 server_key = self.get("serverKey")
750 return (out, err), proc