1 from neco.execution.attribute import Attribute, Flags
2 from neco.execution.resource import ResourceManager, clsinit, ResourceState
3 from neco.resources.linux import rpmfuncs, debfuncs
4 from neco.util import sshfuncs, execfuncs
15 # TODO: Verify that files and directories already exist
18 class LinuxNode(ResourceManager):
22 def _register_attributes(cls):
23 hostname = Attribute("hostname", "Hostname of the machine")
25 username = Attribute("username", "Local account username",
26 flags = Flags.Credential)
28 port = Attribute("port", "SSH port", flags = Flags.Credential)
30 home = Attribute("home",
31 "Experiment home directory to store all experiment related files")
33 identity = Attribute("identity", "SSH identity file",
34 flags = Flags.Credential)
36 server_key = Attribute("serverKey", "Server public key",
37 flags = Flags.Credential)
39 clean_home = Attribute("cleanHome", "Remove all files and directories " + \
40 " from home folder before starting experiment",
41 flags = Flags.ReadOnly)
43 clean_processes = Attribute("cleanProcesses",
44 "Kill all running processes before starting experiment",
45 flags = Flags.ReadOnly)
47 tear_down = Attribute("tearDown", "Bash script to be executed before " + \
48 "releasing the resource", flags = Flags.ReadOnly)
50 cls._register_attribute(hostname)
51 cls._register_attribute(username)
52 cls._register_attribute(port)
53 cls._register_attribute(home)
54 cls._register_attribute(identity)
55 cls._register_attribute(server_key)
56 cls._register_attribute(clean_home)
57 cls._register_attribute(clean_processes)
58 cls._register_attribute(tear_down)
60 def __init__(self, ec, guid):
61 super(LinuxNode, self).__init__(ec, guid)
63 self._home = "nepi-exp-%s" % os.urandom(8).encode('hex')
65 # lock to avoid concurrency issues on methods used by applications
66 self._lock = threading.Lock()
68 self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
# Body of the home-directory accessor (presumably the `home` property used
# as self.home elsewhere; its def line, original 71, is not in view).
# Returns the "home" attribute when it is non-empty, otherwise the random
# "nepi-exp-..." name created in __init__.
72 home = self.get("home")
# NOTE(review): the body of this `if` (original line 74) is not visible;
# presumably it adjusts or validates homes not prefixed with "nepi-".
73 if home and not home.startswith("nepi-"):
75 return home or self._home
# Body of the OS-detection accessor (presumably the `os` property that
# install_packages compares with "f12"/"f14"/"debian"/"ubuntu"; its def
# line is not in view). The distribution is identified from the start of
# /etc/issue on the host.
# Raises RuntimeError when hostname or username is unset, when reading
# /etc/issue fails, or when no recognised prefix matches.
82 if (not self.get("hostname") or not self.get("username")):
83 msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
84 self.logger.error(msg)
85 raise RuntimeError, msg
87 (out, err), proc = self.execute("cat /etc/issue")
# Only an error when stderr is non-empty AND proc.poll() is truthy
# (presumably a non-zero exit status).
89 if err and proc.poll():
90 msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
91 self.logger.error(msg)
92 raise RuntimeError, msg
# find(...) == 0 means "/etc/issue starts with this text".
# NOTE(review): the value assigned in each branch (original lines 95, 97,
# 99, 101) and presumably an `else:` (line 102) are not visible here.
94 if out.find("Fedora release 12") == 0:
96 elif out.find("Fedora release 14") == 0:
98 elif out.find("Debian") == 0:
100 elif out.find("Ubuntu") ==0:
103 msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
104 self.logger.error(msg)
105 raise RuntimeError, msg
111 return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
# Prepares the node: marks it FAILED if it does not answer is_alive(),
# optionally kills processes (cleanProcesses), creates the home directory,
# then delegates to ResourceManager.deploy().
# NOTE(review): `filters` is not used in the visible lines.
113 def provision(self, filters = None):
114 if not self.is_alive():
115 self._state = ResourceState.FAILED
116 self.logger.error("Deploy failed. Unresponsive node")
# NOTE(review): original lines 117-121 are not visible; presumably the
# method returns here after marking the node FAILED.
122 if self.get("cleanProcesses"):
123 self.clean_processes()
125 if self.get("cleanHome"):
# clean_home runs `rm -rf` on .cache/.local/.config/nepi-* entries, hence
# it is left disabled.
126 # self.clean_home() -> this is dangerous
129 self.mkdir(self.home)
131 super(LinuxNode, self).deploy()
# Body of the release method (def line not in view): runs the optional
# "tearDown" shell script on the node, then calls ResourceManager.release().
# NOTE(review): original line 135 is not visible; presumably it skips the
# execute() call when tearDown is unset.
134 tear_down = self.get("tearDown")
136 self.execute(tear_down)
138 super(LinuxNode, self).release()
140 def validate_connection(self, guid):
# NOTE(review): the body (original lines 141-143) is not visible in this
# excerpt; its behaviour cannot be determined from here.
# Kills processes on the node through sudo (-S: password read from stdin):
# python and tcpdump; every process reported by `ps -N -T` (not attached
# to this terminal) except lines matching $PPID; and all root-owned
# processes. Each command ends in "|| /bin/true" so a failing kill does
# not stop the chain. Run via run_and_wait with raise_on_error=True.
# NOTE(review): each killall command is issued twice; presumably a
# deliberate second pass for processes that survive the first.
# NOTE(review): `killall -u root` under sudo kills every root process;
# very destructive on a shared host.
144 def clean_processes(self):
145 self.logger.info("Cleaning up processes")
147 cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
148 "sudo -S killall python tcpdump || /bin/true ; " +
149 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
150 "sudo -S killall -u root || /bin/true ; " +
151 "sudo -S killall -u root || /bin/true ; ")
# Remaining run_and_wait arguments (original lines 156-158) are not visible.
155 (out, err), proc = self.run_and_wait(cmd, self.home,
159 raise_on_error = True)
# Deletes .cache, .local, .config and every nepi-* entry at depth 1 of the
# directory `find` runs in ("."), presumably self.home as passed to
# run_and_wait. raise_on_error=True is forwarded to run_and_wait.
163 def clean_home(self):
164 self.logger.info("Cleaning up home")
# "\(" is not a Python escape, so the backslash is kept and reaches the
# shell, which hands a literal "(" to find.
166 cmd = "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
# Remaining run_and_wait arguments (original lines 171-173) are not visible.
170 (out, err), proc = self.run_and_wait(cmd, self.home,
174 raise_on_error = True)
178 def upload(self, src, dst):
179 """ Copy content to destination
181 src content to copy. Can be a local file, directory or text input
183 dst destination path on the remote host (remote is always self.host)
185 # If source is a string input
186 if not os.path.isfile(src):
187 # src is text input that should be uploaded as file
188 # create a temporal file with the content to upload
189 f = tempfile.NamedTemporaryFile(delete=False)
194 if not self.localhost:
195 # Build destination as <user>@<server>:<path>
196 dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
198 return self.copy(src, dst)
# Copies src from the node to the local path dst. For a remote node src is
# rewritten as <user>@<hostname>:<src> before delegating to copy(); for
# localhost both paths are passed through unchanged.
200 def download(self, src, dst):
201 if not self.localhost:
202 # Build source as <user>@<server>:<path>
203 src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
204 return self.copy(src, dst)
# Installs `packages` with the OS-specific command built by rpmfuncs (f12,
# f14) or debfuncs (debian, ubuntu) and waits for it via run_and_wait.
# Raises RuntimeError when self.os is none of these, or when run_and_wait
# reports a failure (raise_on_error=True). Returns ((out, err), proc).
# NOTE(review): if `os` is a property that re-runs detection, self.os is
# evaluated up to three times here.
206 def install_packages(self, packages):
208 if self.os in ["f12", "f14"]:
209 cmd = rpmfuncs.install_packages_command(self.os, packages)
210 elif self.os in ["debian", "ubuntu"]:
211 cmd = debfuncs.install_packages_command(self.os, packages)
# NOTE(review): original line 212 (presumably `else:`) is not visible.
213 msg = "Error installing packages. OS not known for host %s " % (
214 self.get("hostname"))
215 self.logger.error(msg)
216 raise RuntimeError, msg
# pid / stdout / stderr file names for the install (presumably resolved
# relative to self.home).
220 (out, err), proc = self.run_and_wait(cmd, self.home,
221 pidfile = "instpkgpid",
222 stdout = "instpkglog",
223 stderr = "instpkgerr",
224 raise_on_error = True)
226 return (out, err), proc
# Removes `packages` with the OS-specific command built by rpmfuncs (f12,
# f14) or debfuncs (debian, ubuntu) and waits for it via run_and_wait.
# Raises RuntimeError when self.os is none of these, or when run_and_wait
# reports a failure (raise_on_error=True). Returns ((out, err), proc).
228 def remove_packages(self, packages):
230 if self.os in ["f12", "f14"]:
231 cmd = rpmfuncs.remove_packages_command(self.os, packages)
232 elif self.os in ["debian", "ubuntu"]:
233 cmd = debfuncs.remove_packages_command(self.os, packages)
# NOTE(review): original line 234 (presumably `else:`) is not visible.
235 msg = "Error removing packages. OS not known for host %s " % (
236 self.get("hostname"))
237 self.logger.error(msg)
238 raise RuntimeError, msg
# The stdout/stderr arguments (original lines 244-245) are not visible.
242 (out, err), proc = self.run_and_wait(cmd, self.home,
243 pidfile = "rmpkgpid",
246 raise_on_error = True)
248 return (out, err), proc
# Creates `path` (and missing parents) with `mkdir -p` through execute()
# and returns execute()'s ((out, err), proc).
# NOTE(review): original lines 251-253 are not visible; presumably `clean`
# removes the directory first.
# NOTE(review): path is interpolated unquoted into a shell command; paths
# containing spaces or shell metacharacters will misbehave.
250 def mkdir(self, path, clean = False):
254 return self.execute("mkdir -p %s" % path)
# Recursively deletes `path` with `rm -rf` through execute() and returns
# execute()'s ((out, err), proc).
# NOTE(review): path is interpolated unquoted, so shell globbing and word
# splitting apply; be careful with untrusted values.
256 def rmdir(self, path):
257 return self.execute("rm -rf %s" % path)
# Starts `command` in the background via run(), waits for its pid via
# wait_pid() and for completion via wait_run(), then reads the command's
# stderr file with check_run_error() and returns that ((out, err), proc);
# `out` is therefore the stderr file's content.
# Parameters on original lines 260-265 are not visible; the body also uses
# `home` and `stderr`.
259 def run_and_wait(self, command,
266 raise_on_error = False):
268 (out, err), proc = self.run(command, home,
# Launch failure: logged; original line 279 is hidden, so presumably the
# raise only happens when raise_on_error is set.
275 if proc.poll() and err:
276 msg = " Failed to run command %s on host %s" % (
277 command, self.get("hostname"))
278 self.logger.error(msg)
280 raise RuntimeError, msg
282 pid, ppid = self.wait_pid(
285 raise_on_error = raise_on_error)
287 self.wait_run(pid, ppid)
289 (out, err), proc = self.check_run_error(home, stderr)
# NOTE(review): the condition guarding this report (original lines
# 290-291) is not visible; presumably non-empty stderr-file content.
292 msg = "Error while running command %s on host %s. error output: %s" % (
293 command, self.get("hostname"), out)
295 msg += " . err: %s" % err
297 self.logger.error(msg)
299 raise RuntimeError, msg
301 return (out, err), proc
# Polls checkpid(home, pidfile) until a pid tuple is available; callers
# unpack the result as (pid, ppid). The delay grows by 1.2x per attempt,
# capped at 30. The loop, its bound and the success return (original
# lines 304-306, 308-313, 315, 319, 321-322) are not visible. On failure
# it logs and raises RuntimeError (presumably only if raise_on_error).
303 def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
307 pidtuple = self.checkpid(home = home, pidfile = pidfile)
314 delay = min(30,delay*1.2)
316 msg = " Failed to get pid for pidfile %s/%s on host %s" % (
317 home, pidfile, self.get("hostname"))
318 self.logger.error(msg)
320 raise RuntimeError, msg
324 def wait_run(self, pid, ppid, trial = 0):
330 status = self.status(pid, ppid)
332 if status is sshfuncs.FINISHED:
334 elif status is not sshfuncs.RUNNING:
336 time.sleep(delay*(5.5+random.random()))
343 time.sleep(delay*(0.5+random.random()))
344 delay = min(30,delay*1.2)
# Reads the command's stderr file <home>/<stderr> with `cat` via execute()
# and returns ((file contents, cat's own stderr), proc).
347 def check_run_error(self, home, stderr = 'stderr'):
348 (out, err), proc = self.execute("cat %s" %
349 os.path.join(home, stderr))
350 return (out, err), proc
# Reads the command's stdout file <home>/<stdout> with `cat` via execute()
# and returns ((file contents, cat's own stderr), proc).
352 def check_run_output(self, home, stdout = 'stdout'):
353 (out, err), proc = self.execute("cat %s" %
354 os.path.join(home, stdout))
355 return (out, err), proc
# Body of the liveness check (presumably is_alive(), called by provision;
# its def and try/except lines are not in view). Runs `echo 'ALIVE'` on
# the node: if that raises, logs a warning with the traceback; if stdout
# starts with "ALIVE" the host is considered alive; otherwise logs a
# warning. The return statements (original lines 365-366, 370-371,
# 373-374, 377-379) are not visible.
# NOTE(review): in the exception path `out`/`err` are only defined if a
# hidden line initialises them before the try.
364 (out, err), proc = self.execute("echo 'ALIVE'")
367 trace = traceback.format_exc()
368 self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s",
369 self.get("hostname"), out, err, trace)
372 if out.strip().startswith('ALIVE'):
375 self.logger.warn("Unresponsive host %s. got:\n%s%s",
376 self.get("hostname"), out, err)
380 #if self.check_bad_host(out,err):
383 def copy(self, src, dst):
385 (out, err), proc = execfuncs.lcopy(source, dest,
388 (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
390 port = self.get("port"),
391 identity = self.get("identity"),
392 server_key = self.get("serverKey"),
395 return (out, err), proc
# Runs `command` synchronously and returns ((out, err), proc): locally via
# execfuncs.lexec, or over SSH via sshfuncs.rexec wrapped in safe_retry
# using the node's hostname/username/port/identity/serverKey attributes.
# Several parameters (original lines 398-404, 407-408) and the branch
# condition (around line 413) are not visible.
397 def execute(self, command,
405 err_on_timeout = True,
406 connect_timeout = 30,
409 """ Notice that this invocation will block until the
410 execution finishes. If this is not the desired behavior,
411 use 'run' instead."""
414 (out, err), proc = execfuncs.lexec(command,
420 (out, err), proc = self.safe_retry(sshfuncs.rexec)(
422 host = self.get("hostname"),
423 user = self.get("username"),
424 port = self.get("port"),
428 identity = self.get("identity"),
429 server_key = self.get("serverKey"),
432 forward_x11 = forward_x11,
435 err_on_timeout = err_on_timeout,
436 connect_timeout = connect_timeout,
437 persistent = persistent
440 return (out, err), proc
# Starts `command` in the background and returns the spawn helper's
# ((out, err), proc): locally via execfuncs.lspawn, or over SSH via
# sshfuncs.rspawn wrapped in safe_retry.
# On the remote path, stdin defaults to /dev/null only when it is None,
# while stdout/stderr default to /dev/null whenever they are falsy.
# Parameters (original lines 443-450) and the branch condition are not
# visible.
442 def run(self, command,
451 self.logger.info("Running %s", command)
454 (out, err), proc = execfuncs.lspawn(command, pidfile,
459 create_home = create_home,
463 # Start process in a "daemonized" way, using nohup and heavy
464 # stdin/out redirection to avoid connection issues
465 (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
469 create_home = create_home,
470 stdin = stdin if stdin is not None else '/dev/null',
471 stdout = stdout if stdout else '/dev/null',
472 stderr = stderr if stderr else '/dev/null',
474 host = self.get("hostname"),
475 user = self.get("username"),
476 port = self.get("port"),
478 identity = self.get("identity"),
479 server_key = self.get("serverKey")
482 return (out, err), proc
# Reads the pid file <home>/<pidfile>, locally via execfuncs.lcheckpid or
# over SSH via sshfuncs.rcheckpid (not wrapped in safe_retry, unlike
# exec/spawn/kill). The return (original lines 496-499) is not visible;
# presumably it returns pidtuple, which wait_pid polls for.
484 def checkpid(self, home = ".", pidfile = "pid"):
486 pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile))
488 pidtuple = sshfuncs.rcheckpid(
489 os.path.join(home, pidfile),
490 host = self.get("hostname"),
491 user = self.get("username"),
492 port = self.get("port"),
494 identity = self.get("identity"),
495 server_key = self.get("serverKey")
# Queries the state of process (pid, ppid), locally via execfuncs.lstatus
# or over SSH via sshfuncs.rstatus. Callers compare the result with
# sshfuncs.RUNNING / sshfuncs.FINISHED. The return and branch lines
# (original 501, 503, 505, 509, 512-515) are not visible.
500 def status(self, pid, ppid):
502 status = execfuncs.lstatus(pid, ppid)
504 status = sshfuncs.rstatus(
506 host = self.get("hostname"),
507 user = self.get("username"),
508 port = self.get("port"),
510 identity = self.get("identity"),
511 server_key = self.get("serverKey")
# Kills process (pid, ppid) only if status() reports it RUNNING, locally
# via execfuncs.lkill or over SSH via sshfuncs.rkill wrapped in
# safe_retry. Returns ((out, err), proc).
# NOTE(review): when the process is not RUNNING, `out`/`err`/`proc` are
# only defined at the return if hidden lines 517-518 initialise them.
516 def kill(self, pid, ppid, sudo = False):
519 status = self.status(pid, ppid)
521 if status == sshfuncs.RUNNING:
523 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
525 (out, err), proc = self.safe_retry(sshfuncs.rkill)(
527 host = self.get("hostname"),
528 user = self.get("username"),
529 port = self.get("port"),
532 identity = self.get("identity"),
533 server_key = self.get("serverKey")
535 return (out, err), proc
537 def check_bad_host(self, out, err):
538 badre = re.compile(r'(?:'
539 r'|Error: disk I/O error'
542 return badre.search(out) or badre.search(err)
546 self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
548 #util.appendBlacklist(self.hostname)
550 def safe_retry(self, func):
551 """Retries a function invocation using a lock"""
553 @functools.wraps(func)
555 fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
556 func.__name__, p, kw, self.get("hostname"))
557 retry = kw.pop("_retry", False)
558 wlock = kw.pop("_with_lock", False)
562 for i in xrange(0 if retry else 4):
566 (out, err), proc = func(*p, **kw)
568 (out, err), proc = func(*p, **kw)
575 self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
577 except RuntimeError, e:
579 self.logger.error("%s. error: %s", fail_msg, e.args)
580 return (out, err), proc