2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import random
import re
import tempfile
import threading
import time
import traceback

from nepi.execution.attribute import Attribute, Flags
from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
# TODO: Verify files and dirs exists already
# TODO: Blacklist nodes!
# TODO: Unify delays!!
# TODO: Validate outcome of uploads!!

# Delay (EC schedule-string format) used when a deploy step must be retried
# because a dependency (e.g. a connected interface) is not READY yet.
reschedule_delay = "0.5s"
class LinuxNode(ResourceManager):
    """ResourceManager for a Linux host driven over SSH (or locally).

    NOTE(review): decorator/rtype lines expected around the class header
    (e.g. @clsinit) are not visible in this chunk -- confirm in full file.
    """

    def _register_attributes(cls):
        """Declare every user-visible attribute of this resource."""
        # (name, description, flags) for each attribute, in the original
        # declaration order.
        specs = [
            ("hostname", "Hostname of the machine", Flags.ExecReadOnly),
            ("username", "Local account username", Flags.Credential),
            ("port", "SSH port", Flags.ExecReadOnly),
            ("home",
                "Experiment home directory to store all experiment related files",
                Flags.ExecReadOnly),
            ("identity", "SSH identity file", Flags.Credential),
            ("serverKey", "Server public key", Flags.ExecReadOnly),
            ("cleanHome", "Remove all files and directories " + \
                " from home folder before starting experiment",
                Flags.ExecReadOnly),
            ("cleanProcesses",
                "Kill all running processes before starting experiment",
                Flags.ExecReadOnly),
            ("tearDown", "Bash script to be executed before " + \
                "releasing the resource",
                Flags.ExecReadOnly),
        ]

        # Build every Attribute first, then register them, mirroring the
        # original create-all-then-register-all order.
        attributes = [Attribute(name, description, flags = flags)
                for (name, description, flags) in specs]

        for attribute in attributes:
            cls._register_attribute(attribute)
    def __init__(self, ec, guid):
        # ec: the ExperimentController owning this RM; guid: resource id.
        super(LinuxNode, self).__init__(ec, guid)

        # lock to avoid concurrency issues on methods used by applications
        self._lock = threading.Lock()

        self._logger = logging.getLogger("LinuxNode")
        # NOTE(review): additional instance attributes appear to be set on
        # lines not visible in this chunk -- confirm against the full file.
97 def log_message(self, msg):
98 return " guid %d - host %s - %s " % (self.guid,
99 self.get("hostname"), msg)
        # NOTE(review): the three snippets below are the bodies of the
        # `home`, `exp_home` and `node_home` accessors; their `def` /
        # @property lines are not visible in this chunk.

        # home: user-configured experiment home directory, or "" when unset.
        return self.get("home") or ""

        # exp_home: per-experiment directory (<home>/<exp_id>).
        return os.path.join(self.home, self.ec.exp_id)

        # node_home: per-node directory (<exp_home>/node-<guid>).
        node_home = "node-%d" % self.guid
        return os.path.join(self.exp_home, node_home)
        # NOTE(review): body of the OS-detection routine; its `def` line is
        # not visible in this chunk.
        # Both hostname and username are needed to reach the node over SSH.
        if (not self.get("hostname") or not self.get("username")):
            msg = "Can't resolve OS, insufficient data "
            raise RuntimeError, msg

        # Read the distribution banner from the node.
        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)

        if err and proc.poll():
            msg = "Error detecting OS "
            self.error(msg, out, err)
            raise RuntimeError, "%s - %s - %s" %( msg, out, err )

        # Map the banner to an internal OS tag.
        # NOTE(review): the body of each branch (presumably an assignment
        # such as self._os = "f12") is not visible in this chunk; an
        # `else:` before the "Unsupported OS" path also appears missing.
        if out.find("Fedora release 12") == 0:
        elif out.find("Fedora release 14") == 0:
        elif out.find("Debian") == 0:
        elif out.find("Ubuntu") ==0:
            msg = "Unsupported OS"
            raise RuntimeError, "%s - %s " %( msg, out )
148 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
    def provision(self, filters = None):
        """Prepare the node: verify liveness, clean old state, create dirs."""
        # An unreachable node aborts deployment immediately.
        if not self.is_alive():
            self._state = ResourceState.FAILED
            msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
            raise RuntimeError, msg

        # Optionally kill stray processes left over from previous runs.
        if self.get("cleanProcesses"):
            self.clean_processes()

        # NOTE(review): the body of this branch (presumably a call to
        # self.clean_home()) is not visible in this chunk.
        if self.get("cleanHome"):

        self.mkdir(self.node_home)

        super(LinuxNode, self).provision()
        # NOTE(review): deploy() body; the enclosing `def deploy(self):` line
        # and several branch bodies are not visible in this chunk.
        if self.state == ResourceState.NEW:

            self._state = ResourceState.FAILED

        # Node needs to wait until all associated interfaces are
        # ready before it can finalize deployment
        from nepi.resources.linux.interface import LinuxInterface
        ifaces = self.get_connected(LinuxInterface.rtype())
        # NOTE(review): the `for iface in ifaces:` line is not visible here.
            if iface.state < ResourceState.READY:
                # Retry the whole deploy later instead of blocking.
                self.ec.schedule(reschedule_delay, self.deploy)

        super(LinuxNode, self).deploy()
        # NOTE(review): release() body; the `def release(self):` line and the
        # guard (presumably `if tear_down:`) are not visible in this chunk.
        # Run the user-provided tearDown script before releasing the node.
        tear_down = self.get("tearDown")
        self.execute(tear_down)

        super(LinuxNode, self).release()
    def valid_connection(self, guid):
        # NOTE(review): the method body is not visible in this chunk.
    def clean_processes(self, killer = False):
        """Kill leftover processes on the node before the experiment starts.

        killer -- when True an aggressive sweep is intended (kills root
        processes and everything outside the current shell's ancestry).
        """
        self.info("Cleaning up processes")

        # Aggressive variant: each kill is issued twice and ORed with
        # /bin/true so the compound command never reports failure.
        cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
            "sudo -S killall python tcpdump || /bin/true ; " +
            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
            "sudo -S killall -u root || /bin/true ; " +
            "sudo -S killall -u root || /bin/true ; ")

        # NOTE(review): the if/else selecting between the two variants is not
        # visible in this chunk; as written here the second assignment
        # unconditionally overrides the first.
        # Normal variant: only tcpdump and the experiment user's processes.
        cmd = ("sudo -S killall tcpdump || /bin/true ; " +
            "sudo -S killall tcpdump || /bin/true ; " +
            "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
            "sudo -S killall -u %s || /bin/true ; " % self.get("username"))

        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
    def clean_home(self):
        """Remove nepi-* files and directories from the node's home."""
        self.info("Cleaning up home")

        # NOTE(review): the opening of this command string (likely
        # `cmd = (`) is not visible in this chunk.
        # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
            "find . -maxdepth 1 -name 'nepi-*' " +
            " -execdir rm -rf {} + "

        # Run the find from within the home directory itself.
        cmd = "cd %s ; " % self.home + cmd

        (out, err), proc = self.execute(cmd, with_lock = True)
    def upload(self, src, dst, text = False):
        """ Copy content to destination

        src content to copy. Can be a local file, directory or a list of files

        dst destination path on the remote host (remote is always self.host)

        text src is text input, it must be stored into a temp file before uploading
        """
        # If source is a string input
        if text and not os.path.isfile(src):
            # src is text input that should be uploaded as file
            # create a temporal file with the content to upload
            f = tempfile.NamedTemporaryFile(delete=False)
            # NOTE(review): the lines writing the text into `f` and
            # re-pointing src at f.name are not visible in this chunk.

        if not self.localhost:
            # Build destination as <user>@<server>:<path>
            dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)

        result = self.copy(src, dst)
        # NOTE(review): temp-file cleanup and the return of `result` are not
        # visible in this chunk.
264 def download(self, src, dst):
265 if not self.localhost:
266 # Build destination as <user>@<server>:<path>
267 src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
268 return self.copy(src, dst)
    def install_packages(self, packages, home = None):
        """Install `packages` using the distribution's package manager.

        home -- remote working directory for the command (defaults to
        node_home).
        """
        home = home or self.node_home

        # Pick the package-manager command for the detected OS.
        if self.os in ["f12", "f14"]:
            cmd = rpmfuncs.install_packages_command(self.os, packages)
        elif self.os in ["debian", "ubuntu"]:
            cmd = debfuncs.install_packages_command(self.os, packages)
        # NOTE(review): an `else:` introducing this error path is not
        # visible in this chunk.
            msg = "Error installing packages ( OS not known ) "
            self.error(msg, self.os)
            raise RuntimeError, msg

        # Run detached on the node and wait for completion; pid/stdout/stderr
        # files are left behind for later inspection.
        (out, err), proc = self.run_and_wait(cmd, home,
                pidfile = "instpkg_pid",
                stdout = "instpkg_out",
                stderr = "instpkg_err",
                raise_on_error = True)

        return (out, err), proc
    def remove_packages(self, packages, home = None):
        """Remove `packages` using the distribution's package manager."""
        home = home or self.node_home

        # Pick the package-manager command for the detected OS.
        if self.os in ["f12", "f14"]:
            cmd = rpmfuncs.remove_packages_command(self.os, packages)
        elif self.os in ["debian", "ubuntu"]:
            cmd = debfuncs.remove_packages_command(self.os, packages)
        # NOTE(review): an `else:` and an error-report call are not visible
        # in this chunk.
            msg = "Error removing packages ( OS not known ) "
            raise RuntimeError, msg

        (out, err), proc = self.run_and_wait(cmd, home,
                pidfile = "rmpkg_pid",
                stdout = "rmpkg_out",
                stderr = "rmpkg_err",
                raise_on_error = True)

        return (out, err), proc
    def mkdir(self, path, clean = False):
        """Create `path` (and parents) on the node."""
        # NOTE(review): handling of `clean` (presumably an rmdir first) is
        # not visible in this chunk.
        return self.execute("mkdir -p %s" % path, with_lock = True)
320 def rmdir(self, path):
321 return self.execute("rm -rf %s" % path, with_lock = True)
    def run_and_wait(self, command,
            # NOTE(review): several keyword parameters (home, pidfile,
            # stdout, stderr, ...) are not visible in this chunk.
            raise_on_error = False):
        """ runs a command in background on the remote host, but waits
        until the command finishes execution.
        This is more robust than doing a simple synchronized 'execute',
        since in the remote host the command can continue to run detached
        even if network disconnections occur
        """
        # run command in background in remote host
        # NOTE(review): the remaining arguments of this self.run call are
        # not visible in this chunk.
        (out, err), proc = self.run(command, home,

        # check no errors occurred
        if proc.poll() and err:
            msg = " Failed to run command '%s' " % command
            self.error(msg, out, err)
            raise RuntimeError, msg

        # Wait for pid file to be generated
        # NOTE(review): the leading arguments of wait_pid are not visible.
        pid, ppid = self.wait_pid(
            raise_on_error = raise_on_error)

        # wait until command finishes to execute
        self.wait_run(pid, ppid)

        # check if execution errors occurred
        (out, err), proc = self.check_output(home, stderr)
        # NOTE(review): the condition guarding this error path is not
        # visible in this chunk.
            msg = " Failed to run command '%s' " % command
            self.error(msg, out, err)
            raise RuntimeError, msg

        return (out, err), proc
    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
        """ Waits until the pid file for the command is generated,
        and returns the pid and ppid of the process """
        # NOTE(review): the retry-loop scaffolding (delay initialization,
        # the while/for header and the success/return path) is not visible
        # in this chunk.
        pidtuple = self.checkpid(home = home, pidfile = pidfile)

        # Exponential backoff capped at 30 seconds between retries.
        delay = min(30,delay*1.2)

        # NOTE(review): the remainder of this message expression and the
        # surrounding error handling are not visible in this chunk.
        msg = " Failed to get pid for pidfile %s/%s " % (
        raise RuntimeError, msg
    def wait_run(self, pid, ppid, trial = 0):
        """ wait for a remote process to finish execution """
        # NOTE(review): the polling-loop scaffolding (delay initialization,
        # loop header, branch bodies) is not visible in this chunk.
        status = self.status(pid, ppid)

        if status is sshfuncs.FINISHED:
        elif status is not sshfuncs.RUNNING:
            # Back off longer before re-checking an anomalous status.
            time.sleep(delay*(5.5+random.random()))

        # Jittered sleep with exponential backoff capped at 30 seconds.
        time.sleep(delay*(0.5+random.random()))
        delay = min(30,delay*1.2)
423 def check_output(self, home, filename):
424 """ checks file content """
425 (out, err), proc = self.execute("cat %s" %
426 os.path.join(home, filename), retry = 1, with_lock = True)
427 return (out, err), proc
        # NOTE(review): is_alive() body; its `def` line, the try/except
        # structure and the return statements are not visible in this chunk.
        # TODO: FIX NOT ALIVE!!!!
        # Probe the node with a trivial echo, retried up to 5 times.
        (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,

        # Exception path: record the traceback of the failed probe.
        trace = traceback.format_exc()
        msg = "Unresponsive host %s " % err
        self.error(msg, out, trace)

        # The echo reached the node and came back: host is responsive.
        if out.strip().startswith('ALIVE'):

        msg = "Unresponsive host "
        self.error(msg, out, err)
    def copy(self, src, dst):
        """Copy src to dst, locally or over SSH depending on localhost."""
        # NOTE(review): the `if self.localhost:` / `else:` structure and the
        # positional arguments to rcopy are not visible in this chunk;
        # `source`/`dest` below are presumably bound on missing lines.
        (out, err), proc = execfuncs.lcopy(source, dest,
            strict_host_checking = False)

        (out, err), proc = sshfuncs.rcopy(
            port = self.get("port"),
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            strict_host_checking = False)

        return (out, err), proc
    def execute(self, command,
            # NOTE(review): several parameters (sudo, stdin, env, retry,
            # with_lock, forward_x11, persistent, ...) and the closing of
            # this signature are not visible in this chunk.
            err_on_timeout = True,
            connect_timeout = 30,
            strict_host_checking = False,
        """ Notice that this invocation will block until the
        execution finishes. If this is not the desired behavior,
        use 'run' instead."""
        # NOTE(review): the localhost / with_lock / plain branch structure
        # around the three calls below is not visible in this chunk.
        # Local execution path.
        (out, err), proc = execfuncs.lexec(command,

        # Remote execution path (presumably the lock-holding variant).
        (out, err), proc = sshfuncs.rexec(
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            forward_x11 = forward_x11,
            err_on_timeout = err_on_timeout,
            connect_timeout = connect_timeout,
            persistent = persistent,
            strict_host_checking = strict_host_checking

        # Remote execution path (presumably without holding the lock).
        (out, err), proc = sshfuncs.rexec(
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            forward_x11 = forward_x11,
            err_on_timeout = err_on_timeout,
            connect_timeout = connect_timeout,
            persistent = persistent

        return (out, err), proc
    def run(self, command,
            # NOTE(review): the remaining parameters (home, create_home,
            # pidfile, stdin/stdout/stderr, sudo, ...) and the signature
            # closing are not visible in this chunk.
        self.debug("Running command '%s'" % command)

        # NOTE(review): the localhost/remote branch structure is not
        # visible in this chunk.
        # Local spawn path.
        (out, err), proc = execfuncs.lspawn(command, pidfile,
            create_home = create_home,

        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out,err), proc = sshfuncs.rspawn(
            create_home = create_home,
            stdin = stdin if stdin is not None else '/dev/null',
            stdout = stdout if stdout else '/dev/null',
            stderr = stderr if stderr else '/dev/null',
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            identity = self.get("identity"),
            server_key = self.get("serverKey"),

        return (out, err), proc
    def checkpid(self, home = ".", pidfile = "pid"):
        """Read the (pid, ppid) tuple from the command's pidfile."""
        # NOTE(review): the localhost/remote if-else and the final return
        # of `pidtuple` are not visible in this chunk.
        pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))

        pidtuple = sshfuncs.rcheckpid(
            os.path.join(home, pidfile),
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            identity = self.get("identity"),
            server_key = self.get("serverKey")
    def status(self, pid, ppid):
        """Query the run state of the process identified by pid/ppid."""
        # NOTE(review): the localhost/remote if-else and the final return
        # of `status` are not visible in this chunk.
        status = execfuncs.lstatus(pid, ppid)

        status = sshfuncs.rstatus(
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            identity = self.get("identity"),
            server_key = self.get("serverKey")
    def kill(self, pid, ppid, sudo = False):
        """Kill the process, but only when it is still running."""
        status = self.status(pid, ppid)

        # Only attempt the kill when the process is confirmed RUNNING.
        if status == sshfuncs.RUNNING:
            # NOTE(review): the localhost/remote selection lines around the
            # two calls below are not visible in this chunk.
            (out, err), proc = execfuncs.lkill(pid, ppid, sudo)

            (out, err), proc = sshfuncs.rkill(
                host = self.get("hostname"),
                user = self.get("username"),
                port = self.get("port"),
                identity = self.get("identity"),
                server_key = self.get("serverKey")

        return (out, err), proc
    def check_bad_host(self, out, err):
        """Heuristically detect a malfunctioning node from command output."""
        # NOTE(review): additional regex alternatives and the pattern's
        # closing parenthesis are not visible in this chunk.
        badre = re.compile(r'(?:'
            r'|Error: disk I/O error'

        return badre.search(out) or badre.search(err)

        # NOTE(review): the `def` header of the blacklisting method is not
        # visible in this chunk.
        self.warn(" Blacklisting malfunctioning node ")
        #util.appendBlacklist(self.hostname)