1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs
4 from neco.util import sshfuncs, execfuncs
15 # TODO: Verify whether files and dirs already exist
16 # TODO: Blacklist nodes!
17 # TODO: Unify delays!!
18 # TODO: Validate outcome of uploads!!
# Delay handed to self.ec.schedule() when deploy() has to retry later
# because a connected interface is not READY yet.
# NOTE(review): the "0.5s" string is parsed by the execution controller's
# scheduler — presumably half a second; confirm the accepted format.
20 reschedule_delay = "0.5s"
# Resource manager for a Linux host. Commands and file transfers go
# through the local helpers in execfuncs or the SSH helpers in sshfuncs;
# the `localhost` property (hostname is localhost/loopback) appears to
# select between them.
24 class LinuxNode(ResourceManager):
def _register_attributes(cls):
    """Declare the attributes every LinuxNode exposes.

    Registers, in this order: hostname, username, port, home, identity,
    serverKey, cleanHome, cleanProcesses and tearDown. ``username`` and
    ``identity`` carry Flags.Credential; every other attribute carries
    Flags.ExecReadOnly.
    """
    # (name, help text, flags) for each attribute, in registration order.
    specs = [
        ("hostname", "Hostname of the machine", Flags.ExecReadOnly),
        ("username", "Local account username", Flags.Credential),
        ("port", "SSH port", Flags.ExecReadOnly),
        ("home",
         "Experiment home directory to store all experiment related files",
         Flags.ExecReadOnly),
        ("identity", "SSH identity file", Flags.Credential),
        ("serverKey", "Server public key", Flags.ExecReadOnly),
        # The doubled space after "directories" is part of the existing
        # help text and is kept verbatim.
        ("cleanHome",
         "Remove all files and directories "
         " from home folder before starting experiment",
         Flags.ExecReadOnly),
        ("cleanProcesses",
         "Kill all running processes before starting experiment",
         Flags.ExecReadOnly),
        ("tearDown",
         "Bash script to be executed before "
         "releasing the resource",
         Flags.ExecReadOnly),
    ]

    attributes = [Attribute(name, help_text, flags = flag)
                  for name, help_text, flag in specs]
    for attribute in attributes:
        cls._register_attribute(attribute)
# Create the node resource; `ec` and `guid` are forwarded unchanged to
# ResourceManager.__init__.
69 def __init__(self, ec, guid):
70 super(LinuxNode, self).__init__(ec, guid)
73 # lock to avoid concurrency issues on methods used by applications
# NOTE(review): presumably acquired by execute()/copy() when called with
# with_lock=True — the acquiring code is not visible here.
74 self._lock = threading.Lock()
# Shared by all LinuxNode instances (getLogger returns one logger per name).
76 self._logger = logging.getLogger("LinuxNode")
def log_message(self, msg):
    """Prefix `msg` with this node's guid and hostname for logging.

    The result has the form " guid <guid> - host <hostname> - <msg> ",
    including the leading and trailing spaces.
    """
    prefix = " guid %d - host %s - " % (self.guid, self.get("hostname"))
    return prefix + "%s " % (msg,)
# Bodies of the path accessors (presumably the `home`, `exp_home` and
# `node_home` properties, which other methods read as self.home,
# self.exp_home and self.node_home).
# home: the "home" attribute, or "" when it is unset.
84 return self.get("home") or ""
# exp_home: <home>/<experiment id>; when home is "", os.path.join yields
# the relative path <experiment id>.
88 return os.path.join(self.home, self.ec.exp_id)
# node_home: <exp_home>/node-<guid>, so each node gets its own directory.
92 node_home = "node-%d" % self.guid
93 return os.path.join(self.exp_home, node_home)
# Distribution detection (presumably the body of the `os` property that
# install_packages()/remove_packages() compare against "f12", "f14",
# "debian" and "ubuntu"). Reads /etc/issue on the node and matches the
# start of its output. Raises RuntimeError when hostname/username are
# unset, when the command fails, or for an unrecognized distribution.
100 if (not self.get("hostname") or not self.get("username")):
101 msg = "Can't resolve OS, insufficient data "
103 raise RuntimeError, msg
105 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
# Treated as a failure only when stderr is non-empty AND the exit status
# is non-zero.
107 if err and proc.poll():
108 msg = "Error detecting OS "
109 self.error(msg, out, err)
110 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
# out.find(x) == 0 means the output starts with x.
112 if out.find("Fedora release 12") == 0:
114 elif out.find("Fedora release 14") == 0:
116 elif out.find("Debian") == 0:
118 elif out.find("Ubuntu") ==0:
121 msg = "Unsupported OS"
123 raise RuntimeError, "%s - %s " %( msg, out )
129 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
# Prepare the node. If is_alive() reports it unresponsive, mark it FAILED
# and raise RuntimeError. Otherwise optionally kill processes
# (cleanProcesses) and clean the home directory (cleanHome), create
# node_home with mkdir -p, and finish with ResourceManager.provision().
# NOTE(review): `filters` is accepted but not forwarded to the base
# provision() call.
131 def provision(self, filters = None):
132 if not self.is_alive():
133 self._state = ResourceState.FAILED
134 msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
136 raise RuntimeError, msg
138 if self.get("cleanProcesses"):
139 self.clean_processes()
141 if self.get("cleanHome"):
# NOTE(review): the call made when cleanHome is set (presumably
# self.clean_home()) is not visible here.
144 self.mkdir(self.node_home)
146 super(LinuxNode, self).provision()
# Deploy logic (presumably the body of deploy()). A NEW node is
# provisioned first and marked FAILED on failure; presumably this goes
# through provision(), but the try/except lines are not visible. The node
# then waits for every connected LinuxInterface to be READY, rescheduling
# deploy() after reschedule_delay otherwise, before finishing in
# ResourceManager.deploy().
149 if self.state == ResourceState.NEW:
154 self._state = ResourceState.FAILED
157 # Node needs to wait until all associated interfaces are
158 # ready before it can finalize deployment
# Local import, presumably to avoid a circular import with the
# interface module.
159 from neco.resources.linux.interface import LinuxInterface
160 ifaces = self.get_connected(LinuxInterface.rtype())
162 if iface.state < ResourceState.READY:
163 self.ec.schedule(reschedule_delay, self.deploy)
166 super(LinuxNode, self).deploy()
# Release logic (presumably the body of release()): run the tearDown
# script on the node (presumably only when it is set), then let
# ResourceManager release the resource.
169 tear_down = self.get("tearDown")
171 self.execute(tear_down)
173 super(LinuxNode, self).release()
# Hook for validating a connection to the resource identified by `guid`;
# its body is not visible here.
175 def valid_connection(self, guid):
# Kill processes on the node. Each kill is issued twice, and individual
# failures are ignored (`|| /bin/true`).
# The first command set (presumably used when `killer` is True) kills
# python and tcpdump, every process not attached to the current terminal
# (`ps -N -T`, minus lines matching $PPID) and all root processes. The
# second, gentler set only kills tcpdump and processes owned by the
# configured "username".
179 def clean_processes(self, killer = False):
180 self.info("Cleaning up processes")
# NOTE(review): `grep -v $PPID` filters by substring, so it also spares
# any pid whose digits contain the parent pid.
184 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
185 "sudo -S killall python tcpdump || /bin/true ; " +
186 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
187 "sudo -S killall -u root || /bin/true ; " +
188 "sudo -S killall -u root || /bin/true ; ")
191 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
192 "sudo -S killall tcpdump || /bin/true ; " +
193 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
194 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
# The command's output is not inspected by the visible code.
197 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
# Remove entries named nepi-* directly under the node's home directory
# (find -maxdepth 1 ... -execdir rm -rf).
199 def clean_home(self):
200 self.info("Cleaning up home")
203 # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
204 "find . -maxdepth 1 -name 'nepi-*' " +
205 " -execdir rm -rf {} + "
# Run from self.home. NOTE(review): with `;` rather than `&&`, find still
# runs in the shell's current directory if the cd fails — confirm safe.
209 cmd = "cd %s ; " % self.home + cmd
212 (out, err), proc = self.execute(cmd, with_lock = True)
# Copy `src` to `dst` on the node via copy(). When `text` is True and
# `src` is not an existing local file, `src` is treated as literal
# content and first written to a temporary file.
214 def upload(self, src, dst, text = False):
215 """ Copy content to destination
217 src content to copy. Can be a local file, directory or a list of files
219 dst destination path on the remote host (remote is always self.host)
221 text src is text input, it must be stored into a temp file before uploading
223 # If source is a string input
225 if text and not os.path.isfile(src):
226 # src is text input that should be uploaded as file
227 # create a temporal file with the content to upload
228 f = tempfile.NamedTemporaryFile(delete=False)
# NOTE(review): with delete=False the file survives close(); confirm it
# is removed after the copy (no cleanup is visible here).
233 if not self.localhost:
234 # Build destination as <user>@<server>:<path>
235 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
237 result = self.copy(src, dst)
def download(self, src, dst):
    """Copy `src` from this node to the local path `dst`.

    For a remote node the source is rewritten as <user>@<host>:<path>
    (from the "username" and "hostname" attributes) before delegating
    to copy(); on localhost it is passed through unchanged. Returns
    whatever copy() returns.
    """
    if self.localhost:
        return self.copy(src, dst)

    # Remote source: qualify the path with the SSH login and host.
    user = self.get("username")
    host = self.get("hostname")
    return self.copy("%s@%s:%s" % (user, host, src), dst)
# Install `packages` on the node. The command comes from rpmfuncs for
# "f12"/"f14" or from debfuncs for "debian"/"ubuntu", chosen by self.os.
# Any other OS logs an error and raises RuntimeError.
# The command runs through run_and_wait() in `home` (default: node_home)
# with instpkg_* pid/stdout/stderr files and raise_on_error=True.
# Returns ((out, err), proc) from run_and_wait().
251 def install_packages(self, packages, home = None):
252 home = home or self.node_home
255 if self.os in ["f12", "f14"]:
256 cmd = rpmfuncs.install_packages_command(self.os, packages)
257 elif self.os in ["debian", "ubuntu"]:
258 cmd = debfuncs.install_packages_command(self.os, packages)
260 msg = "Error installing packages ( OS not known ) "
261 self.error(msg, self.os)
262 raise RuntimeError, msg
265 (out, err), proc = self.run_and_wait(cmd, home,
266 pidfile = "instpkg_pid",
267 stdout = "instpkg_out",
268 stderr = "instpkg_err",
269 raise_on_error = True)
271 return (out, err), proc
# Remove `packages` from the node. The command comes from rpmfuncs for
# "f12"/"f14" or from debfuncs for "debian"/"ubuntu", chosen by self.os.
# Any other OS raises RuntimeError.
# The command runs through run_and_wait() in `home` (default: node_home)
# with rmpkg_* pid/stdout/stderr files and raise_on_error=True.
# Returns ((out, err), proc) from run_and_wait().
273 def remove_packages(self, packages, home = None):
274 home = home or self.node_home
277 if self.os in ["f12", "f14"]:
278 cmd = rpmfuncs.remove_packages_command(self.os, packages)
279 elif self.os in ["debian", "ubuntu"]:
280 cmd = debfuncs.remove_packages_command(self.os, packages)
282 msg = "Error removing packages ( OS not known ) "
# NOTE(review): install_packages() logs with self.error() before raising;
# no such call is visible here.
284 raise RuntimeError, msg
287 (out, err), proc = self.run_and_wait(cmd, home,
288 pidfile = "rmpkg_pid",
289 stdout = "rmpkg_out",
290 stderr = "rmpkg_err",
291 raise_on_error = True)
293 return (out, err), proc
# Create `path` (with parents) on the node using `mkdir -p`; returns
# execute()'s result.
# NOTE(review): `clean` is not used by the visible lines — presumably it
# removes `path` first. `path` is interpolated unquoted into the shell.
295 def mkdir(self, path, clean = False):
299 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Recursively remove `path` on this node with `rm -rf`.

    The path is interpolated into the shell command as-is (not quoted),
    so it must come from a trusted caller. Returns execute()'s result,
    run with with_lock=True.
    """
    rm_command = "rm -rf %s" % path
    return self.execute(rm_command, with_lock=True)
# Run `command` detached through run(), then block on three steps: wait
# for its pid file (wait_pid), poll until it exits (wait_run), and read
# its stderr file (check_output). Returns ((out, err), proc) from that
# final read.
# Failures are logged with self.error() and raise RuntimeError; the
# guarding conditions (presumably involving raise_on_error) are not all
# visible.
304 def run_and_wait(self, command,
312 raise_on_error = False):
313 """ runs a command in background on the remote host, but waits
314 until the command finishes execution.
315 This is more robust than doing a simple synchronized 'execute',
316 since in the remote host the command can continue to run detached
317 even if network disconnections occur
319 # run command in background in remote host
320 (out, err), proc = self.run(command, home,
328 # check no errors occurred
329 if proc.poll() and err:
330 msg = " Failed to run command '%s' " % command
331 self.error(msg, out, err)
333 raise RuntimeError, msg
335 # Wait for pid file to be generated
336 pid, ppid = self.wait_pid(
339 raise_on_error = raise_on_error)
341 # wait until command finishes to execute
342 self.wait_run(pid, ppid)
344 # check if execution errors occurred
# Note: `out` below is the content of the command's stderr file (cat
# output), not the command's stdout.
345 (out, err), proc = self.check_output(home, stderr)
348 msg = " Failed to run command '%s' " % command
349 self.error(msg, out, err)
352 raise RuntimeError, msg
354 return (out, err), proc
# Poll checkpid() until the pid file yields a pid tuple, backing off by
# 1.2x per round with a cap of 30. On failure it logs and raises
# RuntimeError (presumably only when raise_on_error is set).
356 def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
357 """ Waits until the pid file for the command is generated,
358 and returns the pid and ppid of the process """
362 pidtuple = self.checkpid(home = home, pidfile = pidfile)
# Exponential back-off between polls, capped at 30.
369 delay = min(30,delay*1.2)
371 msg = " Failed to get pid for pidfile %s/%s " % (
376 raise RuntimeError, msg
380 def wait_run(self, pid, ppid, trial = 0):
381 """ wait for a remote process to finish execution """
387 status = self.status(pid, ppid)
389 if status is sshfuncs.FINISHED:
391 elif status is not sshfuncs.RUNNING:
393 time.sleep(delay*(5.5+random.random()))
400 time.sleep(delay*(0.5+random.random()))
401 delay = min(30,delay*1.2)
def check_output(self, home, filename):
    """Read a file from this node by running `cat <home>/<filename>`.

    The command goes through execute() with retry=1 and with_lock=True;
    the result is returned as ((out, err), proc), where `out` carries
    the file content.
    """
    target_path = os.path.join(home, filename)
    output_pair, proc = self.execute("cat %s" % target_path,
                                     retry = 1, with_lock = True)
    stdout, stderr = output_pair
    return (stdout, stderr), proc
# Liveness probe (presumably the body of is_alive(), which provision()
# calls). Runs `echo 'ALIVE'` on the node with retry=5; the node counts
# as alive when the output starts with ALIVE. Exceptions and any other
# output are logged as "Unresponsive host ".
416 (out, err), proc = self.execute("echo 'ALIVE'", retry = 5,
# NOTE(review): on the exception path `out` must already be bound — an
# initialization is presumably on a line not visible here.
420 trace = traceback.format_exc()
421 msg = "Unresponsive host "
422 self.error(msg, out, trace)
425 if out.strip().startswith('ALIVE'):
428 msg = "Unresponsive host "
429 self.error(msg, out, err)
432 def copy(self, src, dst):
434 (out, err), proc = execfuncs.lcopy(source, dest,
436 strict_host_checking = False)
439 (out, err), proc = sshfuncs.rcopy(
441 port = self.get("port"),
442 identity = self.get("identity"),
443 server_key = self.get("serverKey"),
445 strict_host_checking = False)
447 return (out, err), proc
# Run `command` on the node and block until it finishes. The local path
# uses execfuncs.lexec; otherwise sshfuncs.rexec is called with this
# node's hostname, username, port, identity and serverKey.
# Returns ((out, err), proc).
449 def execute(self, command,
457 err_on_timeout = True,
458 connect_timeout = 30,
459 strict_host_checking = False,
463 """ Notice that this invocation will block until the
464 execution finishes. If this is not the desired behavior,
465 use 'run' instead."""
468 (out, err), proc = execfuncs.lexec(command,
# Two rexec variants follow (presumably with and without self._lock,
# depending on with_lock).
476 (out, err), proc = sshfuncs.rexec(
478 host = self.get("hostname"),
479 user = self.get("username"),
480 port = self.get("port"),
484 identity = self.get("identity"),
485 server_key = self.get("serverKey"),
488 forward_x11 = forward_x11,
491 err_on_timeout = err_on_timeout,
492 connect_timeout = connect_timeout,
493 persistent = persistent,
494 strict_host_checking = strict_host_checking
# NOTE(review): the second rexec call does not forward
# strict_host_checking, so that argument has no effect on its path.
497 (out, err), proc = sshfuncs.rexec(
499 host = self.get("hostname"),
500 user = self.get("username"),
501 port = self.get("port"),
505 identity = self.get("identity"),
506 server_key = self.get("serverKey"),
509 forward_x11 = forward_x11,
512 err_on_timeout = err_on_timeout,
513 connect_timeout = connect_timeout,
514 persistent = persistent
517 return (out, err), proc
# Start `command` in the background and return without waiting for it.
# The local path uses execfuncs.lspawn; otherwise sshfuncs.rspawn is used.
# Returns ((out, err), proc) from the spawn call.
519 def run(self, command,
529 self.debug("Running command '%s'" % command)
532 (out, err), proc = execfuncs.lspawn(command, pidfile,
537 create_home = create_home,
541 # Start process in a "daemonized" way, using nohup and heavy
542 # stdin/out redirection to avoid connection issues
544 (out,err), proc = sshfuncs.rspawn(
548 create_home = create_home,
# stdin falls back to /dev/null only when None, whereas stdout/stderr
# fall back whenever falsy (e.g. "").
549 stdin = stdin if stdin is not None else '/dev/null',
550 stdout = stdout if stdout else '/dev/null',
551 stderr = stderr if stderr else '/dev/null',
553 host = self.get("hostname"),
554 user = self.get("username"),
555 port = self.get("port"),
557 identity = self.get("identity"),
558 server_key = self.get("serverKey"),
562 return (out, err), proc
# Read the pid file <home>/<pidfile>, either locally with
# execfuncs.lcheckpid or on the remote host with sshfuncs.rcheckpid.
# wait_pid() polls this until it yields a pid tuple, presumably
# (pid, ppid).
564 def checkpid(self, home = ".", pidfile = "pid"):
566 pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
569 pidtuple = sshfuncs.rcheckpid(
570 os.path.join(home, pidfile),
571 host = self.get("hostname"),
572 user = self.get("username"),
573 port = self.get("port"),
575 identity = self.get("identity"),
576 server_key = self.get("serverKey")
# Query the state of process (pid, ppid), either locally with
# execfuncs.lstatus or remotely with sshfuncs.rstatus.
# Callers compare the result with sshfuncs.RUNNING / sshfuncs.FINISHED.
581 def status(self, pid, ppid):
583 status = execfuncs.lstatus(pid, ppid)
586 status = sshfuncs.rstatus(
588 host = self.get("hostname"),
589 user = self.get("username"),
590 port = self.get("port"),
592 identity = self.get("identity"),
593 server_key = self.get("serverKey")
# Kill process (pid, ppid) if status() reports it RUNNING. The local path
# uses execfuncs.lkill (forwarding `sudo`); the remote path uses
# sshfuncs.rkill. Returns ((out, err), proc).
# NOTE(review): when the process is not RUNNING, out/err/proc must be
# initialized on a line not visible here, or the final return fails.
598 def kill(self, pid, ppid, sudo = False):
601 status = self.status(pid, ppid)
603 if status == sshfuncs.RUNNING:
605 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
608 (out, err), proc = sshfuncs.rkill(
610 host = self.get("hostname"),
611 user = self.get("username"),
612 port = self.get("port"),
615 identity = self.get("identity"),
616 server_key = self.get("serverKey")
618 return (out, err), proc
620 def check_bad_host(self, out, err):
621 badre = re.compile(r'(?:'
622 r'|Error: disk I/O error'
625 return badre.search(out) or badre.search(err)
# Blacklisting hook: it currently only logs a warning; the call that
# would record the host is commented out.
# NOTE(review): if re-enabled, the visible code reads the hostname via
# self.get("hostname"), not self.hostname.
629 self.warn(" Blacklisting malfunctioning node ")
631 #util.appendBlacklist(self.hostname)