2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
32 # TODO: compare_hash for all files that are uploaded!
36 class LinuxApplication(ResourceManager):
37 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare the attributes of a LinuxApplication.

    All Attribute objects are created first and then handed to
    cls._register_attribute one at a time, in declaration order.
    """
    attributes = (
        Attribute("command", "Command to execute",
            flags=Flags.ExecReadOnly),
        Attribute("forwardX11",
            "Enables X11 forwarding for SSH connections",
            flags=Flags.ExecReadOnly),
        Attribute("env",
            "Environment variables string for command execution",
            flags=Flags.ExecReadOnly),
        Attribute("sudo", "Run with root privileges",
            flags=Flags.ExecReadOnly),
        Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags=Flags.ExecReadOnly),
        Attribute("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            flags=Flags.ExecReadOnly),
        Attribute("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            flags=Flags.ExecReadOnly),
        Attribute("build",
            "Build commands to execute after deploying the sources. "
            "Sources will be in the ${SOURCES} folder. "
            "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
            "Try to make the commands return with a nonzero exit code on error.\n"
            "Also, do not install any programs here, use the 'install' attribute. This will "
            "help keep the built files constrained to the build folder (which may "
            "not be the home folder), and will result in faster deployment. Also, "
            "make sure to clean up temporary files, to reduce bandwidth usage between "
            "nodes when transferring built packages.",
            flags=Flags.ReadOnly),
        Attribute("install",
            "Commands to transfer built files to their final destinations. "
            "Sources will be in the initial working folder, and a special "
            "tag ${SOURCES} can be used to reference the experiment's "
            "home folder (where the application commands will run).\n"
            "ALL sources and targets needed for execution must be copied there, "
            "if building has been enabled.\n"
            "That is, 'slave' nodes will not automatically get any source files. "
            "'slave' nodes don't get build dependencies either, so if you need "
            "make and other tools to install, be sure to provide them as "
            "actual dependencies instead.",
            flags=Flags.ReadOnly),
        Attribute("stdin", "Standard input", flags=Flags.ExecReadOnly),
        Attribute("stdout", "Standard output", flags=Flags.ExecReadOnly),
        Attribute("stderr", "Standard error", flags=Flags.ExecReadOnly),
        Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags=Flags.ReadOnly),
    )

    # Registration order is the declaration order above
    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Declare the "stdout" and "stderr" traces of a LinuxApplication."""
    descriptions = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
    )

    # Build every Trace before registering the first one
    traces = [Trace(name, text) for name, text in descriptions]

    for trace in traces:
        cls._register_trace(trace)
def __init__(self, ec, guid):
    """Initialize the bookkeeping state of the application.

    :param ec: handed unchanged to the parent class initializer
    :param guid: handed unchanged to the parent class initializer;
        self.guid is then used to name the application home directory
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # pid and parent pid of the command when it runs in background.
    # They start as None so they can be inspected before the command
    # is started.
    self._pid = None
    self._ppid = None

    # name of the application directory, joined to the node
    # experiment home to obtain the application home
    self._home = "app-%s" % self.guid
    self._in_foreground = False

    # keep a reference to the running process handler when
    # the command is not executed as remote daemon in background
    self._proc = None

    # timestamp of last state check of the application
    self._last_state_check = tnow()
def log_message(self, msg):
    """Return msg prefixed with the application guid and the node hostname.

    :param msg: text to decorate
    :returns: string of the form " guid <guid> - host <hostname> - <msg> "
    """
    guid = self.guid
    hostname = self.node.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
132 node = self.get_connected(LinuxNode.rtype())
133 if node: return node[0]
138 return os.path.join(self.node.exp_home, self._home)
142 return os.path.join(self.app_home, 'src')
146 return os.path.join(self.app_home, 'build')
def in_foreground(self):
    """ Returns True if the command needs to be executed in foreground.

    A foreground command is launched with the node 'execute' method
    instead of 'run' ('run' executes a command in background and
    detached from the terminal).

    When the X11 forwarding option is set the command can not run in
    background and detached from a terminal, because the terminal has
    to stay attached; in that case the value of the "forwardX11"
    attribute is returned. Otherwise the internal foreground flag is
    returned.
    """
    forward_x11 = self.get("forwardX11")
    if forward_x11:
        return forward_x11

    return self._in_foreground
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """Retrieve a trace, or information about it, from the node.

    :param name: name of the trace file inside the application home
    :param attr: what to retrieve

        TraceAttr.PATH    the path of the trace file
        TraceAttr.ALL     the output of node.check_output for the trace
                          (presumably its whole content)
        TraceAttr.STREAM  the output of one 'dd' read of 'block' bytes
                          that skips 'offset' blocks
        TraceAttr.SIZE    the size printed by 'stat', converted to int

    :param block: block size given to 'dd' (TraceAttr.STREAM only)
    :param offset: number of blocks 'dd' skips (TraceAttr.STREAM only)
    :returns: the requested value, or None when the trace file is not
        found or the remote command reports an error
    :raises ValueError: if attr is not one of the values listed above
    """
    self.info("Retrieving '%s' trace %s " % (name, attr))

    path = os.path.join(self.app_home, name)

    # Make sure the trace file exists before asking anything about it
    command = "(test -f %s && echo 'success') || echo 'error'" % path
    (out, err), proc = self.node.execute(command)

    if (err and proc.poll()) or out.find("error") != -1:
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.PATH:
        return path

    if attr == TraceAttr.ALL:
        (out, err), proc = self.node.check_output(self.app_home, name)

        if err and proc.poll():
            msg = " Couldn't read trace %s " % name
            self.error(msg, out, err)
            return None

        return out

    if attr == TraceAttr.STREAM:
        cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
    elif attr == TraceAttr.SIZE:
        cmd = "stat -c%%s %s " % path
    else:
        # Without this branch 'cmd' would be unbound below and the
        # caller would only see an obscure UnboundLocalError
        raise ValueError("Unsupported trace attribute %r" % (attr,))

    (out, err), proc = self.node.execute(cmd)

    if err and proc.poll():
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.SIZE:
        out = int(out.strip())

    return out
213 # create home dir for application
214 self.node.mkdir(self.app_home)
217 self.upload_sources()
225 # install dependencies
226 self.install_dependencies()
234 # Upload command to remote bash script
235 # - only if command can be executed in background and detached
236 command = self.get("command")
238 if command and not self.in_foreground:
239 self.info("Uploading command '%s'" % command)
241 # replace application specific paths in the command
242 command = self.replace_paths(command)
244 # replace application specific paths in the environment
245 env = self.get("env")
246 env = env and self.replace_paths(env)
248 self.node.upload_command(command, self.app_home,
252 self.info("Provisioning finished")
254 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the files listed in the "sources" attribute.

    The attribute is a whitespace-separated list. Entries that start
    with "http://" or "https://" are downloaded on the node with wget
    into ${SOURCES}; every other entry is handed to node.upload with
    the sources directory as destination. Nothing is done when the
    attribute is not set.
    """
    sources = self.get("sources")
    if not sources:
        return

    self.info("Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() without arguments discards the empty entries that
    # repeated or surrounding blanks would otherwise produce
    entries = sources.split()

    # Only real URLs are fetched remotely. A bare "http" prefix test
    # would also catch local files such as "http_server.py".
    url_prefixes = ("http://", "https://")
    http_sources = [s for s in entries if s.startswith(url_prefixes)]
    local_sources = [s for s in entries if not s.startswith(url_prefixes)]

    # Download http sources remotely
    if http_sources:
        command = [" wget -c --directory-prefix=${SOURCES} "]
        check = []

        for source in http_sources:
            command.append(" %s " % (source))
            check.append(" ls ${SOURCES}/%s " % os.path.basename(source))

        command = " ".join(command)
        check = " ; ".join(check)

        # Append the command to check that the sources were downloaded
        command += " ; %s " % check

        # replace application specific paths in the command
        command = self.replace_paths(command)

        # Upload the command to a bash script and run it
        # in background ( but wait until the command has
        # finished to continue )
        self.node.run_and_wait(command, self.app_home,
                shfile = "http_sources.sh",
                pidfile = "http_sources_pidfile",
                ecodefile = "http_sources_exitcode",
                stdout = "http_sources_stdout",
                stderr = "http_sources_stderr")

    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """Upload the text of the "code" attribute to the node.

    The text is stored in a file named "code" inside the sources
    directory. Nothing is done when the attribute is not set.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info("Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code itself; the name 'sources' is not defined in
    # this method
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the value of the "stdin" attribute to the node.

    The destination is a file named "stdin" inside the application
    home. The upload is skipped when compare_hash reports that the
    remote file already matches. Nothing is done when the attribute
    is not set.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    target = os.path.join(self.app_home, "stdin")

    # Check whether the same content already exists on the node
    # (using md5sum) before transferring anything
    already_there = self.compare_hash(stdin, target)
    if not already_there:
        self.node.upload(stdin, target, text = True)
def install_dependencies(self):
    """Install the packages named in the "depends" attribute.

    The attribute value is passed as is to node.install_packages
    together with the application home. Nothing is done when the
    attribute is not set.
    """
    packages = self.get("depends")
    if not packages:
        return

    self.info("Installing dependencies %s" % packages)
    self.node.install_packages(packages, self.app_home)
336 build = self.get("build")
338 self.info("Building sources ")
340 # create dir for build
341 self.node.mkdir(self.build_dir)
343 # replace application specific paths in the command
344 command = self.replace_paths(build)
346 # Upload the command to a bash script and run it
347 # in background ( but wait until the command has
348 # finished to continue )
349 self.node.run_and_wait(command, self.app_home,
351 pidfile = "build_pidfile",
352 ecodefile = "build_exitcode",
353 stdout = "build_stdout",
354 stderr = "build_stderr")
357 install = self.get("install")
359 self.info("Installing sources ")
361 # replace application specific paths in the command
362 command = self.replace_paths(install)
364 # Upload the command to a bash script and run it
365 # in background ( but wait until the command has
366 # finished to continue )
367 self.node.run_and_wait(command, self.app_home,
368 shfile = "install.sh",
369 pidfile = "install_pidfile",
370 ecodefile = "install_exitcode",
371 stdout = "install_stdout",
372 stderr = "install_stderr")
375 # Wait until node is associated and deployed
377 if not node or node.state < ResourceState.READY:
378 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
379 self.ec.schedule(reschedule_delay, self.deploy)
382 command = self.get("command") or ""
383 self.info("Deploying command '%s' " % command)
387 self._state = ResourceState.FAILED
390 super(LinuxApplication, self).deploy()
393 command = self.get("command")
395 self.info("Starting command '%s'" % command)
398 # If no command was given (i.e. Application was used for dependency
399 # installation), then the application is directly marked as FINISHED
400 self._state = ResourceState.FINISHED
403 if self.in_foreground:
404 self._start_in_foreground()
406 self._start_in_background()
408 super(LinuxApplication, self).start()
410 def _start_in_foreground(self):
411 command = self.get("command")
412 stdin = "stdin" if self.get("stdin") else None
413 sudo = self.get("sudo") or False
414 x11 = self.get("forwardX11")
416 # Command will be launched in foreground and attached to the
417 # terminal using the node 'execute' in non blocking mode.
420 env = self.get("env")
421 environ = self.node.format_environment(env, inline = True)
422 command = environ + command
423 command = self.replace_paths(command)
425 # We save the reference to the process in self._proc
426 # to be able to kill the process from the stop method.
427 # We also set blocking = False, since we don't want the
428 # thread to block until the execution finishes.
429 (out, err), self._proc = self.node.execute(command,
435 if self._proc.poll():
436 self._state = ResourceState.FAILED
437 self.error(msg, out, err)
438 raise RuntimeError, msg
440 def _start_in_background(self):
441 command = self.get("command")
442 env = self.get("env")
443 stdin = "stdin" if self.get("stdin") else None
444 stdout = "stdout" if self.get("stdout") else "stdout"
445 stderr = "stderr" if self.get("stderr") else "stderr"
446 sudo = self.get("sudo") or False
448 # Command will be run as a daemon in background and detached from any terminal.
449 # The real command to run was previously uploaded to a bash script
450 # during deployment, now launch the remote script using 'run'
451 # method from the node
452 cmd = "bash ./app.sh"
453 (out, err), proc = self.node.run(cmd, self.app_home,
459 # check if execution errors occurred
460 msg = " Failed to start command '%s' " % command
463 self._state = ResourceState.FAILED
464 self.error(msg, out, err)
465 raise RuntimeError, msg
467 # Wait for pid file to be generated
468 pid, ppid = self.node.wait_pid(self.app_home)
469 if pid: self._pid = int(pid)
470 if ppid: self._ppid = int(ppid)
472 # If the process is not running, check for error information
473 # on the remote machine
474 if not self.pid or not self.ppid:
475 (out, err), proc = self.node.check_errors(self.app_home,
478 # Out is what was written in the stderr file
480 self._state = ResourceState.FAILED
481 msg = " Failed to start command '%s' " % command
482 self.error(msg, out, err)
483 raise RuntimeError, msg
486 """ Stops application execution
488 command = self.get('command') or ''
490 if self.state == ResourceState.STARTED:
493 self.info("Stopping command '%s'" % command)
495 # If the command is running in foreground (it was launched using
496 # the node 'execute' method), then we use the handler to the Popen
497 # process to kill it. Else we send a kill signal using the pid and ppid
498 # retrieved after running the command with the node 'run' method
503 # Only try to kill the process if the pid and ppid
505 if self.pid and self.ppid:
506 (out, err), proc = self.node.kill(self.pid, self.ppid)
509 # check if execution errors occurred
510 msg = " Failed to STOP command '%s' " % self.get("command")
511 self.error(msg, out, err)
512 self._state = ResourceState.FAILED
516 super(LinuxApplication, self).stop()
519 self.info("Releasing resource")
521 tear_down = self.get("tearDown")
523 self.node.execute(tear_down)
527 if self.state == ResourceState.STOPPED:
528 super(LinuxApplication, self).release()
532 """ Returns the state of the application
534 if self._state == ResourceState.STARTED:
535 if self.in_foreground:
536 # Check if the process we used to execute the command
537 # is still running ...
538 retcode = self._proc.poll()
540 # retcode == None -> running
541 # retcode > 0 -> error
542 # retcode == 0 -> finished
545 msg = " Failed to execute command '%s'" % self.get("command")
546 err = self._proc.stderr.read()
547 self.error(msg, out, err)
548 self._state = ResourceState.FAILED
550 self._state = ResourceState.FINISHED
553 # We need to query the status of the command we launched in
554 # background. In order to avoid overwhelming the remote host and
555 # the local processor with too many ssh queries, the state is only
556 # requested every 'state_check_delay' seconds.
557 state_check_delay = 0.5
558 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
559 # check if execution errors occurred
560 (out, err), proc = self.node.check_errors(self.app_home)
563 msg = " Failed to execute command '%s'" % self.get("command")
564 self.error(msg, out, err)
565 self._state = ResourceState.FAILED
567 elif self.pid and self.ppid:
568 # No execution errors occurred. Make sure the background
569 # process with the recorded pid is still running.
570 status = self.node.status(self.pid, self.ppid)
572 if status == ProcStatus.FINISHED:
573 self._state = ResourceState.FINISHED
575 self._last_state_check = tnow()
def replace_paths(self, command):
    """
    Replace all special path tags in command with the actual paths.

    The tags ${SOURCES}, ${BUILD}, ${APP_HOME}, ${NODE_HOME} and
    ${EXP_HOME} are substituted, in that order. A directory that does
    not start with "/" is prefixed with the literal "${HOME}" tag.
    No quoting or escaping is applied to the substituted values.
    """
    def anchored(directory):
        if directory.startswith("/"):
            return directory
        return os.path.join("${HOME}", directory)

    substitutions = (
        ("${SOURCES}", self.src_dir),
        ("${BUILD}", self.build_dir),
        ("${APP_HOME}", self.app_home),
        ("${NODE_HOME}", self.node.node_home),
        ("${EXP_HOME}", self.node.exp_home),
    )

    replaced = command
    for tag, directory in substitutions:
        replaced = replaced.replace(tag, anchored(directory))

    return replaced
def compare_hash(self, local, remote):
    """Tell whether the remote file already holds the local content.

    :param local: path of a local file, or the content itself when no
        such file exists
    :param remote: path of the file on the node
    :returns: True when md5sum succeeds on the node and its checksum
        equals the checksum of the local content, False otherwise
    """
    # Imported here so the method does not depend on the module
    # import block
    import hashlib

    # getting md5sum from remote file
    (out, err), proc = self.node.execute("md5sum %s " % remote)

    if proc.poll() != 0:
        # md5sum did not succeed on the node, nothing to compare with
        return False

    # getting md5sum from local content. The digest is computed in
    # process, so no local md5sum binary and no temporary file are
    # needed.
    digest = hashlib.md5()
    if os.path.isfile(local):
        source = local
        with open(local, "rb") as stream:
            for chunk in iter(lambda: stream.read(65536), b""):
                digest.update(chunk)
    else:
        source = "<inline content>"
        data = local
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        digest.update(data)

    lchk = digest.hexdigest()
    # The checksum is the first field of the md5sum output
    rchk = out.strip().split(" ")[0]

    msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
            source, lchk, remote, rchk)
    self.debug(msg)

    # files are the same, no need to upload
    return lchk == rchk
626 def valid_connection(self, guid):