2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags
21 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
22 from nepi.resources.linux import rpmfuncs, debfuncs
23 from nepi.util import sshfuncs, execfuncs
# TODO: Verify that files and dirs already exist
34 # TODO: Blacklist nodes!
35 # TODO: Unify delays!!
36 # TODO: Validate outcome of uploads!!
38 reschedule_delay = "0.5s"
42 class LinuxNode(ResourceManager):
def _register_attributes(cls):
    """Create and register every attribute supported by this resource.

    Each attribute is built from a (name, help text, flags) entry and then
    handed to ``cls._register_attribute`` in the order listed below.
    """
    # One entry per attribute, in registration order.
    specs = (
        ("hostname", "Hostname of the machine", Flags.ExecReadOnly),
        ("username", "Local account username", Flags.Credential),
        ("port", "SSH port", Flags.ExecReadOnly),
        ("home",
            "Experiment home directory to store all experiment related files",
            Flags.ExecReadOnly),
        ("identity", "SSH identity file", Flags.Credential),
        ("serverKey", "Server public key", Flags.ExecReadOnly),
        ("cleanHome",
            "Remove all files and directories "
            " from home folder before starting experiment",
            Flags.ExecReadOnly),
        ("cleanProcesses",
            "Kill all running processes before starting experiment",
            Flags.ExecReadOnly),
        ("tearDown",
            "Bash script to be executed before "
            "releasing the resource",
            Flags.ExecReadOnly),
    )

    # Build all attributes first, then register them, mirroring the
    # create-then-register sequence of the attribute declarations.
    attributes = [Attribute(name, help_text, flags = flags)
            for name, help_text, flags in specs]

    for attribute in attributes:
        cls._register_attribute(attribute)
87 def __init__(self, ec, guid):
88 super(LinuxNode, self).__init__(ec, guid)
91 # lock to avoid concurrency issues on methods used by applications
92 self._lock = threading.Lock()
def log_message(self, msg):
    """Return ``msg`` decorated with this node's guid and hostname.

    The result has the form `` guid <guid> - host <hostname> - <msg> ``,
    where the hostname is read from the "hostname" attribute.
    """
    guid = self.guid
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
100 return self.get("home") or ""
104 return os.path.join(self.home, self.ec.exp_id)
108 node_home = "node-%d" % self.guid
109 return os.path.join(self.exp_home, node_home)
116 if (not self.get("hostname") or not self.get("username")):
117 msg = "Can't resolve OS, insufficient data "
119 raise RuntimeError, msg
121 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
123 if err and proc.poll():
124 msg = "Error detecting OS "
125 self.error(msg, out, err)
126 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
128 if out.find("Fedora release 12") == 0:
130 elif out.find("Fedora release 14") == 0:
132 elif out.find("Debian") == 0:
134 elif out.find("Ubuntu") ==0:
137 msg = "Unsupported OS"
139 raise RuntimeError, "%s - %s " %( msg, out )
145 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
148 if not self.is_alive():
149 self._state = ResourceState.FAILED
150 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
152 raise RuntimeError, msg
154 if self.get("cleanProcesses"):
155 self.clean_processes()
157 if self.get("cleanHome"):
160 self.mkdir(self.node_home)
162 super(LinuxNode, self).provision()
165 if self.state == ResourceState.NEW:
170 self._state = ResourceState.FAILED
173 # Node needs to wait until all associated interfaces are
174 # ready before it can finalize deployment
175 from nepi.resources.linux.interface import LinuxInterface
176 ifaces = self.get_connected(LinuxInterface.rtype())
178 if iface.state < ResourceState.READY:
179 self.ec.schedule(reschedule_delay, self.deploy)
182 super(LinuxNode, self).deploy()
185 tear_down = self.get("tearDown")
187 self.execute(tear_down)
189 super(LinuxNode, self).release()
191 def valid_connection(self, guid):
195 def clean_processes(self, killer = False):
196 self.info("Cleaning up processes")
200 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
201 "sudo -S killall python tcpdump || /bin/true ; " +
202 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
203 "sudo -S killall -u root || /bin/true ; " +
204 "sudo -S killall -u root || /bin/true ; ")
207 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
208 "sudo -S killall tcpdump || /bin/true ; " +
209 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
210 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
213 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
215 def clean_home(self):
216 self.info("Cleaning up home")
219 # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
220 "find . -maxdepth 1 -name 'nepi-*' " +
221 " -execdir rm -rf {} + "
225 cmd = "cd %s ; " % self.home + cmd
228 (out, err), proc = self.execute(cmd, with_lock = True)
230 def upload(self, src, dst, text = False):
231 """ Copy content to destination
233 src content to copy. Can be a local file, directory or a list of files
235 dst destination path on the remote host (remote is always self.host)
237 text src is text input, it must be stored into a temp file before uploading
239 # If source is a string input
241 if text and not os.path.isfile(src):
242 # src is text input that should be uploaded as file
243 # create a temporal file with the content to upload
244 f = tempfile.NamedTemporaryFile(delete=False)
249 if not self.localhost:
250 # Build destination as <user>@<server>:<path>
251 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
253 result = self.copy(src, dst)
def download(self, src, dst):
    """Copy ``src`` from this node to ``dst`` using ``self.copy``.

    When the node is not localhost, ``src`` is rewritten as
    ``<user>@<server>:<path>`` from the "username" and "hostname"
    attributes before copying. Returns whatever ``self.copy`` returns.
    """
    if self.localhost:
        # Local node: the path can be used as is
        return self.copy(src, dst)

    # Build source as <user>@<server>:<path>
    user = self.get("username")
    host = self.get("hostname")
    remote_src = "%s@%s:%s" % (user, host, src)
    return self.copy(remote_src, dst)
267 def install_packages(self, packages, home = None):
268 home = home or self.node_home
271 if self.os in ["f12", "f14"]:
272 cmd = rpmfuncs.install_packages_command(self.os, packages)
273 elif self.os in ["debian", "ubuntu"]:
274 cmd = debfuncs.install_packages_command(self.os, packages)
276 msg = "Error installing packages ( OS not known ) "
277 self.error(msg, self.os)
278 raise RuntimeError, msg
281 (out, err), proc = self.run_and_wait(cmd, home,
282 pidfile = "instpkg_pid",
283 stdout = "instpkg_out",
284 stderr = "instpkg_err",
285 raise_on_error = True)
287 return (out, err), proc
289 def remove_packages(self, packages, home = None):
290 home = home or self.node_home
293 if self.os in ["f12", "f14"]:
294 cmd = rpmfuncs.remove_packages_command(self.os, packages)
295 elif self.os in ["debian", "ubuntu"]:
296 cmd = debfuncs.remove_packages_command(self.os, packages)
298 msg = "Error removing packages ( OS not known ) "
300 raise RuntimeError, msg
303 (out, err), proc = self.run_and_wait(cmd, home,
304 pidfile = "rmpkg_pid",
305 stdout = "rmpkg_out",
306 stderr = "rmpkg_err",
307 raise_on_error = True)
309 return (out, err), proc
311 def mkdir(self, path, clean = False):
315 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Remove ``path`` recursively on the node.

    Runs ``rm -rf <path>`` through ``self.execute`` with ``with_lock``
    enabled and returns its result. The path is interpolated into the
    command as is (no quoting).
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
320 def run_and_wait(self, command,
328 raise_on_error = False):
329 """ runs a command in background on the remote host, but waits
330 until the command finishes execution.
331 This is more robust than doing a simple synchronized 'execute',
332 since in the remote host the command can continue to run detached
333 even if network disconnections occur
335 # run command in background in remote host
336 (out, err), proc = self.run(command, home,
344 # check no errors occurred
345 if proc.poll() and err:
346 msg = " Failed to run command '%s' " % command
347 self.error(msg, out, err)
349 raise RuntimeError, msg
351 # Wait for pid file to be generated
352 pid, ppid = self.wait_pid(
355 raise_on_error = raise_on_error)
357 # wait until command finishes to execute
358 self.wait_run(pid, ppid)
360 # check if execution errors occurred
361 (out, err), proc = self.check_output(home, stderr)
364 msg = " Failed to run command '%s' " % command
365 self.error(msg, out, err)
368 raise RuntimeError, msg
370 return (out, err), proc
372 def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
373 """ Waits until the pid file for the command is generated,
374 and returns the pid and ppid of the process """
378 pidtuple = self.checkpid(home = home, pidfile = pidfile)
385 delay = min(30,delay*1.2)
387 msg = " Failed to get pid for pidfile %s/%s " % (
392 raise RuntimeError, msg
396 def wait_run(self, pid, ppid, trial = 0):
397 """ wait for a remote process to finish execution """
403 status = self.status(pid, ppid)
405 if status is sshfuncs.FINISHED:
407 elif status is not sshfuncs.RUNNING:
409 time.sleep(delay*(5.5+random.random()))
416 time.sleep(delay*(0.5+random.random()))
417 delay = min(30,delay*1.2)
def check_output(self, home, filename):
    """Return the content of ``filename`` located under ``home``.

    The file is read by running ``cat <home>/<filename>`` through
    ``self.execute`` (``retry = 1``, ``with_lock = True``). The result is
    returned as ``(out, err), proc``.
    """
    target = os.path.join(home, filename)
    (stdout, stderr), process = self.execute("cat %s" % target,
            retry = 1, with_lock = True)
    return (stdout, stderr), process
432 # TODO: FIX NOT ALIVE!!!!
433 (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5,
437 trace = traceback.format_exc()
438 msg = "Unresponsive host %s " % err
439 self.error(msg, out, trace)
442 if out.strip().startswith('ALIVE'):
445 msg = "Unresponsive host "
446 self.error(msg, out, err)
449 def copy(self, src, dst):
451 (out, err), proc = execfuncs.lcopy(source, dest,
453 strict_host_checking = False)
456 (out, err), proc = sshfuncs.rcopy(
458 port = self.get("port"),
459 identity = self.get("identity"),
460 server_key = self.get("serverKey"),
462 strict_host_checking = False)
464 return (out, err), proc
466 def execute(self, command,
474 err_on_timeout = True,
475 connect_timeout = 30,
476 strict_host_checking = False,
480 """ Notice that this invocation will block until the
481 execution finishes. If this is not the desired behavior,
482 use 'run' instead."""
485 (out, err), proc = execfuncs.lexec(command,
493 (out, err), proc = sshfuncs.rexec(
495 host = self.get("hostname"),
496 user = self.get("username"),
497 port = self.get("port"),
501 identity = self.get("identity"),
502 server_key = self.get("serverKey"),
505 forward_x11 = forward_x11,
508 err_on_timeout = err_on_timeout,
509 connect_timeout = connect_timeout,
510 persistent = persistent,
511 strict_host_checking = strict_host_checking
514 (out, err), proc = sshfuncs.rexec(
516 host = self.get("hostname"),
517 user = self.get("username"),
518 port = self.get("port"),
522 identity = self.get("identity"),
523 server_key = self.get("serverKey"),
526 forward_x11 = forward_x11,
529 err_on_timeout = err_on_timeout,
530 connect_timeout = connect_timeout,
531 persistent = persistent
534 return (out, err), proc
536 def run(self, command,
546 self.debug("Running command '%s'" % command)
549 (out, err), proc = execfuncs.lspawn(command, pidfile,
554 create_home = create_home,
558 # Start process in a "daemonized" way, using nohup and heavy
559 # stdin/out redirection to avoid connection issues
561 (out,err), proc = sshfuncs.rspawn(
565 create_home = create_home,
566 stdin = stdin if stdin is not None else '/dev/null',
567 stdout = stdout if stdout else '/dev/null',
568 stderr = stderr if stderr else '/dev/null',
570 host = self.get("hostname"),
571 user = self.get("username"),
572 port = self.get("port"),
574 identity = self.get("identity"),
575 server_key = self.get("serverKey"),
579 return (out, err), proc
581 def checkpid(self, home = ".", pidfile = "pid"):
583 pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
586 pidtuple = sshfuncs.rcheckpid(
587 os.path.join(home, pidfile),
588 host = self.get("hostname"),
589 user = self.get("username"),
590 port = self.get("port"),
592 identity = self.get("identity"),
593 server_key = self.get("serverKey")
598 def status(self, pid, ppid):
600 status = execfuncs.lstatus(pid, ppid)
603 status = sshfuncs.rstatus(
605 host = self.get("hostname"),
606 user = self.get("username"),
607 port = self.get("port"),
609 identity = self.get("identity"),
610 server_key = self.get("serverKey")
615 def kill(self, pid, ppid, sudo = False):
618 status = self.status(pid, ppid)
620 if status == sshfuncs.RUNNING:
622 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
625 (out, err), proc = sshfuncs.rkill(
627 host = self.get("hostname"),
628 user = self.get("username"),
629 port = self.get("port"),
632 identity = self.get("identity"),
633 server_key = self.get("serverKey")
635 return (out, err), proc
637 def check_bad_host(self, out, err):
638 badre = re.compile(r'(?:'
639 r'|Error: disk I/O error'
642 return badre.search(out) or badre.search(err)