1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs
4 from neco.util import sshfuncs, execfuncs
15 # TODO: Verify that files and dirs already exist
16 # TODO: Blacklist nodes!
17 # TODO: Unify delays!!
# Delay handed to self.ec.schedule() when deploy() has to be retried later
# because a connected interface has not yet reached ResourceState.READY.
19 reschedule_delay = "0.5s"
# Resource manager for a Linux host. Commands are executed remotely through
# sshfuncs (rexec/rspawn/rcopy/...) or, when the hostname is a loopback name
# (see `localhost`), locally through execfuncs (lexec/lspawn/lcopy/...).
23 class LinuxNode(ResourceManager):
def _register_attributes(cls):
    """Declare the configurable attributes of a LinuxNode.

    One Attribute is built per setting: connection data (hostname, port,
    serverKey), credentials (username, identity) and experiment housekeeping
    switches (home, cleanHome, cleanProcesses, tearDown). Each one is then
    registered on ``cls`` through ``cls._register_attribute``, in the same
    order in which it is declared below.
    """
    hostname = Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly)

    username = Attribute("username", "Local account username",
            flags = Flags.Credential)

    port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)

    home = Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly)

    identity = Attribute("identity", "SSH identity file",
            flags = Flags.Credential)

    server_key = Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly)

    # Adjacent string literals are joined at compile time. Exactly one space
    # separates the two halves, so the description contains no double space.
    clean_home = Attribute("cleanHome", "Remove all files and directories "
            "from home folder before starting experiment",
            flags = Flags.ExecReadOnly)

    clean_processes = Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly)

    tear_down = Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = Flags.ExecReadOnly)

    # Registration order matches the declaration order above.
    for attr in (hostname, username, port, home, identity, server_key,
            clean_home, clean_processes, tear_down):
        cls._register_attribute(attr)
# Create a LinuxNode bound to experiment controller `ec` under id `guid`.
# NOTE(review): `threading` and `logging` are used here but are not among the
# visible import lines — confirm they are imported elsewhere in the file.
68 def __init__(self, ec, guid):
69 super(LinuxNode, self).__init__(ec, guid)
72 # lock to avoid concurrency issues on methods used by applications
73 self._lock = threading.Lock()
75 self._logger = logging.getLogger("LinuxNode")
def log_message(self, msg):
    """Decorate *msg* with this node's guid and hostname.

    The result has the shape `` guid <guid> - host <hostname> - <msg> ``
    (including the leading and trailing space), ready to be logged.
    """
    fields = (self.guid, self.get("hostname"), msg)
    return " guid %d - host %s - %s " % fields
# Experiment home directory on the node: the "home" attribute, or "" if unset.
83 return self.get("home") or ""
# Experiment directory on the node: self.home joined with the controller's exp_id.
87 return os.path.join(self.home, self.ec.exp_id)
# Per-node directory inside the experiment directory: <exp_home>/node-<guid>.
91 node_home = "node-%d" % self.guid
92 return os.path.join(self.exp_home, node_home)
# Detect the node's operating system by reading /etc/issue on it.
# Raises RuntimeError when hostname/username are missing, when the command
# fails, or when the issue text starts with none of the recognised prefixes
# ("Fedora release 12", "Fedora release 14", "Debian", "Ubuntu").
# NOTE(review): the per-branch assignments are not visible in this listing.
# install_packages/remove_packages compare self.os against "f12", "f14",
# "debian" and "ubuntu"; presumably those are the values set here — confirm.
99 if (not self.get("hostname") or not self.get("username")):
100 msg = "Can't resolve OS, insufficient data "
102 raise RuntimeError, msg
104 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
# Fail only when there is stderr output AND a truthy (non-zero) exit code.
106 if err and proc.poll():
107 msg = "Error detecting OS "
108 self.error(msg, out, err)
109 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
# `find(...) == 0` is a prefix test (equivalent to str.startswith).
111 if out.find("Fedora release 12") == 0:
113 elif out.find("Fedora release 14") == 0:
115 elif out.find("Debian") == 0:
117 elif out.find("Ubuntu") ==0:
120 msg = "Unsupported OS"
122 raise RuntimeError, "%s - %s " %( msg, out )
128 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
# Prepare the node before deployment. Marks the resource FAILED if
# is_alive() is false, optionally cleans processes (cleanProcesses) and the
# home directory (cleanHome), creates node_home, then calls the base
# provision().
# NOTE(review): no early return is visible after marking FAILED (line 134 is
# missing from this listing). Confirm the method stops there instead of
# continuing to provision an unresponsive node.
# NOTE(review): `filters` is accepted but not forwarded to the base provision().
130 def provision(self, filters = None):
131 if not self.is_alive():
132 self._state = ResourceState.FAILED
133 self.error("Deploy failed. Unresponsive node")
136 if self.get("cleanProcesses"):
137 self.clean_processes()
139 if self.get("cleanHome"):
142 self.mkdir(self.node_home)
144 super(LinuxNode, self).provision()
# Deploy the node. While any connected LinuxInterface is below
# ResourceState.READY, deploy() is rescheduled after reschedule_delay.
# Otherwise the base deploy() is called.
# NOTE(review): the def line, the handling of the NEW state (lines 148-151) and
# the loop header over `ifaces` are not visible in this listing.
147 if self.state == ResourceState.NEW:
152 self._state = ResourceState.FAILED
155 # Node needs to wait until all associated interfaces are
156 # ready before it can finalize deployment
# Function-level import; presumably avoids a circular import with the
# interface module — confirm.
157 from neco.resources.linux.interface import LinuxInterface
158 ifaces = self.get_connected(LinuxInterface.rtype())
160 if iface.state < ResourceState.READY:
161 self.ec.schedule(reschedule_delay, self.deploy)
164 super(LinuxNode, self).deploy()
# Release the node: run the command stored in the tearDown attribute on the
# node (the guard on line 168 is not visible here), then call the base
# release().
167 tear_down = self.get("tearDown")
169 self.execute(tear_down)
171 super(LinuxNode, self).release()
# Decide whether this node may be connected to the resource `guid`.
# NOTE(review): the body is not visible in this listing.
173 def valid_connection(self, guid):
# Kill leftover processes on the node with sudo, as one shell command run via
# execute(retry = 1, with_lock = True).
# One variant kills python/tcpdump, the PIDs listed by `ps -N -T` (minus
# $PPID) and all root processes. The other kills tcpdump and all processes of
# the configured username. Presumably `killer` selects between them, but the
# branch lines (179-181, 187-188) are not visible here.
# Every kill appears twice, and `|| /bin/true` keeps a failed kill from
# failing the whole chain.
# NOTE(review): the username attribute is interpolated into the shell command
# unquoted.
177 def clean_processes(self, killer = False):
178 self.info("Cleaning up processes")
182 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
183 "sudo -S killall python tcpdump || /bin/true ; " +
184 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
185 "sudo -S killall -u root || /bin/true ; " +
186 "sudo -S killall -u root || /bin/true ; ")
189 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
190 "sudo -S killall tcpdump || /bin/true ; " +
191 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
192 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
195 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
# Remove experiment leftovers: entries named 'nepi-*' directly under the home
# directory (find -maxdepth 1 ... -execdir rm -rf). The command is prefixed
# with `cd <self.home>`.
# NOTE(review): the lines that open the `cmd` expression and any condition
# around the `cd` prefix (199-200, 204-206, 208-209) are not visible here.
197 def clean_home(self):
198 self.info("Cleaning up home")
201 # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
202 "find . -maxdepth 1 -name 'nepi-*' " +
203 " -execdir rm -rf {} + "
207 cmd = "cd %s ; " % self.home + cmd
210 (out, err), proc = self.execute(cmd, with_lock = True)
# Upload `src` to `dst` on the node through self.copy().
# When text=True and src is not an existing local file, src is treated as
# literal content and a tempfile.NamedTemporaryFile(delete=False) is created
# for it. The write/close lines (227-230) are not visible here.
# For non-local nodes, dst is rewritten as <user>@<host>:<path>.
# NOTE(review): with delete=False the temporary file is not removed on close,
# and no cleanup is visible in this listing. Confirm it is unlinked after the
# copy.
# NOTE(review): the closing quotes of the docstring (line 220) are missing
# from this listing.
212 def upload(self, src, dst, text = False):
213 """ Copy content to destination
215 src content to copy. Can be a local file, directory or a list of files
217 dst destination path on the remote host (remote is always self.host)
219 text src is text input, it must be stored into a temp file before uploading
221 # If source is a string input
223 if text and not os.path.isfile(src):
224 # src is text input that should be uploaded as file
225 # create a temporal file with the content to upload
226 f = tempfile.NamedTemporaryFile(delete=False)
231 if not self.localhost:
232 # Build destination as <user>@<server>:<path>
233 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
235 result = self.copy(src, dst)
def download(self, src, dst):
    """Fetch *src* from this node into the local path *dst*.

    For non-local nodes the source is qualified as ``<user>@<host>:<path>``
    before delegating to ``self.copy``, whose result is returned unchanged.
    """
    if self.localhost:
        remote_src = src
    else:
        # Build the *source* (not the destination) as <user>@<server>:<path>
        user = self.get("username")
        host = self.get("hostname")
        remote_src = "%s@%s:%s" % (user, host, src)
    return self.copy(remote_src, dst)
# Install `packages` on the node. The command comes from rpmfuncs (os f12/f14)
# or debfuncs (os debian/ubuntu) and is run through run_and_wait() in `home`
# (default: node_home), with instpkg_* pid/stdout/stderr files.
# Raises RuntimeError for any other OS. raise_on_error=True is forwarded to
# run_and_wait, presumably so a failed install also raises.
# Returns ((out, err), proc) from run_and_wait.
249 def install_packages(self, packages, home = None):
250 home = home or self.node_home
253 if self.os in ["f12", "f14"]:
254 cmd = rpmfuncs.install_packages_command(self.os, packages)
255 elif self.os in ["debian", "ubuntu"]:
256 cmd = debfuncs.install_packages_command(self.os, packages)
258 msg = "Error installing packages ( OS not known ) "
259 self.error(msg, self.os)
260 raise RuntimeError, msg
263 (out, err), proc = self.run_and_wait(cmd, home,
264 pidfile = "instpkg_pid",
265 stdout = "instpkg_out",
266 stderr = "instpkg_err",
267 raise_on_error = True)
269 return (out, err), proc
# Remove `packages` from the node. This mirrors install_packages: the command
# comes from rpmfuncs/debfuncs according to self.os and is run through
# run_and_wait() in `home` (default: node_home), with rmpkg_* pid/stdout/stderr
# files. Raises RuntimeError for an unknown OS.
# Returns ((out, err), proc) from run_and_wait.
271 def remove_packages(self, packages, home = None):
272 home = home or self.node_home
275 if self.os in ["f12", "f14"]:
276 cmd = rpmfuncs.remove_packages_command(self.os, packages)
277 elif self.os in ["debian", "ubuntu"]:
278 cmd = debfuncs.remove_packages_command(self.os, packages)
280 msg = "Error removing packages ( OS not known ) "
282 raise RuntimeError, msg
285 (out, err), proc = self.run_and_wait(cmd, home,
286 pidfile = "rmpkg_pid",
287 stdout = "rmpkg_out",
288 stderr = "rmpkg_err",
289 raise_on_error = True)
291 return (out, err), proc
# Create `path` (including parents) on the node with `mkdir -p` and return
# the execute() result.
# NOTE(review): the handling of `clean` (lines 294-296) is not visible here.
# Note also that `path` is interpolated into the shell command unquoted.
293 def mkdir(self, path, clean = False):
297 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Recursively delete *path* on the node using ``rm -rf``.

    The command goes through ``execute`` with ``with_lock = True``, and its
    ``((out, err), proc)`` result is returned as-is.

    NOTE(review): *path* is inserted into the shell command unquoted, so shell
    metacharacters and globs in it are interpreted by the remote shell.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
# Run `command` detached via run(), wait for its pid file (wait_pid), poll
# until it finishes (wait_run), then read its stderr file with check_output().
# Returns ((out, err), proc) from that final read.
# NOTE(review): the conditions guarding both `raise RuntimeError, msg`
# statements (lines 330 and 344-349) are not visible. Presumably they depend
# on raise_on_error — confirm.
# NOTE(review): the closing quotes of the docstring (line 316) are missing
# from this listing.
302 def run_and_wait(self, command,
310 raise_on_error = False):
311 """ runs a command in background on the remote host, but waits
312 until the command finishes execution.
313 This is more robust than doing a simple synchronized 'execute',
314 since in the remote host the command can continue to run detached
315 even if network disconnections occur
317 # run command in background in remote host
318 (out, err), proc = self.run(command, home,
326 # check no errors occurred
327 if proc.poll() and err:
328 msg = " Failed to run command '%s' " % command
329 self.error(msg, out, err)
331 raise RuntimeError, msg
333 # Wait for pid file to be generated
334 pid, ppid = self.wait_pid(
337 raise_on_error = raise_on_error)
339 # wait until command finishes to execute
340 self.wait_run(pid, ppid)
342 # check if execution errors occurred
343 (out, err), proc = self.check_output(home, stderr)
346 msg = " Failed to run command '%s' " % command
347 self.error(msg, out, err)
350 raise RuntimeError, msg
352 return (out, err), proc
# Poll checkpid() for `home/pidfile`, growing the delay by x1.2 per round
# (capped at 30). If no pid is obtained, it builds a "Failed to get pid"
# message and raises RuntimeError; the guard lines (including the
# raise_on_error check) are not visible here.
354 def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
355 """ Waits until the pid file for the command is generated,
356 and returns the pid and ppid of the process """
360 pidtuple = self.checkpid(home = home, pidfile = pidfile)
367 delay = min(30,delay*1.2)
369 msg = " Failed to get pid for pidfile %s/%s " % (
374 raise RuntimeError, msg
# Poll status(pid, ppid) until it reports sshfuncs.FINISHED. On the
# non-RUNNING branch it sleeps delay*[5.5, 6.5) seconds. Otherwise it sleeps
# delay*[0.5, 1.5) and grows the delay by x1.2 (capped at 30).
# Use of `trial` is not visible in this listing.
# NOTE(review): statuses are compared with `is` / `is not`. That only works if
# status() returns the very same constant objects as sshfuncs.FINISHED/RUNNING.
# kill() uses `==` — confirm, or switch to equality.
378 def wait_run(self, pid, ppid, trial = 0):
379 """ wait for a remote process to finish execution """
385 status = self.status(pid, ppid)
387 if status is sshfuncs.FINISHED:
389 elif status is not sshfuncs.RUNNING:
391 time.sleep(delay*(5.5+random.random()))
398 time.sleep(delay*(0.5+random.random()))
399 delay = min(30,delay*1.2)
def check_output(self, home, filename):
    """Read the contents of ``home/filename`` on the node.

    Runs ``cat`` through ``execute`` (``retry = 1``, ``with_lock = True``)
    and returns its result re-packed as ``((out, err), proc)``.
    """
    path = os.path.join(home, filename)
    result, proc = self.execute("cat %s" % path, retry = 1,
            with_lock = True)
    out, err = result
    return (out, err), proc
# Liveness probe: runs `echo 'ALIVE'` on the node. Output not starting with
# ALIVE, or an exception (whose traceback is logged), produces an
# "Unresponsive host" warning.
# NOTE(review): the def line, try/except framing and return values are not
# visible here. If execute() raises, `out` on line 419 may be unbound unless it
# is initialised in a missing line — confirm.
414 (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
417 trace = traceback.format_exc()
418 msg = "Unresponsive host "
419 self.warn(msg, out, trace)
422 if out.strip().startswith('ALIVE'):
425 msg = "Unresponsive host "
426 self.warn(msg, out, err)
430 #if self.check_bad_host(out,err):
433 def copy(self, src, dst):
435 (out, err), proc = execfuncs.lcopy(source, dest,
439 (out, err), proc = sshfuncs.rcopy(
441 port = self.get("port"),
442 identity = self.get("identity"),
443 server_key = self.get("serverKey"),
446 return (out, err), proc
# Blocking command execution. It uses execfuncs.lexec locally, otherwise
# sshfuncs.rexec with the node's hostname/username/port/identity/serverKey.
# The two rexec calls are identical in all visible arguments. Presumably one
# of them runs under self._lock when with_lock is set, but the branch lines
# are not visible in this listing — confirm.
448 def execute(self, command,
456 err_on_timeout = True,
457 connect_timeout = 30,
461 """ Notice that this invocation will block until the
462 execution finishes. If this is not the desired behavior,
463 use 'run' instead."""
466 (out, err), proc = execfuncs.lexec(command,
474 (out, err), proc = sshfuncs.rexec(
476 host = self.get("hostname"),
477 user = self.get("username"),
478 port = self.get("port"),
482 identity = self.get("identity"),
483 server_key = self.get("serverKey"),
486 forward_x11 = forward_x11,
489 err_on_timeout = err_on_timeout,
490 connect_timeout = connect_timeout,
491 persistent = persistent
494 (out, err), proc = sshfuncs.rexec(
496 host = self.get("hostname"),
497 user = self.get("username"),
498 port = self.get("port"),
502 identity = self.get("identity"),
503 server_key = self.get("serverKey"),
506 forward_x11 = forward_x11,
509 err_on_timeout = err_on_timeout,
510 connect_timeout = connect_timeout,
511 persistent = persistent
514 return (out, err), proc
# Spawn `command` in the background: execfuncs.lspawn locally, or
# sshfuncs.rspawn on the remote node. Returns ((out, err), proc) from the
# spawn call.
# For the remote spawn, stdin falls back to /dev/null only when it is None,
# while stdout/stderr fall back to /dev/null whenever they are falsy
# (e.g. "").
516 def run(self, command,
526 self.debug("Running command '%s'" % command)
529 (out, err), proc = execfuncs.lspawn(command, pidfile,
534 create_home = create_home,
538 # Start process in a "daemonized" way, using nohup and heavy
539 # stdin/out redirection to avoid connection issues
541 (out,err), proc = sshfuncs.rspawn(
545 create_home = create_home,
546 stdin = stdin if stdin is not None else '/dev/null',
547 stdout = stdout if stdout else '/dev/null',
548 stderr = stderr if stderr else '/dev/null',
550 host = self.get("hostname"),
551 user = self.get("username"),
552 port = self.get("port"),
554 identity = self.get("identity"),
555 server_key = self.get("serverKey"),
559 return (out, err), proc
# Read the pid file at home/pidfile, locally via execfuncs.lcheckpid or
# remotely via sshfuncs.rcheckpid.
# NOTE(review): the return line is not visible. wait_pid() consumes the
# result, so presumably it is a (pid, ppid) tuple or None — confirm.
561 def checkpid(self, home = ".", pidfile = "pid"):
563 pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
566 pidtuple = sshfuncs.rcheckpid(
567 os.path.join(home, pidfile),
568 host = self.get("hostname"),
569 user = self.get("username"),
570 port = self.get("port"),
572 identity = self.get("identity"),
573 server_key = self.get("serverKey")
# Query the state of process (pid, ppid), locally via execfuncs.lstatus or
# remotely via sshfuncs.rstatus. Callers compare the result with
# sshfuncs.RUNNING / sshfuncs.FINISHED. The return line is not visible here.
578 def status(self, pid, ppid):
580 status = execfuncs.lstatus(pid, ppid)
583 status = sshfuncs.rstatus(
585 host = self.get("hostname"),
586 user = self.get("username"),
587 port = self.get("port"),
589 identity = self.get("identity"),
590 server_key = self.get("serverKey")
# Kill process (pid, ppid), but only if status() reports sshfuncs.RUNNING.
# Locally it uses execfuncs.lkill (which receives the `sudo` flag), remotely
# sshfuncs.rkill. Returns ((out, err), proc).
# NOTE(review): when the process is not RUNNING, `out`, `err` and `proc` are
# not assigned by any visible line before the final return. Confirm they are
# initialised in the missing lines 596-597.
595 def kill(self, pid, ppid, sudo = False):
598 status = self.status(pid, ppid)
600 if status == sshfuncs.RUNNING:
602 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
605 (out, err), proc = sshfuncs.rkill(
607 host = self.get("hostname"),
608 user = self.get("username"),
609 port = self.get("port"),
612 identity = self.get("identity"),
613 server_key = self.get("serverKey")
615 return (out, err), proc
# Heuristic check for a malfunctioning host (e.g. "Error: disk I/O error" in
# the output). Returns the first regex match found in `out`, else in `err`.
# NOTE(review): the pattern starts with an empty alternative, `(?:|Error...`.
# That matches the empty string, so search() returns a (truthy) match for any
# input. Confirm whether that is intended; the remaining pattern lines
# (620-621) are not visible. The only visible call site, in is_alive, is
# commented out.
617 def check_bad_host(self, out, err):
618 badre = re.compile(r'(?:'
619 r'|Error: disk I/O error'
622 return badre.search(out) or badre.search(err)
# Flag this node as malfunctioning. The visible lines only log a warning; the
# call that appends the host to a blacklist is commented out.
# NOTE(review): the def line (624-625) is not visible in this listing.
626 self.warn(" Blacklisting malfunctioning node ")
628 #util.appendBlacklist(self.hostname)