2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
24 from nepi.util.sshfuncs import ProcStatus
34 # TODO: Verify that files and directories already exist
35 # TODO: Blacklist nodes!
36 # TODO: Unify delays!!
37 # TODO: Validate outcome of uploads!!
# Delay handed to self.ec.schedule() when deploy() has to retry later
# (e.g. while a connected LinuxInterface is not yet READY).
39 reschedule_delay = "0.5s"
43 Error codes that the exitcode method can return if unable to
44 check the exit code of a spawned process
# Resource manager for a Linux host, driven either through local process
# helpers (execfuncs) or over SSH (sshfuncs).
52 class LinuxNode(ResourceManager):
# Declares the attributes a LinuxNode accepts: connection settings
# (hostname, username, port, identity, serverKey), the experiment home
# directory, and provisioning/teardown options (cleanHome, cleanProcesses,
# tearDown). username and identity are flagged Flags.Credential; the others
# are Flags.ExecReadOnly.
56 def _register_attributes(cls):
57 hostname = Attribute("hostname", "Hostname of the machine",
58 flags = Flags.ExecReadOnly)
60 username = Attribute("username", "Local account username",
61 flags = Flags.Credential)
63 port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
65 home = Attribute("home",
66 "Experiment home directory to store all experiment related files",
67 flags = Flags.ExecReadOnly)
69 identity = Attribute("identity", "SSH identity file",
70 flags = Flags.Credential)
72 server_key = Attribute("serverKey", "Server public key",
73 flags = Flags.ExecReadOnly)
# NOTE(review): the two concatenated literals below produce a double space
# ("directories  from") in the description; harmless but cosmetic.
75 clean_home = Attribute("cleanHome", "Remove all files and directories " + \
76 " from home folder before starting experiment",
77 flags = Flags.ExecReadOnly)
79 clean_processes = Attribute("cleanProcesses",
80 "Kill all running processes before starting experiment",
81 flags = Flags.ExecReadOnly)
83 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
84 "releasing the resource",
85 flags = Flags.ExecReadOnly)
87 cls._register_attribute(hostname)
88 cls._register_attribute(username)
89 cls._register_attribute(port)
90 cls._register_attribute(home)
91 cls._register_attribute(identity)
92 cls._register_attribute(server_key)
93 cls._register_attribute(clean_home)
94 cls._register_attribute(clean_processes)
95 cls._register_attribute(tear_down)
# Initialises the resource manager and the per-node lock.
97 def __init__(self, ec, guid):
98 super(LinuxNode, self).__init__(ec, guid)
101 # lock to avoid concurrency issues on methods used by applications
# NOTE(review): presumably this is the lock taken by calls made with
# with_lock = True (execute and friends) — confirm against their bodies.
102 self._lock = threading.Lock()
# Returns msg prefixed with this node's guid and "hostname" attribute.
104 def log_message(self, msg):
105 return " guid %d - host %s - %s " % (self.guid,
106 self.get("hostname"), msg)
# home: value of the "home" attribute, or "" when it is unset.
110 return self.get("home") or ""
# exp_home: per-experiment directory, <home>/<exp_id>. With an empty home
# this is just <exp_id>, i.e. relative to the command's working directory.
114 return os.path.join(self.home, self.ec.exp_id)
# node_home: per-node directory inside the experiment directory,
# <exp_home>/node-<guid>.
118 node_home = "node-%d" % self.guid
119 return os.path.join(self.exp_home, node_home)
# os: detects the distribution by running `cat /etc/issue` on the node and
# checking whether the output starts with Fedora release 12/14, Debian or
# Ubuntu. The returned tags are presumably the ones tested by
# install_packages/remove_packages ("f12", "f14", "debian", "ubuntu") —
# confirm. Raises RuntimeError when hostname/username are unset, when the
# command fails, or when the OS is not one of the supported ones.
126 if (not self.get("hostname") or not self.get("username")):
127 msg = "Can't resolve OS, insufficient data "
129 raise RuntimeError, msg
131 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
# Only a failure if something was written to stderr AND the command exited
# with a non-zero status.
133 if err and proc.poll():
134 msg = "Error detecting OS "
135 self.error(msg, out, err)
136 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
# `find(...) == 0` means "output starts with".
138 if out.find("Fedora release 12") == 0:
140 elif out.find("Fedora release 14") == 0:
142 elif out.find("Debian") == 0:
144 elif out.find("Ubuntu") ==0:
147 msg = "Unsupported OS"
149 raise RuntimeError, "%s - %s " %( msg, out )
155 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
# provision: marks the node FAILED and raises RuntimeError if it does not
# answer is_alive(); otherwise honours cleanProcesses and cleanHome,
# creates node_home, and delegates to ResourceManager.provision().
158 if not self.is_alive():
159 self._state = ResourceState.FAILED
160 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
162 raise RuntimeError, msg
164 if self.get("cleanProcesses"):
165 self.clean_processes()
167 if self.get("cleanHome"):
170 self.mkdir(self.node_home)
172 super(LinuxNode, self).provision()
# deploy: provisions a node that is still NEW (the state is set to FAILED,
# presumably when provisioning raises), then waits until every connected
# LinuxInterface is READY, rescheduling itself after reschedule_delay
# while any of them is not.
175 if self.state == ResourceState.NEW:
180 self._state = ResourceState.FAILED
183 # Node needs to wait until all associated interfaces are
184 # ready before it can finalize deployment
# Imported here, presumably to avoid a circular import with the interface
# module — confirm.
185 from nepi.resources.linux.interface import LinuxInterface
186 ifaces = self.get_connected(LinuxInterface.rtype())
188 if iface.state < ResourceState.READY:
189 self.ec.schedule(reschedule_delay, self.deploy)
192 super(LinuxNode, self).deploy()
# release: runs the optional "tearDown" bash script on the node, then
# delegates to ResourceManager.release().
195 tear_down = self.get("tearDown")
197 self.execute(tear_down)
199 super(LinuxNode, self).release()
# Connection-validation hook for this resource manager.
# NOTE(review): its body is not shown here; confirm which guids it accepts.
201 def valid_connection(self, guid):
# Kills leftover processes on the node. Two command variants are built
# (the selecting condition is presumably `killer` — confirm):
#  - a broad one that kills python and tcpdump, every process not attached
#    to the current terminal (ps -N -T) except lines matching $PPID, and
#    all of root's processes;
#  - a narrower one that kills tcpdump and every process of the configured
#    "username".
# Each step is issued twice and suffixed with `|| /bin/true`, so a failed
# kill never makes the whole command fail.
205 def clean_processes(self, killer = False):
206 self.info("Cleaning up processes")
210 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
211 "sudo -S killall python tcpdump || /bin/true ; " +
212 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
213 "sudo -S killall -u root || /bin/true ; " +
214 "sudo -S killall -u root || /bin/true ; ")
217 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
218 "sudo -S killall tcpdump || /bin/true ; " +
219 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
220 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
223 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
# Removes every top-level `nepi-*` entry of the home directory with
# `find . -maxdepth 1 -name 'nepi-*' -execdir rm -rf {} +`. The command is
# prefixed with `cd <home> ;` (presumably only when home is set — confirm).
# NOTE(review): home is interpolated unquoted into the shell command.
225 def clean_home(self):
226 self.info("Cleaning up home")
229 # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
230 "find . -maxdepth 1 -name 'nepi-*' " +
231 " -execdir rm -rf {} + "
235 cmd = "cd %s ; " % self.home + cmd
238 (out, err), proc = self.execute(cmd, with_lock = True)
# Uploads src to dst on this node through copy(). When text=True and src
# is not an existing local file, src is treated as literal content and is
# first written to a tempfile.NamedTemporaryFile(delete=False).
# NOTE(review): delete=False means the temporary file is not removed
# automatically — confirm it is cleaned up after the copy.
240 def upload(self, src, dst, text = False):
241 """ Copy content to destination
243 src content to copy. Can be a local file, directory or a list of files
245 dst destination path on the node (the host given by the "hostname" attribute)
247 text src is text input, it must be stored into a temp file before uploading
249 # If source is a string input
251 if text and not os.path.isfile(src):
252 # src is text input that should be uploaded as file
253 # create a temporary file with the content to upload
254 f = tempfile.NamedTemporaryFile(delete=False)
259 if not self.localhost:
260 # Build destination as <user>@<server>:<path>
261 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
263 result = self.copy(src, dst)
# Copies src from the node to the local path dst and returns copy()'s
# ((out, err), proc) result.
271 def download(self, src, dst):
272 if not self.localhost:
273 # Build source as <user>@<server>:<path>
274 src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
275 return self.copy(src, dst)
# Installs packages using a command built by rpmfuncs (f12/f14) or
# debfuncs (debian/ubuntu). The command runs through run_and_wait() with
# its instpkg.sh / instpkg_* helper files under `home`, and
# raise_on_error = True. Raises RuntimeError for an unknown OS.
# NOTE(review): self.os is read up to three times; if that property is not
# cached, each read runs `cat /etc/issue` on the node.
277 def install_packages(self, packages, home):
279 if self.os in ["f12", "f14"]:
280 command = rpmfuncs.install_packages_command(self.os, packages)
281 elif self.os in ["debian", "ubuntu"]:
282 command = debfuncs.install_packages_command(self.os, packages)
284 msg = "Error installing packages ( OS not known ) "
285 self.error(msg, self.os)
286 raise RuntimeError, msg
289 (out, err), proc = self.run_and_wait(command, home,
290 shfile = "instpkg.sh",
291 pidfile = "instpkg_pidfile",
292 ecodefile = "instpkg_exitcode",
293 stdout = "instpkg_stdout",
294 stderr = "instpkg_stderr",
295 raise_on_error = True)
297 return (out, err), proc
# Removes packages using a command built by rpmfuncs (f12/f14) or
# debfuncs (debian/ubuntu). The command runs through run_and_wait() with
# its rmpkg_* helper files under `home`, and raise_on_error = True.
# Raises RuntimeError for an unknown OS.
299 def remove_packages(self, packages, home):
301 if self.os in ["f12", "f14"]:
302 command = rpmfuncs.remove_packages_command(self.os, packages)
303 elif self.os in ["debian", "ubuntu"]:
304 command = debfuncs.remove_packages_command(self.os, packages)
306 msg = "Error removing packages ( OS not known ) "
308 raise RuntimeError, msg
311 (out, err), proc = self.run_and_wait(command, home,
313 pidfile = "rmpkg_pidfile",
314 ecodefile = "rmpkg_exitcode",
315 stdout = "rmpkg_stdout",
316 stderr = "rmpkg_stderr",
317 raise_on_error = True)
319 return (out, err), proc
# Creates `path` (and missing parents) on the node with `mkdir -p`.
# NOTE(review): `clean` presumably removes the directory first — confirm.
# The path is interpolated unquoted into the shell command.
321 def mkdir(self, path, clean = False):
325 return self.execute("mkdir -p %s" % path, with_lock = True)
# Recursively removes `path` on the node with `rm -rf`.
# NOTE(review): the path is interpolated unquoted into the shell command.
327 def rmdir(self, path):
328 return self.execute("rm -rf %s" % path, with_lock = True)
# Uploads `command` as the script <home>/<shfile> (see upload_command),
# starts it in the background with run(), waits for its pid file and for
# the process to finish, then inspects the exit-code and stderr files with
# check_errors() and returns their ((out, err), proc). Failures are logged
# and raise RuntimeError (presumably gated by raise_on_error — confirm).
330 def run_and_wait(self, command, home,
333 ecodefile = "exitcode",
339 raise_on_error = False):
341 runs a command in background on the remote host, busy-waiting
342 until the command finishes execution.
343 This is more robust than doing a simple synchronized 'execute',
344 since in the remote host the command can continue to run detached
345 even if network disconnections occur
347 self.upload_command(command, home, shfile, ecodefile)
# NOTE(review): `command` is rebound here, so the failure messages below
# report "bash ./<shfile>" rather than the caller's original command.
349 command = "bash ./%s" % shfile
350 # run command in background in remote host
351 (out, err), proc = self.run(command, home,
359 # check no errors occurred
360 if proc.poll() and err:
361 msg = " Failed to run command '%s' " % command
362 self.error(msg, out, err)
364 raise RuntimeError, msg
366 # Wait for pid file to be generated
367 pid, ppid = self.wait_pid(
370 raise_on_error = raise_on_error)
372 # wait until command finishes to execute
373 self.wait_run(pid, ppid)
375 (out, err), proc = self.check_errors(home, ecodefile, stderr)
377 # Out is what was written in the stderr file
379 msg = " Failed to run command '%s' " % command
380 self.error(msg, out, err)
383 raise RuntimeError, msg
385 return (out, err), proc
# Reads <home>/<ecodefile> with check_output() and returns:
#  - the integer stored in the file, when it parses;
#  - ExitCode.CORRUPTFILE when the content is not a valid integer;
#  - ExitCode.FILENOTFOUND when `cat` exited with status 1;
#  - ExitCode.ERROR for any other `cat` failure.
387 def exitcode(self, home, ecodefile = "exitcode"):
389 Get the exit code of an application.
390 Returns an integer value with the exit code
392 (out, err), proc = self.check_output(home, ecodefile)
394 # Succeeded to open file, return exit code in the file
397 return int(out.strip())
399 # Error in the content of the file!
400 return ExitCode.CORRUPTFILE
402 # No such file or directory
403 if proc.returncode == 1:
404 return ExitCode.FILENOTFOUND
406 # Other error from 'cat'
407 return ExitCode.ERROR
# Builds and uploads the script run by run_and_wait(). The command is
# wrapped as `{ ( command ) ; } ; echo $? > ecodefile` so that its exit
# status ends up in ecodefile. One `export VAR` line per space-separated
# entry of `env` is prepended (presumably only when env is set — confirm).
# The result is uploaded as text to <home>/<shfile>.
409 def upload_command(self, command, home,
411 ecodefile = "exitcode",
414 command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % {
416 'ecodefile': ecodefile,
# NOTE(review): env is split on single spaces, so a value that itself
# contains spaces is broken into several export lines.
422 for var in env.split(" "):
423 environ += 'export %s\n' % var
425 command = environ + command
427 dst = os.path.join(home, shfile)
428 return self.upload(command, dst, text = True)
# Returns ((out, err), proc) describing a finished command's errors:
#  - if the exit code could not be read (CORRUPTFILE/ERROR), err is a
#    message naming <home>/<ecodefile>;
#  - if the command exited with a positive code or left no exit-code file,
#    the stderr file is read with check_output(); per the comments below,
#    a missing stderr file alongside a missing exit-code file is ignored.
430 def check_errors(self, home,
431 ecodefile = "exitcode",
434 Checks whether errors occurred while running a command.
435 It first checks the exit code for the command, and only if the
436 exit code is an error one it returns the error output.
442 ecode = self.exitcode(home, ecodefile)
444 if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
445 err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
446 elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
447 # The process returned an error code or didn't exist.
448 # Check standard error.
449 (out, err), proc = self.check_output(home, stderr)
451 # If the stderr file was not found, assume nothing happened.
452 # We just ignore the error.
453 if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory
456 return (out, err), proc
# Polls getpid() on <home>/<pidfile> until a (pid, ppid) pair is
# available. When it gives up, it builds a "Failed to get pid" message and
# raises RuntimeError (presumably only when raise_on_error is set —
# confirm).
458 def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
459 """ Waits until the pid file for the command is generated,
460 and returns the pid and ppid of the process """
465 pidtuple = self.getpid(home = home, pidfile = pidfile)
474 msg = " Failed to get pid for pidfile %s/%s " % (
479 raise RuntimeError, msg
483 def wait_run(self, pid, ppid, trial = 0):
484 """ wait for a remote process to finish execution """
488 status = self.status(pid, ppid)
490 if status is ProcStatus.FINISHED:
492 elif status is not ProcStatus.RUNNING:
495 # If it takes more than 20 seconds to start, then
496 # asume something went wrong
500 # The app is running, just wait...
# Runs `cat <home>/<filename>` on the node (retry = 1, with_lock = True)
# and returns its ((out, err), proc).
# NOTE(review): the path is interpolated unquoted into the shell command.
503 def check_output(self, home, filename):
504 """ Retrieves content of file """
505 (out, err), proc = self.execute("cat %s" %
506 os.path.join(home, filename), retry = 1, with_lock = True)
507 return (out, err), proc
# is_alive: runs `echo 'ALIVE'` on the node (retry = 5) and considers it
# alive when stdout starts with 'ALIVE'. Otherwise, or when the call
# raises, it logs "Unresponsive host".
# NOTE(review): the `|| (echo 'NOTALIVE') >&2` branch only runs if the
# first echo itself fails, so 'NOTALIVE' is effectively never emitted.
# NOTE(review): in the exception handler, `out`/`err` are only bound if
# they were initialised before the execute() call — confirm.
515 # TODO: FIX NOT ALIVE!!!!
516 (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
520 trace = traceback.format_exc()
521 msg = "Unresponsive host %s " % err
522 self.error(msg, out, trace)
525 if out.strip().startswith('ALIVE'):
528 msg = "Unresponsive host "
529 self.error(msg, out, err)
532 def copy(self, src, dst):
534 (out, err), proc = execfuncs.lcopy(source, dest,
536 strict_host_checking = False)
539 (out, err), proc = sshfuncs.rcopy(
541 port = self.get("port"),
542 identity = self.get("identity"),
543 server_key = self.get("serverKey"),
545 strict_host_checking = False)
547 return (out, err), proc
549 def execute(self, command,
557 err_on_timeout = True,
558 connect_timeout = 30,
559 strict_host_checking = False,
563 """ Notice that this invocation will block until the
564 execution finishes. If this is not the desired behavior,
565 use 'run' instead."""
568 (out, err), proc = execfuncs.lexec(command,
576 (out, err), proc = sshfuncs.rexec(
578 host = self.get("hostname"),
579 user = self.get("username"),
580 port = self.get("port"),
584 identity = self.get("identity"),
585 server_key = self.get("serverKey"),
588 forward_x11 = forward_x11,
591 err_on_timeout = err_on_timeout,
592 connect_timeout = connect_timeout,
593 persistent = persistent,
594 strict_host_checking = strict_host_checking
597 (out, err), proc = sshfuncs.rexec(
599 host = self.get("hostname"),
600 user = self.get("username"),
601 port = self.get("port"),
605 identity = self.get("identity"),
606 server_key = self.get("serverKey"),
609 forward_x11 = forward_x11,
612 err_on_timeout = err_on_timeout,
613 connect_timeout = connect_timeout,
614 persistent = persistent
617 return (out, err), proc
# Starts `command` in the background under `home`: execfuncs.lspawn
# locally, sshfuncs.rspawn over SSH. Returns ((out, err), proc) of the
# spawn call; callers such as run_and_wait() wait for completion
# separately through the pid file. For rspawn, stdin falls back to
# /dev/null when None, and stdout/stderr fall back to /dev/null when empty.
619 def run(self, command, home,
628 self.debug("Running command '%s'" % command)
631 (out, err), proc = execfuncs.lspawn(command, pidfile,
636 create_home = create_home,
641 (out, err), proc = sshfuncs.rspawn(
645 create_home = create_home,
646 stdin = stdin if stdin is not None else '/dev/null',
647 stdout = stdout if stdout else '/dev/null',
648 stderr = stderr if stderr else '/dev/null',
650 host = self.get("hostname"),
651 user = self.get("username"),
652 port = self.get("port"),
654 identity = self.get("identity"),
655 server_key = self.get("serverKey"),
659 return (out, err), proc
# Reads the pid information of a spawned command from <home>/<pidfile>:
# execfuncs.lgetpid locally, sshfuncs.rgetpid over SSH. wait_pid() treats
# the result as a (pid, ppid) pair.
661 def getpid(self, home, pidfile = "pidfile"):
663 pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
666 pidtuple = sshfuncs.rgetpid(
667 os.path.join(home, pidfile),
668 host = self.get("hostname"),
669 user = self.get("username"),
670 port = self.get("port"),
672 identity = self.get("identity"),
673 server_key = self.get("serverKey")
# Queries the ProcStatus of process (pid, ppid): execfuncs.lstatus locally,
# sshfuncs.rstatus over SSH. wait_run() and kill() compare the result
# against ProcStatus.RUNNING / FINISHED.
678 def status(self, pid, ppid):
680 status = execfuncs.lstatus(pid, ppid)
683 status = sshfuncs.rstatus(
685 host = self.get("hostname"),
686 user = self.get("username"),
687 port = self.get("port"),
689 identity = self.get("identity"),
690 server_key = self.get("serverKey")
# Kills process (pid, ppid), optionally with sudo, but only when status()
# reports it RUNNING. Uses execfuncs.lkill locally and sshfuncs.rkill over
# SSH, and returns ((out, err), proc) of the kill command.
# NOTE(review): when the process is not running, out/err/proc are only
# defined if initialised earlier in the method — confirm.
695 def kill(self, pid, ppid, sudo = False):
698 status = self.status(pid, ppid)
700 if status == sshfuncs.ProcStatus.RUNNING:
702 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
705 (out, err), proc = sshfuncs.rkill(
707 host = self.get("hostname"),
708 user = self.get("username"),
709 port = self.get("port"),
712 identity = self.get("identity"),
713 server_key = self.get("serverKey")
716 return (out, err), proc