1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs
4 from neco.util import sshfuncs, execfuncs
15 # TODO: Verify that files and directories already exist
16 # TODO: Blacklist nodes!
21 class LinuxNode(ResourceManager):
def _register_attributes(cls):
    """Declare the configurable attributes of a LinuxNode.

    Each attribute is created with its user-facing name, a help text and
    access flags, then registered on ``cls`` in declaration order:

    * Flags.ExecReadOnly: hostname, port, home, serverKey, cleanHome,
      cleanProcesses, tearDown.
    * Flags.Credential: username, identity.
    """
    hostname = Attribute("hostname", "Hostname of the machine",
            flags = Flags.ExecReadOnly)

    username = Attribute("username", "Local account username",
            flags = Flags.Credential)

    port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)

    home = Attribute("home",
            "Experiment home directory to store all experiment related files",
            flags = Flags.ExecReadOnly)

    identity = Attribute("identity", "SSH identity file",
            flags = Flags.Credential)

    server_key = Attribute("serverKey", "Server public key",
            flags = Flags.ExecReadOnly)

    # Adjacent string literals are joined by the parser; exactly one space
    # must sit at each join so the help text reads correctly.
    clean_home = Attribute("cleanHome", "Remove all files and directories "
            "from home folder before starting experiment",
            flags = Flags.ExecReadOnly)

    clean_processes = Attribute("cleanProcesses",
            "Kill all running processes before starting experiment",
            flags = Flags.ExecReadOnly)

    tear_down = Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = Flags.ExecReadOnly)

    # Register in the same order the attributes were declared above.
    for attr in (hostname, username, port, home, identity, server_key,
            clean_home, clean_processes, tear_down):
        cls._register_attribute(attr)
# Initialise the node RM: ec and guid are passed unchanged to
# ResourceManager.__init__, then per-instance state is created.
# NOTE(review): `threading` and `logging` are not imported in the lines
# visible in this chunk — confirm they are imported elsewhere in the file.
66 def __init__(self, ec, guid):
67 super(LinuxNode, self).__init__(ec, guid)
70 # lock to avoid concurrency issues on methods used by applications
71 self._lock = threading.Lock()
# Logger named "LinuxNode" (presumably consumed by the ResourceManager
# info/warn/error helpers — TODO confirm).
73 self._logger = logging.getLogger("LinuxNode")
def log_message(self, msg):
    """Return *msg* decorated with this node's guid and hostname.

    The result has the form " guid <guid> - host <hostname> - <msg> ",
    including the leading and trailing space.
    """
    guid = self.guid
    host = self.get("hostname")
    return " guid %d - host %s - %s " % (guid, host, msg)
# Node home directory: the "home" attribute, or "/tmp" when it is unset or
# empty. (The enclosing definition line is not visible in this chunk; other
# methods read this value as self.home.)
81 return self.get("home") or "/tmp"
85 exp_dir = os.path.join(self.home, self.ec.exp_id)
86 return exp_dir if exp_dir.startswith('/') else "${HOME}/"
# Per-node working directory: <exp_dir>/node-<guid>. (The enclosing
# definition line is not visible; provision() and install_packages() read
# this value as self.node_dir.)
90 node_dir = "node-%d" % self.guid
91 return os.path.join(self.exp_dir, node_dir)
# OS detection for the node. The enclosing definition and the statements
# inside each matching branch are not visible in this chunk. The code
# requires the hostname and username attributes, reads /etc/issue on the
# node, and matches the start of that output against supported distributions.
# install_packages()/remove_packages() compare self.os with "f12", "f14",
# "debian" and "ubuntu", so the branches presumably produce those tags.
# Raises RuntimeError on missing data, a failed read, or an unsupported OS.
98 if (not self.get("hostname") or not self.get("username")):
99 msg = "Can't resolve OS, insufficient data "
101 raise RuntimeError, msg
103 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
# Treated as a failure only when stderr is non-empty AND poll() is truthy
# (a non-zero exit status, assuming proc behaves like subprocess.Popen).
105 if err and proc.poll():
106 msg = "Error detecting OS "
107 self.error(msg, out, err)
108 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
# find(...) == 0 means the /etc/issue text starts with that prefix.
110 if out.find("Fedora release 12") == 0:
112 elif out.find("Fedora release 14") == 0:
114 elif out.find("Debian") == 0:
116 elif out.find("Ubuntu") ==0:
119 msg = "Unsupported OS"
121 raise RuntimeError, "%s - %s " %( msg, out )
127 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
# Prepare the node before deployment:
#   - mark the RM FAILED if is_alive() reports the node unresponsive;
#   - honour the cleanProcesses / cleanHome attributes;
#   - create node_dir;
#   - delegate to ResourceManager.provision().
129 def provision(self, filters = None):
130 if not self.is_alive():
131 self._state = ResourceState.FAILED
132 self.error("Deploy failed. Unresponsive node")
# NOTE(review): the statement after this error is not visible here
# (presumably an early return) — confirm.
135 if self.get("cleanProcesses"):
136 self.clean_processes()
138 if self.get("cleanHome"):
# NOTE(review): the body of the cleanHome branch is not visible here
# (presumably self.clean_home()).
141 self.mkdir(self.node_dir)
# NOTE(review): in the visible lines, `filters` is accepted but not
# forwarded to the base-class provision().
143 super(LinuxNode, self).provision()
# Deployment step (the enclosing definition is not visible). It handles a
# node still in NEW state (only partly visible). It then re-schedules itself
# while any connected LinuxInterface is below READY. Finally it calls
# ResourceManager.deploy().
146 if self.state == ResourceState.NEW:
# NOTE(review): the lines between the NEW check and this FAILED assignment
# are not visible (presumably provision() inside a try/except).
151 self._state = ResourceState.FAILED
154 # Node needs to wait until all associated interfaces are
155 # ready before it can finalize deployment
# Imported here rather than at module level, presumably to avoid a circular
# import between the node and interface modules — TODO confirm.
156 from neco.resources.linux.interface import LinuxInterface
157 ifaces = self.get_connected(LinuxInterface.rtype())
# NOTE(review): the loop header binding `iface` (presumably
# `for iface in ifaces:`) is not visible.
159 if iface.state < ResourceState.READY:
# Retry deploy() after DELAY (DELAY is defined outside this chunk).
160 self.ec.schedule(DELAY, self.deploy)
163 super(LinuxNode, self).deploy()
# Release step (the enclosing definition is not visible): run the
# user-supplied "tearDown" script on the node, then call
# ResourceManager.release().
166 tear_down = self.get("tearDown")
# NOTE(review): the guard around this call (presumably `if tear_down:`) is
# not visible here.
168 self.execute(tear_down)
170 super(LinuxNode, self).release()
# Connection-validation hook taking a guid (presumably of the RM to connect
# to). Its body is not visible in this chunk.
172 def valid_connection(self, guid):
# Kill leftover processes on the node using sudo killall/kill.
# - Every command ends with `|| /bin/true`, so one failure does not abort
#   the chain.
# - Each command appears twice (presumably a second pass for processes the
#   first one missed — confirm).
176 def clean_processes(self, killer = False):
177 self.info("Cleaning up processes")
# Aggressive variant (presumably chosen when `killer` is True; the condition
# is not visible). It kills python/tcpdump, then every process not attached
# to the current terminal (ps -N -T) except lines matching $PPID, then all
# root processes.
# NOTE(review): `killall -u root` targets every root-owned process.
181 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
182 "sudo -S killall python tcpdump || /bin/true ; " +
183 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
184 "sudo -S killall -u root || /bin/true ; " +
185 "sudo -S killall -u root || /bin/true ; ")
# Conservative variant (presumably the else branch): kills tcpdump and the
# processes of the configured username only. `%` binds tighter than `+`, so
# each literal is formatted on its own.
188 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
189 "sudo -S killall tcpdump || /bin/true ; " +
190 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
191 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
# Output is not inspected in the visible lines.
195 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
# Delete selected entries from the node home directory. Only .cache,
# .local, .config and names matching nepi-* are removed, at depth 1 only.
197 def clean_home(self):
198 self.info("Cleaning up home")
# NOTE(review): the commands are joined with `;`, so if `cd` fails the
# find/rm still runs in the shell's starting directory; `&&` would stop it.
200 cmd = ("cd %s ; " % self.home +
201 "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
202 " -execdir rm -rf {} + ")
# The result is not inspected in the visible lines.
205 (out, err), proc = self.execute(cmd, with_lock = True)
# Copy local content (a file, a directory, a list of files, or raw text when
# text=True) to dst on the node through copy().
207 def upload(self, src, dst, text = False):
208 """ Copy content to destination
210 src content to copy. Can be a local file, directory or a list of files
212 dst destination path on the remote host (the node named by the "hostname" attribute)
214 text src is text input, it must be stored into a temp file before uploading
216 # If source is a string input
218 if text and not os.path.isfile(src):
219 # src is text input that should be uploaded as file
220 # create a temporary file with the content to upload
221 f = tempfile.NamedTemporaryFile(delete=False)
# NOTE(review): the lines that write `src` into `f` and rebind src to the
# temp file are not visible here.
226 if not self.localhost:
227 # Build destination as <user>@<server>:<path>
228 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
230 result = self.copy(src, dst)
# NOTE(review): the remainder is not visible (presumably removal of the
# delete=False temp file and `return result`). Confirm the temp file is
# removed.
def download(self, src, dst):
    """Copy *src* from the node to the local path *dst*.

    For a remote node, *src* is rewritten as ``<user>@<host>:<src>`` from
    the "username" and "hostname" attributes; for localhost it is passed
    through unchanged. Returns whatever copy() returns.
    """
    if self.localhost:
        return self.copy(src, dst)
    user = self.get("username")
    host = self.get("hostname")
    return self.copy("%s@%s:%s" % (user, host, src), dst)
# Install `packages` with the distribution's package tool.
# - The command comes from rpmfuncs for "f12"/"f14" and debfuncs for
#   "debian"/"ubuntu", chosen via self.os.
# - It runs through run_and_wait() in `home` (default node_dir), with
#   instpkg_* pid/log files and raise_on_error=True.
# Returns ((out, err), proc) from run_and_wait(). Raises RuntimeError when
# self.os is none of the supported tags.
244 def install_packages(self, packages, home = None):
245 home = home or self.node_dir
248 if self.os in ["f12", "f14"]:
249 cmd = rpmfuncs.install_packages_command(self.os, packages)
250 elif self.os in ["debian", "ubuntu"]:
251 cmd = debfuncs.install_packages_command(self.os, packages)
# Unsupported OS (the `else:` line is not visible here).
253 msg = "Error installing packages ( OS not known ) "
254 self.error(msg, self.os)
255 raise RuntimeError, msg
258 (out, err), proc = self.run_and_wait(cmd, home,
259 pidfile = "instpkg_pid",
260 stdout = "instpkg_log",
261 stderr = "instpkg_err",
262 raise_on_error = True)
264 return (out, err), proc
# Remove `packages` with the distribution's package tool.
# - The command comes from rpmfuncs for "f12"/"f14" and debfuncs for
#   "debian"/"ubuntu", chosen via self.os.
# - It runs through run_and_wait() in `home` (default node_dir), with
#   rmpkg_* pid/log files and raise_on_error=True.
# Returns ((out, err), proc) from run_and_wait(). Raises RuntimeError when
# self.os is none of the supported tags.
266 def remove_packages(self, packages, home = None):
267 home = home or self.node_dir
270 if self.os in ["f12", "f14"]:
271 cmd = rpmfuncs.remove_packages_command(self.os, packages)
272 elif self.os in ["debian", "ubuntu"]:
273 cmd = debfuncs.remove_packages_command(self.os, packages)
# Unsupported OS (the `else:` line, and possibly a self.error() call as in
# install_packages, are not visible here).
275 msg = "Error removing packages ( OS not known ) "
277 raise RuntimeError, msg
280 (out, err), proc = self.run_and_wait(cmd, home,
281 pidfile = "rmpkg_pid",
282 stdout = "rmpkg_log",
283 stderr = "rmpkg_err",
284 raise_on_error = True)
286 return (out, err), proc
# Create `path` (including parents, `mkdir -p`) on the node and return
# execute()'s ((out, err), proc). How `clean` is handled is not visible here
# (presumably it removes the directory first — confirm).
288 def mkdir(self, path, clean = False):
292 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Recursively and forcibly remove *path* on the node (``rm -rf``).

    The command goes through execute() with with_lock=True; its
    ((out, err), proc) result is returned unchanged.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock = True)
# Start `command` in the background with run(), wait for its pid file, wait
# for it to finish, then read its stderr file.
# NOTE(review): the parameters between `command` and `raise_on_error` are
# not visible. The body uses `home` and `stderr`, and install_packages()
# passes pidfile/stdout/stderr.
297 def run_and_wait(self, command,
304 raise_on_error = False):
305 """ runs a command in background on the remote host, but waits
306 until the command finishes execution.
307 This is more robust than doing a simple synchronized 'execute',
308 since in the remote host the command can continue to run detached
309 even if network disconnections occur
311 # run command in background in remote host
312 (out, err), proc = self.run(command, home,
319 # check no errors occurred
# Spawn failure = truthy poll() (non-zero exit, assuming Popen-like proc)
# together with non-empty stderr. Whether the raise depends on
# raise_on_error is on a line not visible here.
320 if proc.poll() and err:
321 msg = " Failed to run command '%s' " % command
322 self.error(msg, out, err)
324 raise RuntimeError, msg
326 # Wait for pid file to be generated
327 pid, ppid = self.wait_pid(
330 raise_on_error = raise_on_error)
332 # wait until command finishes to execute
333 self.wait_run(pid, ppid)
335 # check if execution errors occurred
# Reads home/<stderr> through check_output(); the condition that decides
# failure is not visible here.
336 (out, err), proc = self.check_output(home, stderr)
339 msg = " Failed to run command '%s' " % command
340 self.error(msg, out, err)
343 raise RuntimeError, msg
345 return (out, err), proc
347 def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
348 """ Waits until the pid file for the command is generated,
349 and returns the pid and ppid of the process """
# Poll the pid file through checkpid() (the loop header is not visible).
353 pidtuple = self.checkpid(home = home, pidfile = pidfile)
# Back-off between polls: grows by 20% per round, capped at 30 (presumably
# seconds, for time.sleep — confirm).
360 delay = min(30,delay*1.2)
# Failure path once polling gives up (presumably gated on raise_on_error;
# not visible).
362 msg = " Failed to get pid for pidfile %s/%s " % (
367 raise RuntimeError, msg
371 def wait_run(self, pid, ppid, trial = 0):
372 """ wait for a remote process to finish execution """
# Poll status() until FINISHED (the loop header is not visible).
378 status = self.status(pid, ppid)
# NOTE(review): identity (`is`) comparison against sshfuncs constants is only
# safe if they are singletons; kill() compares the same constants with `==`.
380 if status is sshfuncs.FINISHED:
382 elif status is not sshfuncs.RUNNING:
# Unknown status: sleep 5.5–6.5 x delay.
# NOTE(review): confirm 5.5 is intended (the RUNNING path below uses 0.5).
384 time.sleep(delay*(5.5+random.random()))
# Still running: jittered sleep of 0.5–1.5 x delay, then grow delay by 20%,
# capped at 30.
391 time.sleep(delay*(0.5+random.random()))
392 delay = min(30,delay*1.2)
def check_output(self, home, filename):
    """Return the contents of home/filename on the node, read via ``cat``.

    The command goes through execute() with with_lock=True, and the result
    is returned in the same ((out, err), proc) shape.
    """
    path = os.path.join(home, filename)
    (stdout_text, stderr_text), process = self.execute("cat %s" % path,
            with_lock = True)
    return (stdout_text, stderr_text), process
# Liveness probe. The enclosing definition is not visible; provision() calls
# it as self.is_alive(). It runs `echo 'ALIVE'` on the node and checks that
# the output starts with ALIVE.
407 (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
# Exception path (the except clause is not visible): warn with the traceback.
# NOTE(review): `out` is only safe here if it is initialised on a line not
# visible before execute() — confirm.
410 trace = traceback.format_exc()
411 msg = "Unresponsive host "
412 self.warn(msg, out, trace)
415 if out.strip().startswith('ALIVE'):
# Output did not start with ALIVE (the else line and returns are not visible).
418 msg = "Unresponsive host "
419 self.warn(msg, out, err)
# Bad-host detection via check_bad_host() is currently commented out.
423 #if self.check_bad_host(out,err):
# Copy src to dst, either locally with execfuncs.lcopy or with
# sshfuncs.rcopy using the node's port/identity/serverKey attributes. The
# condition selecting the branch is not visible here. Returns
# ((out, err), proc).
426 def copy(self, src, dst):
# NOTE(review): `source` and `dest` are not parameters of this method (they
# are `src`/`dst`). Unless they are bound on a line not visible here, this
# local branch raises NameError.
428 (out, err), proc = execfuncs.lcopy(source, dest,
432 (out, err), proc = sshfuncs.rcopy(
434 port = self.get("port"),
435 identity = self.get("identity"),
436 server_key = self.get("serverKey"),
439 return (out, err), proc
# Run `command` synchronously and return ((out, err), proc). The local path
# uses execfuncs.lexec; the remote path uses sshfuncs.rexec with the node's
# SSH attributes. Several signature lines are not visible here.
441 def execute(self, command,
449 err_on_timeout = True,
450 connect_timeout = 30,
454 """ Notice that this invocation will block until the
455 execution finishes. If this is not the desired behavior,
456 use 'run' instead."""
459 (out, err), proc = execfuncs.lexec(command,
# Remote execution. The two rexec calls below are identical in their visible
# arguments. The distinction is on lines not visible here (presumably
# holding self._lock when with_lock=True, which callers pass — confirm).
467 (out, err), proc = sshfuncs.rexec(
469 host = self.get("hostname"),
470 user = self.get("username"),
471 port = self.get("port"),
475 identity = self.get("identity"),
476 server_key = self.get("serverKey"),
479 forward_x11 = forward_x11,
482 err_on_timeout = err_on_timeout,
483 connect_timeout = connect_timeout,
484 persistent = persistent
487 (out, err), proc = sshfuncs.rexec(
489 host = self.get("hostname"),
490 user = self.get("username"),
491 port = self.get("port"),
495 identity = self.get("identity"),
496 server_key = self.get("serverKey"),
499 forward_x11 = forward_x11,
502 err_on_timeout = err_on_timeout,
503 connect_timeout = connect_timeout,
504 persistent = persistent
507 return (out, err), proc
# Start `command` in the background and return ((out, err), proc) of the
# spawn step; run_and_wait() then polls the pid file. The local path uses
# execfuncs.lspawn; the remote path uses sshfuncs.rspawn. Most signature
# lines are not visible here.
509 def run(self, command,
518 self.debug("Running %s" % command)
521 (out, err), proc = execfuncs.lspawn(command, pidfile,
526 create_home = create_home,
530 # Start process in a "daemonized" way, using nohup and heavy
531 # stdin/out redirection to avoid connection issues
# stdin falls back to /dev/null only when it is None (an empty string is
# kept). stdout/stderr fall back on any falsy value.
533 (out,err), proc = sshfuncs.rspawn(
537 create_home = create_home,
538 stdin = stdin if stdin is not None else '/dev/null',
539 stdout = stdout if stdout else '/dev/null',
540 stderr = stderr if stderr else '/dev/null',
542 host = self.get("hostname"),
543 user = self.get("username"),
544 port = self.get("port"),
546 identity = self.get("identity"),
547 server_key = self.get("serverKey")
550 return (out, err), proc
# Read the pid file at home/pidfile. Locally this uses execfuncs.lcheckpid;
# remotely it uses sshfuncs.rcheckpid. wait_pid() polls this until a
# (pid, ppid) result is available (the "not ready" value is presumably None
# — confirm).
552 def checkpid(self, home = ".", pidfile = "pid"):
554 pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
557 pidtuple = sshfuncs.rcheckpid(
558 os.path.join(home, pidfile),
559 host = self.get("hostname"),
560 user = self.get("username"),
561 port = self.get("port"),
563 identity = self.get("identity"),
564 server_key = self.get("serverKey")
# Query the state of process (pid, ppid). Locally this uses
# execfuncs.lstatus; remotely it uses sshfuncs.rstatus with the node's SSH
# attributes. wait_run() and kill() compare the result with
# sshfuncs.FINISHED / sshfuncs.RUNNING.
569 def status(self, pid, ppid):
571 status = execfuncs.lstatus(pid, ppid)
574 status = sshfuncs.rstatus(
576 host = self.get("hostname"),
577 user = self.get("username"),
578 port = self.get("port"),
580 identity = self.get("identity"),
581 server_key = self.get("serverKey")
# Kill process (pid, ppid), optionally via sudo. A kill is only attempted
# when status() reports RUNNING. Locally this uses execfuncs.lkill;
# remotely it uses sshfuncs.rkill. Returns ((out, err), proc).
# NOTE(review): when status is not RUNNING, out/err/proc are only bound if
# initialised on a line not visible here — confirm.
586 def kill(self, pid, ppid, sudo = False):
589 status = self.status(pid, ppid)
# Uses == here, while wait_run() uses `is` for the same constants.
591 if status == sshfuncs.RUNNING:
593 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
596 (out, err), proc = sshfuncs.rkill(
598 host = self.get("hostname"),
599 user = self.get("username"),
600 port = self.get("port"),
603 identity = self.get("identity"),
604 server_key = self.get("serverKey")
606 return (out, err), proc
# Heuristic scan of command output for signs of a broken host. Returns the
# first truthy regex match from `out`, else from `err`.
608 def check_bad_host(self, out, err):
# NOTE(review): the pattern opens with `(?:` immediately followed by `|`,
# i.e. an empty first alternative. That matches the empty string, so
# search() succeeds on any input and every host would be reported bad.
# Drop the leading `|`; the remaining alternatives are on lines not visible
# here.
609 badre = re.compile(r'(?:'
610 r'|Error: disk I/O error'
613 return badre.search(out) or badre.search(err)
# Blacklisting hook (the enclosing definition is not visible). It currently
# only logs a warning; the call that would record the host in a blacklist
# is commented out.
617 self.warn(" Blacklisting malfunctioning node ")
619 #util.appendBlacklist(self.hostname)