1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs
4 from neco.util import sshfuncs, execfuncs
# TODO: Verify that files and dirs already exist
16 # TODO: Blacklist nodes!
17 # TODO: Unify delays!!
19 reschedule_delay = "0.5s"
22 class LinuxNode(ResourceManager):
def _register_attributes(cls):
    """Create and register on ``cls`` the attributes of a Linux node.

    Nine attributes are declared: "hostname", "username", "port", "home",
    "identity", "serverKey", "cleanHome", "cleanProcesses" and "tearDown".
    "username" and "identity" carry ``Flags.Credential``; all the others
    carry ``Flags.ExecReadOnly``.

    Every ``Attribute`` is built before the first one is registered, and
    registration follows the declaration order of the table below.
    """
    read_only = Flags.ExecReadOnly
    credential = Flags.Credential

    # (attribute name, help text, flags) in registration order.
    specs = (
        ("hostname", "Hostname of the machine", read_only),
        ("username", "Local account username", credential),
        ("port", "SSH port", read_only),
        ("home",
         "Experiment home directory to store all experiment related files",
         read_only),
        ("identity", "SSH identity file", credential),
        ("serverKey", "Server public key", read_only),
        ("cleanHome",
         "Remove all files and directories "
         " from home folder before starting experiment",
         read_only),
        ("cleanProcesses",
         "Kill all running processes before starting experiment",
         read_only),
        ("tearDown",
         "Bash script to be executed before "
         "releasing the resource",
         read_only),
    )

    attributes = [Attribute(name, help_text, flags=flag)
                  for name, help_text, flag in specs]

    for attribute in attributes:
        cls._register_attribute(attribute)
67 def __init__(self, ec, guid):
68 super(LinuxNode, self).__init__(ec, guid)
71 # lock to avoid concurrency issues on methods used by applications
72 self._lock = threading.Lock()
74 self._logger = logging.getLogger("LinuxNode")
def log_message(self, msg):
    """Return ``msg`` decorated with this node's guid and hostname.

    The result has the form `` guid <guid> - host <hostname> - <msg> ``,
    where the hostname is read from the "hostname" attribute.
    """
    guid = self.guid
    hostname = self.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
82 return self.get("home") or "/tmp"
86 exp_dir = os.path.join(self.home, self.ec.exp_id)
87 return exp_dir if exp_dir.startswith('/') or \
88 exp_dir.startswith("~/") else "~/"
92 node_home = "node-%d" % self.guid
93 return os.path.join(self.exp_dir, node_home)
100 if (not self.get("hostname") or not self.get("username")):
101 msg = "Can't resolve OS, insufficient data "
103 raise RuntimeError, msg
105 (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
107 if err and proc.poll():
108 msg = "Error detecting OS "
109 self.error(msg, out, err)
110 raise RuntimeError, "%s - %s - %s" %( msg, out, err )
112 if out.find("Fedora release 12") == 0:
114 elif out.find("Fedora release 14") == 0:
116 elif out.find("Debian") == 0:
118 elif out.find("Ubuntu") ==0:
121 msg = "Unsupported OS"
123 raise RuntimeError, "%s - %s " %( msg, out )
129 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
131 def provision(self, filters = None):
132 if not self.is_alive():
133 self._state = ResourceState.FAILED
134 self.error("Deploy failed. Unresponsive node")
137 if self.get("cleanProcesses"):
138 self.clean_processes()
140 if self.get("cleanHome"):
143 self.mkdir(self.node_home)
145 super(LinuxNode, self).provision()
148 if self.state == ResourceState.NEW:
153 self._state = ResourceState.FAILED
156 # Node needs to wait until all associated interfaces are
157 # ready before it can finalize deployment
158 from neco.resources.linux.interface import LinuxInterface
159 ifaces = self.get_connected(LinuxInterface.rtype())
161 if iface.state < ResourceState.READY:
162 self.ec.schedule(reschedule_delay, self.deploy)
165 super(LinuxNode, self).deploy()
168 tear_down = self.get("tearDown")
170 self.execute(tear_down)
172 super(LinuxNode, self).release()
174 def valid_connection(self, guid):
178 def clean_processes(self, killer = False):
179 self.info("Cleaning up processes")
183 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
184 "sudo -S killall python tcpdump || /bin/true ; " +
185 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
186 "sudo -S killall -u root || /bin/true ; " +
187 "sudo -S killall -u root || /bin/true ; ")
190 cmd = ("sudo -S killall tcpdump || /bin/true ; " +
191 "sudo -S killall tcpdump || /bin/true ; " +
192 "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
193 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
196 (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
198 def clean_home(self):
199 self.info("Cleaning up home")
201 cmd = ("cd %s ; " % self.home +
202 "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
203 " -execdir rm -rf {} + ")
206 (out, err), proc = self.execute(cmd, with_lock = True)
208 def upload(self, src, dst, text = False):
209 """ Copy content to destination
211 src content to copy. Can be a local file, directory or a list of files
213 dst destination path on the remote host (remote is always self.host)
215 text src is text input, it must be stored into a temp file before uploading
217 # If source is a string input
219 if text and not os.path.isfile(src):
220 # src is text input that should be uploaded as file
221 # create a temporal file with the content to upload
222 f = tempfile.NamedTemporaryFile(delete=False)
227 if not self.localhost:
228 # Build destination as <user>@<server>:<path>
229 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
231 result = self.copy(src, dst)
def download(self, src, dst):
    """Copy ``src`` to ``dst`` through ``self.copy`` and return its result.

    src     path to copy from. Unless ``self.localhost`` is true, it is
            rewritten as <user>@<server>:<path> using the "username" and
            "hostname" attributes.

    dst     destination path, handed to ``self.copy`` unchanged.
    """
    if self.localhost:
        return self.copy(src, dst)

    # Build the source as <user>@<server>:<path>
    user = self.get("username")
    host = self.get("hostname")
    remote_src = "%s@%s:%s" % (user, host, src)
    return self.copy(remote_src, dst)
245 def install_packages(self, packages, home = None):
246 home = home or self.node_home
249 if self.os in ["f12", "f14"]:
250 cmd = rpmfuncs.install_packages_command(self.os, packages)
251 elif self.os in ["debian", "ubuntu"]:
252 cmd = debfuncs.install_packages_command(self.os, packages)
254 msg = "Error installing packages ( OS not known ) "
255 self.error(msg, self.os)
256 raise RuntimeError, msg
259 (out, err), proc = self.run_and_wait(cmd, home,
260 pidfile = "instpkg_pid",
261 stdout = "instpkg_out",
262 stderr = "instpkg_err",
263 raise_on_error = True)
265 return (out, err), proc
267 def remove_packages(self, packages, home = None):
268 home = home or self.node_home
271 if self.os in ["f12", "f14"]:
272 cmd = rpmfuncs.remove_packages_command(self.os, packages)
273 elif self.os in ["debian", "ubuntu"]:
274 cmd = debfuncs.remove_packages_command(self.os, packages)
276 msg = "Error removing packages ( OS not known ) "
278 raise RuntimeError, msg
281 (out, err), proc = self.run_and_wait(cmd, home,
282 pidfile = "rmpkg_pid",
283 stdout = "rmpkg_out",
284 stderr = "rmpkg_err",
285 raise_on_error = True)
287 return (out, err), proc
289 def mkdir(self, path, clean = False):
293 return self.execute("mkdir -p %s" % path, with_lock = True)
def rmdir(self, path):
    """Run ``rm -rf`` on ``path`` through ``self.execute`` and return its result.

    ``path`` is interpolated into the command line as is (no quoting) and
    the command is executed with ``with_lock = True``.
    """
    command = "rm -rf %s" % path
    return self.execute(command, with_lock=True)
298 def run_and_wait(self, command,
306 raise_on_error = False):
307 """ runs a command in background on the remote host, but waits
308 until the command finishes execution.
309 This is more robust than doing a simple synchronized 'execute',
310 since in the remote host the command can continue to run detached
311 even if network disconnections occur
313 # run command in background in remote host
314 (out, err), proc = self.run(command, home,
322 # check no errors occurred
323 if proc.poll() and err:
324 msg = " Failed to run command '%s' " % command
325 self.error(msg, out, err)
327 raise RuntimeError, msg
329 # Wait for pid file to be generated
330 pid, ppid = self.wait_pid(
333 raise_on_error = raise_on_error)
335 # wait until command finishes to execute
336 self.wait_run(pid, ppid)
338 # check if execution errors occurred
339 (out, err), proc = self.check_output(home, stderr)
342 msg = " Failed to run command '%s' " % command
343 self.error(msg, out, err)
346 raise RuntimeError, msg
348 return (out, err), proc
350 def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
351 """ Waits until the pid file for the command is generated,
352 and returns the pid and ppid of the process """
356 pidtuple = self.checkpid(home = home, pidfile = pidfile)
363 delay = min(30,delay*1.2)
365 msg = " Failed to get pid for pidfile %s/%s " % (
370 raise RuntimeError, msg
374 def wait_run(self, pid, ppid, trial = 0):
375 """ wait for a remote process to finish execution """
381 status = self.status(pid, ppid)
383 if status is sshfuncs.FINISHED:
385 elif status is not sshfuncs.RUNNING:
387 time.sleep(delay*(5.5+random.random()))
394 time.sleep(delay*(0.5+random.random()))
395 delay = min(30,delay*1.2)
def check_output(self, home, filename):
    """Read a file by running ``cat`` on it through ``self.execute``.

    The file path is ``filename`` joined onto ``home``. The command is
    executed with ``retry = 1`` and ``with_lock = True``.

    Returns ``((out, err), proc)`` as produced by ``self.execute``.
    """
    target = os.path.join(home, filename)
    result = self.execute("cat %s" % target, retry=1, with_lock=True)
    (stdout, stderr), process = result
    return (stdout, stderr), process
410 (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
413 trace = traceback.format_exc()
414 msg = "Unresponsive host "
415 self.warn(msg, out, trace)
418 if out.strip().startswith('ALIVE'):
421 msg = "Unresponsive host "
422 self.warn(msg, out, err)
426 #if self.check_bad_host(out,err):
429 def copy(self, src, dst):
431 (out, err), proc = execfuncs.lcopy(source, dest,
435 (out, err), proc = sshfuncs.rcopy(
437 port = self.get("port"),
438 identity = self.get("identity"),
439 server_key = self.get("serverKey"),
442 return (out, err), proc
444 def execute(self, command,
452 err_on_timeout = True,
453 connect_timeout = 30,
457 """ Notice that this invocation will block until the
458 execution finishes. If this is not the desired behavior,
459 use 'run' instead."""
462 (out, err), proc = execfuncs.lexec(command,
470 (out, err), proc = sshfuncs.rexec(
472 host = self.get("hostname"),
473 user = self.get("username"),
474 port = self.get("port"),
478 identity = self.get("identity"),
479 server_key = self.get("serverKey"),
482 forward_x11 = forward_x11,
485 err_on_timeout = err_on_timeout,
486 connect_timeout = connect_timeout,
487 persistent = persistent
490 (out, err), proc = sshfuncs.rexec(
492 host = self.get("hostname"),
493 user = self.get("username"),
494 port = self.get("port"),
498 identity = self.get("identity"),
499 server_key = self.get("serverKey"),
502 forward_x11 = forward_x11,
505 err_on_timeout = err_on_timeout,
506 connect_timeout = connect_timeout,
507 persistent = persistent
510 return (out, err), proc
512 def run(self, command,
522 self.debug("Running command '%s'" % command)
525 (out, err), proc = execfuncs.lspawn(command, pidfile,
530 create_home = create_home,
534 # Start process in a "daemonized" way, using nohup and heavy
535 # stdin/out redirection to avoid connection issues
537 (out,err), proc = sshfuncs.rspawn(
541 create_home = create_home,
542 stdin = stdin if stdin is not None else '/dev/null',
543 stdout = stdout if stdout else '/dev/null',
544 stderr = stderr if stderr else '/dev/null',
546 host = self.get("hostname"),
547 user = self.get("username"),
548 port = self.get("port"),
550 identity = self.get("identity"),
551 server_key = self.get("serverKey"),
555 return (out, err), proc
557 def checkpid(self, home = ".", pidfile = "pid"):
559 pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
562 pidtuple = sshfuncs.rcheckpid(
563 os.path.join(home, pidfile),
564 host = self.get("hostname"),
565 user = self.get("username"),
566 port = self.get("port"),
568 identity = self.get("identity"),
569 server_key = self.get("serverKey")
574 def status(self, pid, ppid):
576 status = execfuncs.lstatus(pid, ppid)
579 status = sshfuncs.rstatus(
581 host = self.get("hostname"),
582 user = self.get("username"),
583 port = self.get("port"),
585 identity = self.get("identity"),
586 server_key = self.get("serverKey")
591 def kill(self, pid, ppid, sudo = False):
594 status = self.status(pid, ppid)
596 if status == sshfuncs.RUNNING:
598 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
601 (out, err), proc = sshfuncs.rkill(
603 host = self.get("hostname"),
604 user = self.get("username"),
605 port = self.get("port"),
608 identity = self.get("identity"),
609 server_key = self.get("serverKey")
611 return (out, err), proc
613 def check_bad_host(self, out, err):
614 badre = re.compile(r'(?:'
615 r'|Error: disk I/O error'
618 return badre.search(out) or badre.search(err)
622 self.warn(" Blacklisting malfunctioning node ")
624 #util.appendBlacklist(self.hostname)