2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import logging
import os
import pickle

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
    ResourceState
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus, STDOUT
from nepi.util.timefuncs import tnow, tdiffsec
# Module-level logger shared by all LinuxApplication instances.
# Uncomment the next line to enable verbose output for this module:
# logging.getLogger('application').setLevel(logging.DEBUG)
logger = logging.getLogger("application")
# TODO: Resolve wildcards in commands!!
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class LinuxApplication(ResourceManager):
    """
    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM

    A LinuxApplication RM represents a process that can be executed in
    a remote Linux host using SSH.

    The LinuxApplication RM takes care of uploading sources and any files
    needed to run the experiment, to the remote host.
    It also allows to provide source compilation (build) and installation
    instructions, and takes care of automating the sources build and
    installation tasks for the user.

    It is important to note that files uploaded to the remote host have
    two possible scopes: single-experiment or multi-experiment.
    Single experiment files are those that will not be re-used by other
    experiments. Multi-experiment files are those that will.
    Sources and shared files are always made available to all experiments.

    The directory structure used by LinuxApplication RM at the Linux
    host is the following:

        ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
        ${LIB}      |- /lib    --> Base directory for libraries
        ${BIN}      |- /bin    --> Base directory for binary files
        ${SRC}      |- /src    --> Base directory for sources
        ${SHARE}    |- /share  --> Base directory for other files

        ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
        ${EXP_HOME}   |- /<exp-id>    --> Base directory for experiment exp-id
        ${APP_HOME}      |- /<app-guid> --> Base directory for application
                         |               specific files (e.g. command.sh, input)
        ${RUN_HOME}         |- /<run-id> --> Base directory for run specific
    """
    # Resource type string under which this RM is registered with the factory.
    _rtype = "linux::Application"
    # One-line description shown in user-facing help listings.
    _help = "Runs an application on a Linux host with a BASH command "
    def _register_attributes(cls):
        """Register all user-configurable attributes of this RM.

        NOTE(review): several ``Attribute("...")`` header lines appear to
        be missing from this excerpt; only the description strings and
        flags of those attributes remain. The attribute names suggested
        in the comments below are inferred from the descriptions and from
        ``self.get(...)`` calls later in the class — confirm against the
        full source.
        """
        # Shell command executed at application start (run in ${RUN_HOME}).
        cls._register_attribute(
            Attribute("command", "Command to execute at application start. "
                "Note that commands will be executed in the ${RUN_HOME} directory, "
                "make sure to take this into account when using relative paths. ",
                flags = Flags.Design))
        # Whether SSH sessions should forward X11.
        cls._register_attribute(
            Attribute("forwardX11",
                "Enables X11 forwarding for SSH connections",
                flags = Flags.Design))
        # Presumably the 'env' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "Environment variables string for command execution",
                flags = Flags.Design))
        # Presumably the 'sudo' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "Run with root privileges",
                flags = Flags.Design))
        # Presumably the 'depends' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "Space-separated list of packages required to run the application",
                flags = Flags.Design))
        # Presumably the 'sources' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "semi-colon separated list of regular files to be uploaded to ${SRC} "
                "directory prior to building. Archives won't be expanded automatically. "
                "Sources are globally available for all experiments unless "
                "cleanHome is set to True (This will delete all sources). ",
                flags = Flags.Design))
        # Presumably the 'files' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "semi-colon separated list of regular miscellaneous files to be uploaded "
                "to ${SHARE} directory. "
                "Files are globally available for all experiments unless "
                "cleanHome is set to True (This will delete all files). ",
                flags = Flags.Design))
        # Presumably the 'libs' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
                "to ${LIB} directory. "
                "Libraries are globally available for all experiments unless "
                "cleanHome is set to True (This will delete all files). ",
                flags = Flags.Design))
        # Presumably the 'bins' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "semi-colon separated list of binary files to be uploaded "
                "to ${BIN} directory. "
                "Binaries are globally available for all experiments unless "
                "cleanHome is set to True (This will delete all files). ",
                flags = Flags.Design))
        # Presumably the 'code' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
                flags = Flags.Design))
        # Presumably the 'build' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "Build commands to execute after deploying the sources. "
                "Sources are uploaded to the ${SRC} directory and code "
                "is uploaded to the ${APP_HOME} directory. \n"
                "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
                "./configure && make && make clean.\n"
                "Make sure to make the build commands return with a nonzero exit "
                flags = Flags.Design))
        # Presumably the 'install' attribute — header line missing in this excerpt.
        cls._register_attribute(
                "Commands to transfer built files to their final destinations. "
                "Install commands are executed after build commands. ",
                flags = Flags.Design))
        # Contents fed to the command's standard input (see upload_stdin).
        cls._register_attribute(
            Attribute("stdin", "Standard input for the 'command'",
                flags = Flags.Design))
        # Command run just before the resource is released (see do_release).
        cls._register_attribute(
            Attribute("tearDown",
                "Command to be executed just before releasing the resource",
                flags = Flags.Design))
        # When set, stderr is kept in its own trace file instead of being
        # merged into stdout (attribute definition truncated in this excerpt).
        cls._register_attribute(
            Attribute("splitStderr",
                "requests stderr to be retrieved separately",
    def _register_traces(cls):
        """Register the default traces captured for every application.

        NOTE(review): the ``cls._register_trace(`` call lines appear to be
        missing from this excerpt; only the Trace(...) arguments remain.
        """
            # Standard output of the remote command, collected by default.
            Trace("stdout", "Standard output stream", enabled = True))
            # Standard error of the remote command, collected by default.
            Trace("stderr", "Standard error stream", enabled = True))
    def __init__(self, ec, guid):
        """Initialize per-instance state.

        :param ec: the ExperimentController managing this RM
        :param guid: unique identifier of this RM

        NOTE(review): assignments for the process/pid bookkeeping
        attributes (e.g. ``self._proc``, ``self._pid``, ``self._ppid``,
        read later by _run_in_background/do_stop) appear to be missing
        from this excerpt.
        """
        super(LinuxApplication, self).__init__(ec, guid)
        # Name of the per-application directory on the remote host.
        self._home = "app-{}".format(self.guid)
        # whether the command should run in foreground attached
        # to a terminal (see the in_foreground property)
        self._in_foreground = False
        # whether to use sudo to kill the application process
        self._sudo_kill = False
        # keep a reference to the running process handler when
        # the command is not executed as remote daemon in background
        # timestamp of last state check of the application
        self._last_state_check = tnow()
202 def log_message(self, msg):
203 return " guid {} - host {} - {} "\
204 .format(self.guid, self.node.get("hostname"), msg)
    # --- Fragments of the node / app_home / run_home properties ---
    # NOTE(review): the @property decorators, def lines and the guard
    # around the RuntimeError are missing from this excerpt.
        # The LinuxNode RM this application is connected to.
        node = self.get_connected(LinuxNode.get_rtype())
            # No node connected: the application cannot be deployed.
            msg = "Application {} guid {} NOT connected to Node"\
                .format(self._rtype, self.guid)
            raise RuntimeError(msg)
        # app_home: per-application directory under the experiment home.
        return os.path.join(self.node.exp_home, self._home)
        # run_home: per-run directory under app_home.
        return os.path.join(self.app_home, self.ec.run_id)
    # NOTE(review): the @property decorator for this accessor appears to
    # be missing from this excerpt.
    def in_foreground(self):
        """
        Returns True if the command needs to be executed in foreground.
        This means that command will be executed using 'execute' instead of
        'run' ('run' executes a command in background and detached from the
        terminal).

        When using X11 forwarding option, the command can not run in background
        and detached from a terminal, since we need to keep the terminal attached
        to it.
        """
        return self.get("forwardX11") or self._in_foreground
249 def trace_filepath(self, filename):
250 return os.path.join(self.run_home, filename)
    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """Retrieve trace *name* from the remote host.

        :param name: trace file name (e.g. "stdout", "stderr")
        :param attr: what to retrieve — PATH, ALL, STREAM or SIZE
        :param block: block size in bytes for STREAM reads
        :param offset: block offset for STREAM reads

        NOTE(review): the return statements of the PATH/ALL/error branches
        are missing from this excerpt.
        """
        self.info("Retrieving '{}' trace {} ".format(name, attr))
        path = self.trace_filepath(name)
        logger.debug("trace: path= {}".format(path))
        # First verify the trace file exists on the remote host.
        command = "(test -f {} && echo 'success') || echo 'error'".format(path)
        (out, err), proc = self.node.execute(command)
        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace {} ".format(name)
            self.error(msg, out, err)
        if attr == TraceAttr.PATH:
        if attr == TraceAttr.ALL:
            # Download the whole trace content.
            (out, err), proc = self.node.check_output(self.run_home, name)
                msg = " Couldn't read trace {} ".format(name)
                self.error(msg, out, err)
        if attr == TraceAttr.STREAM:
            # Read one block of the trace at the given offset.
            cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
        elif attr == TraceAttr.SIZE:
            # NOTE(review): looks like the stat format specifier (e.g. %s)
            # is missing here — confirm against the full source.
            cmd = "stat -c {} ".format(path)
        (out, err), proc = self.node.execute(cmd)
            msg = " Couldn't find trace {} ".format(name)
            self.error(msg, out, err)
        if attr == TraceAttr.SIZE:
            # Size is reported as an integer, not raw text.
            out = int(out.strip())
    def do_provision(self):
        """Upload files/sources, install dependencies and build the app.

        NOTE(review): the initialization of ``procs`` and ``command``, and
        the full list of provisioning steps, are missing from this excerpt.
        """
        # take a snapshot of the system if user is root
        # to ensure that cleanProcess will not kill
        # pre-existent processes
        if self.node.get("username") == 'root':
            ps_aux = "ps aux | awk '{print $2,$11}'"
            (out, err), proc = self.node.execute(ps_aux)
            for line in out.strip().split("\n"):
                parts = line.strip().split(" ")
                # Map pid -> command name for later comparison.
                procs[parts[0]] = parts[1]
            with open("/tmp/save.proc", "wb") as pickle_file:
                pickle.dump(procs, pickle_file)
        # create run dir for application
        self.node.mkdir(self.run_home)
        # List of all the provision methods to invoke
                self.upload_binaries,
                self.upload_libraries,
                # install dependencies
                self.install_dependencies,
            # Since provisioning takes a long time, before
            # each step we check that the EC is still
            # alive, and abort otherwise.
                self.debug("Interrupting provisioning. EC says 'ABORT")
        # upload deploy script
        deploy_command = ";".join(command)
        self.execute_deploy_command(deploy_command)
        # upload start script
        self.upload_start_command()
        self.info("Provisioning finished")
        super(LinuxApplication, self).do_provision()
    def upload_start_command(self, overwrite = False):
        """Upload the start command as a remote bash script (start.sh).

        :param overwrite: re-upload even if the script already exists

        NOTE(review): the remaining keyword arguments of
        ``upload_command`` (e.g. shfile, env) are missing from this
        excerpt.
        """
        # Upload command to remote bash script
        # - only if command can be executed in background and detached
        command = self.get("command")
        if command and not self.in_foreground:
            # self.info("Uploading command '{}'".format(command))
            # replace application specific paths in the command
            command = self.replace_paths(command)
            # replace application specific paths in the environment
            env = self.get("env")
            env = env and self.replace_paths(env)
            shfile = os.path.join(self.app_home, "start.sh")
            self.node.upload_command(command,
                overwrite = overwrite)
382 def execute_deploy_command(self, command, prefix="deploy"):
384 # replace application specific paths in the command
385 command = self.replace_paths(command)
387 # replace application specific paths in the environment
388 env = self.get("env")
389 env = env and self.replace_paths(env)
391 # Upload the command to a bash script and run it
392 # in background ( but wait until the command has
393 # finished to continue )
394 shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
395 # low-level spawn tools in both sshfuncs and execfuncs
396 # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
397 stderr = "{}_stderr".format(prefix) \
398 if self.get("splitStderr") \
400 print("{} : prefix = {}, command={}, stderr={}"
401 .format(self, prefix, command, stderr))
402 self.node.run_and_wait(command, self.run_home,
405 pidfile = "{}_pidfile".format(prefix),
406 ecodefile = "{}_exitcode".format(prefix),
407 stdout = "{}_stdout".format(prefix),
    def upload_sources(self, sources = None, src_dir = None):
        """Upload (or remotely download) source files into ${SRC}.

        :param sources: semicolon-separated source list; defaults to the
                        'sources' attribute
        :param src_dir: destination directory; defaults to the node's src dir

        NOTE(review): the guard clauses (``if not sources`` / return) and
        the ``command`` list construction lines are missing from this
        excerpt.
        """
            sources = self.get("sources")
            src_dir = self.node.src_dir
            self.info("Uploading sources ")
            sources = [str.strip(source) for source in sources.split(";")]
            # Separate sources that should be downloaded from
            # the web, from sources that should be uploaded from
            # the local machine.
            for source in list(sources):
                if source.startswith("http") or source.startswith("https"):
                    # remove the http source from the sources list
                    sources.remove(source)
                    # Check if the source already exists
                    " ls {src_dir}/{basename} "
                    # If source doesn't exist, download it and check
                    # that it downloaded ok
                    " wget -c --directory-prefix={src_dir} {source} && "
                    " ls {src_dir}/{basename} "
                    basename = os.path.basename(source),
                command = " && ".join(command)
                # replace application specific paths in the command
                command = self.replace_paths(command)
                sources = ';'.join(sources)
                self.node.upload(sources, src_dir, overwrite = False)
    # NOTE(review): in each of the four uploaders below, the guard line
    # (e.g. ``if not files:``) between the def and the body appears to be
    # missing from this excerpt.
    def upload_files(self, files = None):
        """Upload miscellaneous files to the shared ${SHARE} directory."""
            files = self.get("files")
            self.info("Uploading files {} ".format(files))
            self.node.upload(files, self.node.share_dir, overwrite = False)

    def upload_libraries(self, libs = None):
        """Upload libraries (e.g. .so files) to the shared ${LIB} directory."""
            libs = self.get("libs")
            self.info("Uploading libraries {} ".format(libs))
            self.node.upload(libs, self.node.lib_dir, overwrite = False)

    def upload_binaries(self, bins = None):
        """Upload binary files to the shared ${BIN} directory."""
            bins = self.get("bins")
            self.info("Uploading binaries {} ".format(bins))
            self.node.upload(bins, self.node.bin_dir, overwrite = False)

    def upload_code(self, code = None):
        """Upload plain-text code to ${APP_HOME}/code as an executable file."""
            code = self.get("code")
            self.info("Uploading code")
            dst = os.path.join(self.app_home, "code")
            self.node.upload(code, dst, overwrite = False, text = True, executable = True)
    def upload_stdin(self, stdin = None):
        """Upload the stdin content and symlink it as ${APP_HOME}/stdin.

        NOTE(review): the guard clause and the ``else:`` branch choosing
        between the ${SHARE} and ${APP_HOME} destinations appear to be
        missing from this excerpt.
        """
            stdin = self.get("stdin")
            # create dir for sources
            self.info("Uploading stdin")
            # upload stdin file to ${SHARE_DIR} directory
            if os.path.isfile(stdin):
                # Existing local file: share it under its own basename.
                basename = os.path.basename(stdin)
                dst = os.path.join(self.node.share_dir, basename)
                # Literal content: store it per-application.
                dst = os.path.join(self.app_home, "stdin")
            self.node.upload(stdin, dst, overwrite = False, text = True)
            # create "stdin" symlink on ${APP_HOME} directory
            command = "( cd {app_home} ; [ ! -f stdin ] && ln -s {stdin} stdin )"\
                .format(app_home = self.app_home, stdin = dst)
    # NOTE(review): the guard line (e.g. ``if not depends:``) between each
    # def and its body appears to be missing from this excerpt.
    def install_dependencies(self, depends = None):
        """Return the command that installs the required packages."""
            depends = self.get("depends")
            self.info("Installing dependencies {}".format(depends))
            return self.node.install_packages_command(depends)

    def build(self, build = None):
        """Return the build command with path tags expanded."""
            build = self.get("build")
            self.info("Building sources ")
            # replace application specific paths in the command
            return self.replace_paths(build)

    def install(self, install = None):
        """Return the install command with path tags expanded."""
            install = self.get("install")
            self.info("Installing sources ")
            # replace application specific paths in the command
            return self.replace_paths(install)
        # --- body fragment of do_deploy (def line missing from excerpt) ---
        # Wait until node is associated and deployed
        if not node or node.state < ResourceState.READY:
            # Node not ready yet: retry the whole deploy later.
            self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
            self.ec.schedule(self.reschedule_delay, self.deploy)
        command = self.get("command") or ""
        self.info("Deploying command '{}' ".format(command))
        super(LinuxApplication, self).do_deploy()
        # --- body fragment of do_start (def line missing from excerpt) ---
        # NOTE(review): the ``if not command:`` guard and the ``else:``
        # between the two _run_in_* calls appear to be missing — as shown,
        # both branches could not execute unconditionally.
        command = self.get("command")
        self.info("Starting command '{}'".format(command))
        # If no command was given (i.e. Application was used for dependency
        # installation), then the application is directly marked as STOPPED
        super(LinuxApplication, self).set_stopped()
        if self.in_foreground:
            self._run_in_foreground()
            self._run_in_background()
        super(LinuxApplication, self).do_start()
    def _run_in_foreground(self):
        """Run the command attached to a terminal via node.execute.

        NOTE(review): the keyword arguments of ``execute_command`` and the
        assignment of ``msg`` used in the error branch are missing from
        this excerpt.
        """
        command = self.get("command")
        sudo = self.get("sudo") or False
        x11 = self.get("forwardX11")
        env = self.get("env")
        # Command will be launched in foreground and attached to the
        # terminal using the node 'execute' in non blocking mode.
        # We save the reference to the process in self._proc
        # to be able to kill the process from the stop method.
        # We also set blocking = False, since we don't want the
        # thread to block until the execution finishes.
        (out, err), self._proc = self.execute_command(command,
        if self._proc.poll():
            # Non-None/zero poll() here means the process already failed.
            self.error(msg, out, err)
            raise RuntimeError(msg)
    def _run_in_background(self):
        """Launch the previously uploaded start.sh as a detached remote daemon.

        NOTE(review): parts of the stderr/stdin conditional expressions,
        the ``run`` keyword arguments, and the ``if proc.poll():`` guard
        before the first error branch are missing from this excerpt.
        """
        command = self.get("command")
        env = self.get("env")
        sudo = self.get("sudo") or False
        # low-level spawn tools in both sshfuncs and execfuncs
        # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
            if self.get("splitStderr") \
        stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
        # Command will be run as a daemon in background and detached from any
        # terminal.
        # The command to run was previously uploaded to a bash script
        # during deployment, now we launch the remote script using 'run'
        # method from the node.
        cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
        (out, err), proc = self.node.run(cmd, self.run_home,
        # check if execution errors occurred
        msg = " Failed to start command '{}' ".format(command)
            self.error(msg, out, err)
            raise RuntimeError(msg)
        # Wait for pid file to be generated
        pid, ppid = self.node.wait_pid(self.run_home)
        if pid: self._pid = int(pid)
        if ppid: self._ppid = int(ppid)
        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_errors(self.run_home,
            # Out is what was written in the stderr file
                msg = " Failed to start command '{}' ".format(command)
                self.error(msg, out, err)
                raise RuntimeError(msg)
647 """ Stops application execution
649 command = self.get('command') or ''
651 if self.state == ResourceState.STARTED:
653 self.info("Stopping command '{}' ".format(command))
655 # If the command is running in foreground (it was launched using
656 # the node 'execute' method), then we use the handler to the Popen
657 # process to kill it. Else we send a kill signal using the pid and ppid
658 # retrieved after running the command with the node 'run' method
662 # Only try to kill the process if the pid and ppid
664 if self.pid and self.ppid:
665 (out, err), proc = self.node.kill(self.pid, self.ppid,
666 sudo = self._sudo_kill)
669 # TODO: check if execution errors occurred
670 if (proc and proc.poll()) or err:
671 msg = " Failed to STOP command '{}' ".format(self.get("command"))
672 self.error(msg, out, err)
675 super(LinuxApplication, self).do_stop()
    def do_release(self):
        """Run the tearDown command and remove remote state before release.

        NOTE(review): the ``if tear_down:`` and ``if hard_release:`` guard
        lines appear to be missing from this excerpt.
        """
        self.info("Releasing resource")
        tear_down = self.get("tearDown")
            # Run the user-supplied cleanup command on the node.
            self.node.execute(tear_down)
        hard_release = self.get("hardRelease")
            # Remove the application's remote directory entirely.
            self.node.rmdir(self.app_home)
        super(LinuxApplication, self).do_release()
694 """ Returns the state of the application
696 if self._state == ResourceState.STARTED:
697 if self.in_foreground:
698 # Check if the process we used to execute the command
699 # is still running ...
700 retcode = self._proc.poll()
702 # retcode == None -> running
703 # retcode > 0 -> error
704 # retcode == 0 -> finished
707 msg = " Failed to execute command '{}'".format(self.get("command"))
708 err = self._proc.stderr.read()
709 self.error(msg, out, err)
715 # We need to query the status of the command we launched in
716 # background. In order to avoid overwhelming the remote host and
717 # the local processor with too many ssh queries, the state is only
718 # requested every 'state_check_delay' seconds.
719 state_check_delay = 0.5
720 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
721 if self.pid and self.ppid:
722 # Make sure the process is still running in background
723 status = self.node.status(self.pid, self.ppid)
725 if status == ProcStatus.FINISHED:
726 # If the program finished, check if execution
729 = self.node.check_errors(self.run_home)
732 # Thierry : there's nothing wrong with a non-empty
734 #msg = "Failed to execute command '{}'"\
735 # .format(self.get("command"))
736 #self.error(msg, out, err)
738 # xxx TODO OTOH it would definitely make sense
739 # to check the exitcode
744 self._last_state_check = tnow()
    def execute_command(self, command,
        """Prefix *command* with the environment and run it via node.execute.

        NOTE(review): the remaining parameters of this signature (env,
        sudo, forward_x11, blocking, ...) and the other keyword arguments
        of ``node.execute`` are missing from this excerpt.
        """
        # Inline the environment string ahead of the command.
        environ = self.node.format_environment(env, inline = True)
        command = environ + command
        # Expand ${APP_HOME}-style path tags before execution.
        command = self.replace_paths(command)
        return self.node.execute(command,
            forward_x11 = forward_x11,
    def replace_paths(self, command, node = None, app_home = None, run_home = None):
        """
        Replace all special path tags with shell-escaped actual paths.

        :param command: string possibly containing ${...} path tags
        :param node: node providing the base directories (defaults to self.node)
        :param app_home: application home dir (defaults to self.app_home)
        :param run_home: run home dir (defaults to self.run_home)

        NOTE(review): the ``if node is None:`` style default guards and
        the ``return (command`` opening line are missing from this excerpt.
        """
            app_home = self.app_home
            run_home = self.run_home
            .replace("${USR}", node.usr_dir)
            .replace("${LIB}", node.lib_dir)
            .replace("${BIN}", node.bin_dir)
            .replace("${SRC}", node.src_dir)
            .replace("${SHARE}", node.share_dir)
            .replace("${EXP}", node.exp_dir)
            .replace("${EXP_HOME}", node.exp_home)
            .replace("${APP_HOME}", app_home)
            .replace("${RUN_HOME}", run_home)
            .replace("${NODE_HOME}", node.node_home)
            .replace("${HOME}", node.home_dir)
            # a shortcut to refer to the file uploaded as 'code = '
            .replace("${CODE}", "{}/code".format(app_home))
796 def valid_connection(self, guid):