2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.trace import Trace, TraceAttr
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus, STDOUT
25 from nepi.util.timefuncs import tnow, tdiffsec
30 # TODO: Resolve wildcards in commands!!
31 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
34 class LinuxApplication(ResourceManager):
36 .. class:: Class Args :
38 :param ec: The Experiment controller
39 :type ec: ExperimentController
40 :param guid: guid of the RM
45 A LinuxApplication RM represents a process that can be executed in
46 a remote Linux host using SSH.
48     The LinuxApplication RM takes care of uploading sources and any files
49 needed to run the experiment, to the remote host.
50 It also allows to provide source compilation (build) and installation
51 instructions, and takes care of automating the sources build and
52 installation tasks for the user.
54 It is important to note that files uploaded to the remote host have
55 two possible scopes: single-experiment or multi-experiment.
56 Single experiment files are those that will not be re-used by other
57 experiments. Multi-experiment files are those that will.
58 Sources and shared files are always made available to all experiments.
62 The directory structure used by LinuxApplication RM at the Linux
63 host is the following:
65 ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
67 ${LIB} |- /lib --> Base directory for libraries
68 ${BIN} |- /bin --> Base directory for binary files
69 ${SRC} |- /src --> Base directory for sources
70 ${SHARE} |- /share --> Base directory for other files
72 ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
74 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
76 ${APP_HOME} |- /<app-guid> --> Base directory for application
77 | specific files (e.g. command.sh, input)
79 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
# Factory registration name and one-line help for this RM type.
83     _rtype = "linux::Application"
84     _help = "Runs an application on a Linux host with a BASH command "
# Declares the design-time attributes of a LinuxApplication RM.
# NOTE(review): several Attribute("<name>", ... ) opening lines are missing
# from this chunk (env, sudo, depends, sources, files, libs, bins, code,
# build, install, judging by the surviving help strings) — confirm against
# the full file before editing.
88     def _register_attributes(cls):
89         cls._register_attribute(
90             Attribute("command", "Command to execute at application start. "
91             "Note that commands will be executed in the ${RUN_HOME} directory, "
92             "make sure to take this into account when using relative paths. ",
93             flags = Flags.Design))
94         cls._register_attribute(
95             Attribute("forwardX11",
96             "Enables X11 forwarding for SSH connections",
97             flags = Flags.Design))
98         cls._register_attribute(
100             "Environment variables string for command execution",
101             flags = Flags.Design))
102         cls._register_attribute(
104             "Run with root privileges",
105             flags = Flags.Design))
106         cls._register_attribute(
108             "Space-separated list of packages required to run the application",
109             flags = Flags.Design))
110         cls._register_attribute(
112             "semi-colon separated list of regular files to be uploaded to ${SRC} "
113             "directory prior to building. Archives won't be expanded automatically. "
114             "Sources are globally available for all experiments unless "
115             "cleanHome is set to True (This will delete all sources). ",
116             flags = Flags.Design))
117         cls._register_attribute(
119             "semi-colon separated list of regular miscellaneous files to be uploaded "
120             "to ${SHARE} directory. "
121             "Files are globally available for all experiments unless "
122             "cleanHome is set to True (This will delete all files). ",
123             flags = Flags.Design))
124         cls._register_attribute(
126             "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
127             "to ${LIB} directory. "
128             "Libraries are globally available for all experiments unless "
129             "cleanHome is set to True (This will delete all files). ",
130             flags = Flags.Design))
131         cls._register_attribute(
133             "semi-colon separated list of binary files to be uploaded "
134             "to ${BIN} directory. "
135             "Binaries are globally available for all experiments unless "
136             "cleanHome is set to True (This will delete all files). ",
137             flags = Flags.Design))
138         cls._register_attribute(
140             "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
141             flags = Flags.Design))
142         cls._register_attribute(
144             "Build commands to execute after deploying the sources. "
145             "Sources are uploaded to the ${SRC} directory and code "
146             "is uploaded to the ${APP_HOME} directory. \n"
147             "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
148             "./configure && make && make clean.\n"
149             "Make sure to make the build commands return with a nonzero exit "
151             flags = Flags.Design))
152         cls._register_attribute(
154             "Commands to transfer built files to their final destinations. "
155             "Install commands are executed after build commands. ",
156             flags = Flags.Design))
157         cls._register_attribute(
158             Attribute("stdin", "Standard input for the 'command'",
159             flags = Flags.Design))
160         cls._register_attribute(
161             Attribute("tearDown",
162             "Command to be executed just before releasing the resource",
163             flags = Flags.Design))
164         cls._register_attribute(
165             Attribute("splitStderr",
166             "requests stderr to be retrieved separately",
def _register_traces(cls):
    """Declare the default traces exposed by every LinuxApplication.

    Both streams are enabled by default; whether stderr really ends up
    in its own file depends on the 'splitStderr' attribute at run time.
    """
    for trace_name, description in (
            ("stdout", "Standard output stream"),
            ("stderr", "Standard error stream")):
        cls._register_trace(Trace(trace_name, description, enabled = True))
# Initialize per-application state. `ec` is the ExperimentController,
# `guid` this RM's unique id.
177     def __init__(self, ec, guid):
178         super(LinuxApplication, self).__init__(ec, guid)
# Directory name (relative to the node's experiment home) for this app.
182         self._home = "app-{}".format(self.guid)
184         # whether the command should run in foreground attached
186         self._in_foreground = False
188         # whether to use sudo to kill the application process
189         self._sudo_kill = False
191         # keep a reference to the running process handler when
192         # the command is not executed as remote daemon in background
# NOTE(review): the assignments that this comment describes (presumably
# self._proc and likely self._pid / self._ppid) are missing from this
# chunk — confirm against the full file.
195         # timestamp of last state check of the application
196         self._last_state_check = tnow()
def log_message(self, msg):
    """Decorate *msg* with this RM's guid and target hostname for logging."""
    hostname = self.node.get("hostname")
    return " guid {} - host {} - {} ".format(self.guid, hostname, msg)
# NOTE(review): remnants of three properties (node, app_home, run_home);
# the @property / def lines and some guards are missing from this chunk.
# Resolve the connected LinuxNode RM; fail loudly if none is connected.
205         node = self.get_connected(LinuxNode.get_rtype())
207         msg = "Application {} guid {} NOT connected to Node"\
208             .format(self._rtype, self.guid)
209         raise RuntimeError(msg)
# app_home: per-application directory under the node's experiment home.
217         return os.path.join(self.node.exp_home, self._home)
# run_home: per-run directory (keyed by the EC run id) under app_home.
221         return os.path.join(self.app_home, self.ec.run_id)
# NOTE(review): the @property decorator and docstring delimiters for this
# method are missing from this chunk.
232     def in_foreground(self):
234         Returns True if the command needs to be executed in foreground.
235         This means that command will be executed using 'execute' instead of
236         'run' ('run' executes a command in background and detached from the
239         When using X11 forwarding option, the command can not run in background
240         and detached from a terminal, since we need to keep the terminal attached
# Either explicit request (self._in_foreground) or X11 forwarding forces it.
243         return self.get("forwardX11") or self._in_foreground
def trace_filepath(self, filename):
    """Return the remote path of trace *filename* inside this run's directory."""
    run_dir = self.run_home
    return os.path.join(run_dir, filename)
# Retrieve trace `name` from the remote host. `attr` selects what to
# fetch (TraceAttr.ALL / PATH / STREAM / SIZE); `block`/`offset` apply
# to STREAM reads. NOTE(review): the return statements for each branch
# are missing from this chunk.
248     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
249         self.info("Retrieving '{}' trace {} ".format(name, attr))
251         path = self.trace_filepath(name)
# Probe for the trace file; the command echoes 'error' when absent.
253         command = "(test -f {} && echo 'success') || echo 'error'".format(path)
254         (out, err), proc = self.node.execute(command)
256         if (err and proc.poll()) or out.find("error") != -1:
257             msg = " Couldn't find trace {} ".format(name)
258             self.error(msg, out, err)
261         if attr == TraceAttr.PATH:
264         if attr == TraceAttr.ALL:
# Fetch the complete trace content via the node helper.
265             (out, err), proc = self.node.check_output(self.run_home, name)
268                 msg = " Couldn't read trace {} ".format(name)
269                 self.error(msg, out, err)
274         if attr == TraceAttr.STREAM:
# dd reads a single `block`-sized chunk at `offset` blocks into the file.
275             cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
276         elif attr == TraceAttr.SIZE:
# NOTE(review): `stat -c` needs a format spec (e.g. %s for size); it looks
# like the spec was lost from this line — verify against the full file.
277             cmd = "stat -c {} ".format(path)
279         (out, err), proc = self.node.execute(cmd)
282             msg = " Couldn't find trace {} ".format(name)
283             self.error(msg, out, err)
286         if attr == TraceAttr.SIZE:
287             out = int(out.strip())
# Provision the application on the remote node: snapshot processes (root
# only), create the run directory, upload sources/files/libs/bins/code,
# install dependencies, build and install, then upload the start script.
291     def do_provision(self):
292         # take a snapshot of the system if user is root
293         # to ensure that cleanProcess will not kill
294         # pre-existent processes
295         if self.node.get("username") == 'root':
298             ps_aux = "ps aux | awk '{print $2,$11}'"
299             (out, err), proc = self.node.execute(ps_aux)
# NOTE(review): the `procs = dict()` initialization is missing from this
# chunk; also note the fixed local path /tmp/save.proc is shared and
# predictable — verify this is intentional.
301                 for line in out.strip().split("\n"):
302                     parts = line.strip().split(" ")
303                     procs[parts[0]] = parts[1]
304                 with open("/tmp/save.proc", "wb") as pickle_file:
305                     pickle.dump(procs, pickle_file)
307         # create run dir for application
308         self.node.mkdir(self.run_home)
310         # List of all the provision methods to invoke
317             self.upload_binaries,
319             self.upload_libraries,
324             # install dependencies
325             self.install_dependencies,
334             # Since provisioning takes a long time, before
335             # each step we check that the EC is still
# NOTE(review): the log string below has an unbalanced quote ('ABORT) in
# the original — it is runtime text, left untouched here.
338                 self.debug("Interrupting provisioning. EC says 'ABORT")
345         # upload deploy script
# Steps that returned shell snippets are joined into one deploy script.
346         deploy_command = ";".join(command)
347         self.execute_deploy_command(deploy_command)
349         # upload start script
350         self.upload_start_command()
352         self.info("Provisioning finished")
354         super(LinuxApplication, self).do_provision()
# Upload the 'command' attribute as a start.sh script in ${APP_HOME},
# but only when the command will run in background/detached mode.
# `overwrite` forces re-upload of an existing script.
356     def upload_start_command(self, overwrite = False):
357         # Upload command to remote bash script
358         # - only if command can be executed in background and detached
359         command = self.get("command")
361         if command and not self.in_foreground:
362             # self.info("Uploading command '{}'".format(command))
364             # replace application specific paths in the command
365             command = self.replace_paths(command)
366             # replace application specific paths in the environment
367             env = self.get("env")
# env may be None/empty; only substitute paths when it is set.
368             env = env and self.replace_paths(env)
370             shfile = os.path.join(self.app_home, "start.sh")
# NOTE(review): additional keyword arguments to upload_command (env,
# shfile, ...) are missing from this chunk.
372             self.node.upload_command(command,
375                 overwrite = overwrite)
# Upload `command` as a "<prefix>.sh" script in ${APP_HOME} and run it
# remotely, blocking until it finishes. Used for the deploy step.
377     def execute_deploy_command(self, command, prefix="deploy"):
379         # replace application specific paths in the command
380         command = self.replace_paths(command)
382         # replace application specific paths in the environment
383         env = self.get("env")
384         env = env and self.replace_paths(env)
386         # Upload the command to a bash script and run it
387         # in background ( but wait until the command has
388         # finished to continue )
389         shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
390         # low-level spawn tools in both sshfuncs and execfuncs
391         # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
# NOTE(review): the `else STDOUT` arm of this conditional expression is
# missing from this chunk.
392         stderr = "{}_stderr".format(prefix) \
393             if self.get("splitStderr") \
# NOTE(review): this print looks like leftover debug output — consider
# removing it or routing it through self.debug().
395         print("{} : prefix = {}, command={}, stderr={}"
396             .format(self, prefix, command, stderr))
# NOTE(review): further keyword arguments to run_and_wait (shfile, env,
# stderr, ...) are missing from this chunk.
397         self.node.run_and_wait(command, self.run_home,
400             pidfile = "{}_pidfile".format(prefix),
401             ecodefile = "{}_exitcode".format(prefix),
402             stdout = "{}_stdout".format(prefix),
# Upload sources to the node's shared ${SRC} directory. `sources` is a
# semicolon-separated list; http(s) URLs are downloaded remotely with
# wget instead of being uploaded from the local machine.
405     def upload_sources(self, sources = None, src_dir = None):
407             sources = self.get("sources")
412             src_dir = self.node.src_dir
415             self.info("Uploading sources ")
417             sources = [str.strip(source) for source in sources.split(";")]
419             # Separate sources that should be downloaded from
420             # the web, from sources that should be uploaded from
# Iterate over a copy since we remove URL entries from `sources`.
423             for source in list(sources):
# NOTE(review): the "https" test is redundant — "https://..." already
# startswith "http".
424                 if source.startswith("http") or source.startswith("https"):
425                     # remove the http source from the sources list
426                     sources.remove(source)
430                         # Check if the source already exists
431                         " ls {src_dir}/{basename} "
433                         # If source doesn't exist, download it and check
434                         # that it downloaded ok
435                         " wget -c --directory-prefix={src_dir} {source} && "
436                         " ls {src_dir}/{basename} "
438                         basename = os.path.basename(source),
# Chain all remote download snippets into a single shell command.
443                 command = " && ".join(command)
445                 # replace application specific paths in the command
446                 command = self.replace_paths(command)
# Remaining (local) sources are uploaded in one scp batch, never
# overwriting files shared with other experiments.
449             sources = ';'.join(sources)
450             self.node.upload(sources, src_dir, overwrite = False)
# Upload miscellaneous files to the node's shared ${SHARE} directory.
# NOTE(review): the empty-argument guard lines are missing from this chunk.
454     def upload_files(self, files = None):
456             files = self.get("files")
459             self.info("Uploading files {} ".format(files))
460             self.node.upload(files, self.node.share_dir, overwrite = False)
# Upload libraries (e.g. .so files) to the node's shared ${LIB} directory.
# NOTE(review): the empty-argument guard lines are missing from this chunk.
462     def upload_libraries(self, libs = None):
464             libs = self.get("libs")
467             self.info("Uploading libraries {} ".format(libs))
468             self.node.upload(libs, self.node.lib_dir, overwrite = False)
# Upload binaries to the node's shared ${BIN} directory.
# NOTE(review): the empty-argument guard lines are missing from this chunk.
470     def upload_binaries(self, bins = None):
472             bins = self.get("bins")
475             self.info("Uploading binaries {} ".format(bins))
476             self.node.upload(bins, self.node.bin_dir, overwrite = False)
# Upload plain-text source code to ${APP_HOME}/code on the node, marking
# it executable. NOTE(review): guard lines are missing from this chunk.
478     def upload_code(self, code = None):
480             code = self.get("code")
483             self.info("Uploading code")
484             dst = os.path.join(self.app_home, "code")
485             self.node.upload(code, dst, overwrite = False, text = True, executable = True)
# Upload the stdin content for 'command'. Local files go to the shared
# ${SHARE} dir (reusable across experiments); literal text goes under
# ${APP_HOME}. Either way, ${APP_HOME}/stdin ends up pointing at it.
487     def upload_stdin(self, stdin = None):
489             stdin = self.get("stdin")
492             # create dir for sources
493             self.info("Uploading stdin")
495             # upload stdin file to ${SHARE_DIR} directory
496             if os.path.isfile(stdin):
497                 basename = os.path.basename(stdin)
498                 dst = os.path.join(self.node.share_dir, basename)
500                 dst = os.path.join(self.app_home, "stdin")
502             self.node.upload(stdin, dst, overwrite = False, text = True)
504             # create "stdin" symlink on ${APP_HOME} directory
# The symlink is only created if no 'stdin' file already exists there.
505             command = "( cd {app_home} ; [ ! -f stdin ] && ln -s {stdin} stdin )"\
506                 .format(app_home = self.app_home, stdin = dst)
# Return the shell command that installs the packages listed in the
# 'depends' attribute. NOTE(review): guard lines are missing from this chunk.
509     def install_dependencies(self, depends = None):
511             depends = self.get("depends")
514             self.info("Installing dependencies {}".format(depends))
515             return self.node.install_packages_command(depends)
# Return the 'build' attribute command with path tags (${SRC}, ...)
# substituted. NOTE(review): guard lines are missing from this chunk.
517     def build(self, build = None):
519             build = self.get("build")
522             self.info("Building sources ")
524             # replace application specific paths in the command
525             return self.replace_paths(build)
# Return the 'install' attribute command with path tags substituted.
# NOTE(review): guard lines are missing from this chunk.
527     def install(self, install = None):
529             install = self.get("install")
532             self.info("Installing sources ")
534             # replace application specific paths in the command
535             return self.replace_paths(install)
# NOTE(review): fragment of do_deploy — the `def do_deploy(self):` line
# and the node lookup are missing from this chunk.
538         # Wait until node is associated and deployed
# Deployment is rescheduled until the node RM reaches READY state.
540         if not node or node.state < ResourceState.READY:
541             self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
542             self.ec.schedule(self.reschedule_delay, self.deploy)
544             command = self.get("command") or ""
545             self.info("Deploying command '{}' ".format(command))
549             super(LinuxApplication, self).do_deploy()
# NOTE(review): fragment of do_start — the `def do_start(self):` line and
# the no-command branch's conditional are missing from this chunk.
552         command = self.get("command")
554         self.info("Starting command '{}'".format(command))
557             # If no command was given (i.e. Application was used for dependency
558             # installation), then the application is directly marked as STOPPED
559             super(LinuxApplication, self).set_stopped()
# Foreground commands stay attached to a terminal (e.g. X11 forwarding);
# background commands are daemonized on the remote host.
561             if self.in_foreground:
562                 self._run_in_foreground()
564                 self._run_in_background()
566             super(LinuxApplication, self).do_start()
# Launch 'command' attached to the terminal via the node's non-blocking
# 'execute', keeping the Popen handle in self._proc so stop() can kill it.
568     def _run_in_foreground(self):
569         command = self.get("command")
570         sudo = self.get("sudo") or False
571         x11 = self.get("forwardX11")
572         env = self.get("env")
574         # Command will be launched in foreground and attached to the
575         # terminal using the node 'execute' in non blocking mode.
577         # We save the reference to the process in self._proc
578         # to be able to kill the process from the stop method.
579         # We also set blocking = False, since we don't want the
580         # thread to block until the execution finishes.
# NOTE(review): the keyword arguments to execute_command (sudo, env,
# forward_x11, blocking, ...) and the definition of `msg` are missing
# from this chunk.
581         (out, err), self._proc = self.execute_command(command,
# A non-None/nonzero poll() here means the process already died.
587         if self._proc.poll():
588             self.error(msg, out, err)
589             raise RuntimeError(msg)
# Launch the previously-uploaded start.sh as a detached remote daemon and
# record its pid/ppid for later status checks and stop().
591     def _run_in_background(self):
592         command = self.get("command")
593         env = self.get("env")
594         sudo = self.get("sudo") or False
597         # low-level spawn tools in both sshfuncs and execfuncs
598         # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
# NOTE(review): the assignment target and `else STDOUT` arm of this
# conditional are missing from this chunk.
600             if self.get("splitStderr") \
602         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
605         # Command will be run as a daemon in baground and detached from any
607         # The command to run was previously uploaded to a bash script
608         # during deployment, now we launch the remote script using 'run'
609         # method from the node.
610         cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
# NOTE(review): further keyword arguments to node.run (stdin, stderr,
# sudo, ...) are missing from this chunk.
611         (out, err), proc = self.node.run(cmd, self.run_home,
617         # check if execution errors occurred
618         msg = " Failed to start command '{}' ".format(command)
621             self.error(msg, out, err)
622             raise RuntimeError(msg)
624         # Wait for pid file to be generated
625         pid, ppid = self.node.wait_pid(self.run_home)
626         if pid: self._pid = int(pid)
627         if ppid: self._ppid = int(ppid)
629         # If the process is not running, check for error information
630         # on the remote machine
631         if not self.pid or not self.ppid:
632             (out, err), proc = self.node.check_errors(self.run_home,
635             # Out is what was written in the stderr file
637                 msg = " Failed to start command '{}' ".format(command)
638                 self.error(msg, out, err)
639                 raise RuntimeError(msg)
# NOTE(review): fragment of do_stop — the `def do_stop(self):` line and
# the foreground-kill branch are missing from this chunk.
642         """ Stops application execution
644         command = self.get('command') or ''
646         if self.state == ResourceState.STARTED:
648             self.info("Stopping command '{}' ".format(command))
650             # If the command is running in foreground (it was launched using
651             # the node 'execute' method), then we use the handler to the Popen
652             # process to kill it. Else we send a kill signal using the pid and ppid
653             # retrieved after running the command with the node 'run' method
657                 # Only try to kill the process if the pid and ppid
659                 if self.pid and self.ppid:
# _sudo_kill escalates the kill when the process was started with sudo.
660                     (out, err), proc = self.node.kill(self.pid, self.ppid,
661                         sudo = self._sudo_kill)
664                     # TODO: check if execution errors occurred
665                     if (proc and proc.poll()) or err:
666                         msg = " Failed to STOP command '{}' ".format(self.get("command"))
667                         self.error(msg, out, err)
670         super(LinuxApplication, self).do_stop()
# Release the resource: run the optional tearDown command remotely, and
# when hardRelease is set, remove the application directory on the node.
# NOTE(review): the `if tear_down:` / `if hard_release:` guard lines are
# missing from this chunk.
672     def do_release(self):
673         self.info("Releasing resource")
677         tear_down = self.get("tearDown")
679             self.node.execute(tear_down)
681         hard_release = self.get("hardRelease")
683             self.node.rmdir(self.app_home)
685         super(LinuxApplication, self).do_release()
# NOTE(review): fragment of the `state` property — the @property / def
# lines, several transitions (set_stopped and the like) and the trailing
# return are missing from this chunk.
689         """ Returns the state of the application
691         if self._state == ResourceState.STARTED:
692             if self.in_foreground:
693                 # Check if the process we used to execute the command
694                 # is still running ...
695                 retcode = self._proc.poll()
697                 # retcode == None -> running
698                 # retcode > 0 -> error
699                 # retcode == 0 -> finished
# NOTE(review): `out` used below is not defined in the visible lines.
702                     msg = " Failed to execute command '{}'".format(self.get("command"))
703                     err = self._proc.stderr.read()
704                     self.error(msg, out, err)
710                 # We need to query the status of the command we launched in
711                 # background. In order to avoid overwhelming the remote host and
712                 # the local processor with too many ssh queries, the state is only
713                 # requested every 'state_check_delay' seconds.
714                 state_check_delay = 0.5
# Throttle remote status polling to at most one query per half second.
715                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
716                     if self.pid and self.ppid:
717                         # Make sure the process is still running in background
718                         status = self.node.status(self.pid, self.ppid)
720                         if status == ProcStatus.FINISHED:
721                             # If the program finished, check if execution
724                                 = self.node.check_errors(self.run_home)
727                                 msg = "Failed to execute command '{}'"\
728                                     .format(self.get("command"))
729                                 self.error(msg, out, err)
734                     self._last_state_check = tnow()
# Run `command` on the node via SSH after prepending the (inline-formatted)
# environment and substituting path tags. NOTE(review): most of this
# method's parameter list and several execute() keyword arguments are
# missing from this chunk.
738     def execute_command(self, command,
747         environ = self.node.format_environment(env, inline = True)
748         command = environ + command
749         command = self.replace_paths(command)
751         return self.node.execute(command,
754             forward_x11 = forward_x11,
# Substitute the ${...} path tags used in attribute values with the
# concrete remote paths for this node / application / run.
# NOTE(review): the defaulting of `node` and the opening of the return
# expression are missing from this chunk; ${USR} must be replaced before
# ${HOME}-style overlapping tags only insofar as the original ordering
# shows — keep the replacement order as-is.
757     def replace_paths(self, command, node = None, app_home = None, run_home = None):
759         Replace all special path tags with shell-escaped actual paths.
765             app_home = self.app_home
768             run_home = self.run_home
771                 .replace("${USR}", node.usr_dir)
772                 .replace("${LIB}", node.lib_dir)
773                 .replace("${BIN}", node.bin_dir)
774                 .replace("${SRC}", node.src_dir)
775                 .replace("${SHARE}", node.share_dir)
776                 .replace("${EXP}", node.exp_dir)
777                 .replace("${EXP_HOME}", node.exp_home)
778                 .replace("${APP_HOME}", app_home)
779                 .replace("${RUN_HOME}", run_home)
780                 .replace("${NODE_HOME}", node.node_home)
781                 .replace("${HOME}", node.home_dir)
782                 # a shortcut to refer to the file uploaded as 'code = '
783                 .replace("${CODE}", "{}/code".format(app_home))
786 def valid_connection(self, guid):