2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.trace import Trace, TraceAttr
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus, STDOUT
25 from nepi.util.timefuncs import tnow, tdiffsec
30 # TODO: Resolve wildcards in commands!!
31 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class LinuxApplication(ResourceManager):
    # NOTE(review): the class is presumably decorated with @clsinit_copy
    # (imported at the top of the file) -- decorator line lost in extraction.
    """
    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM

    A LinuxApplication RM represents a process that can be executed in
    a remote Linux host using SSH.

    The LinuxApplication RM takes care of uploading sources and any files
    needed to run the experiment, to the remote host.
    It also allows to provide source compilation (build) and installation
    instructions, and takes care of automating the sources build and
    installation tasks for the user.

    It is important to note that files uploaded to the remote host have
    two possible scopes: single-experiment or multi-experiment.
    Single experiment files are those that will not be re-used by other
    experiments. Multi-experiment files are those that will.
    Sources and shared files are always made available to all experiments.

    The directory structure used by LinuxApplication RM at the Linux
    host is the following:

        ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
        ${LIB}      |- /lib    --> Base directory for libraries
        ${BIN}      |- /bin    --> Base directory for binary files
        ${SRC}      |- /src    --> Base directory for sources
        ${SHARE}    |- /share  --> Base directory for other files

        ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
        ${EXP_HOME}   |- /<exp-id>    --> Base directory for experiment exp-id
        ${APP_HOME}     |- /<app-guid> --> Base directory for application
                        |               specific files (e.g. command.sh, input)
        ${RUN_HOME}       |- /<run-id>  --> Base directory for run specific
    """

    # Resource type string under which this RM is registered with the EC.
    _rtype = "linux::Application"
    _help = "Runs an application on a Linux host with a BASH command "
88 def _register_attributes(cls):
89 cls._register_attribute(
90 Attribute("command", "Command to execute at application start. "
91 "Note that commands will be executed in the ${RUN_HOME} directory, "
92 "make sure to take this into account when using relative paths. ",
93 flags = Flags.Design))
94 cls._register_attribute(
95 Attribute("forwardX11",
96 "Enables X11 forwarding for SSH connections",
97 flags = Flags.Design))
98 cls._register_attribute(
100 "Environment variables string for command execution",
101 flags = Flags.Design))
102 cls._register_attribute(
104 "Run with root privileges",
105 flags = Flags.Design))
106 cls._register_attribute(
108 "Space-separated list of packages required to run the application",
109 flags = Flags.Design))
110 cls._register_attribute(
112 "semi-colon separated list of regular files to be uploaded to ${SRC} "
113 "directory prior to building. Archives won't be expanded automatically. "
114 "Sources are globally available for all experiments unless "
115 "cleanHome is set to True (This will delete all sources). ",
116 flags = Flags.Design))
117 cls._register_attribute(
119 "semi-colon separated list of regular miscellaneous files to be uploaded "
120 "to ${SHARE} directory. "
121 "Files are globally available for all experiments unless "
122 "cleanHome is set to True (This will delete all files). ",
123 flags = Flags.Design))
124 cls._register_attribute(
126 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
127 "to ${LIB} directory. "
128 "Libraries are globally available for all experiments unless "
129 "cleanHome is set to True (This will delete all files). ",
130 flags = Flags.Design))
131 cls._register_attribute(
133 "semi-colon separated list of binary files to be uploaded "
134 "to ${BIN} directory. "
135 "Binaries are globally available for all experiments unless "
136 "cleanHome is set to True (This will delete all files). ",
137 flags = Flags.Design))
138 cls._register_attribute(
140 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
141 flags = Flags.Design))
142 cls._register_attribute(
144 "Build commands to execute after deploying the sources. "
145 "Sources are uploaded to the ${SRC} directory and code "
146 "is uploaded to the ${APP_HOME} directory. \n"
147 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
148 "./configure && make && make clean.\n"
149 "Make sure to make the build commands return with a nonzero exit "
151 flags = Flags.Design))
152 cls._register_attribute(
154 "Commands to transfer built files to their final destinations. "
155 "Install commands are executed after build commands. ",
156 flags = Flags.Design))
157 cls._register_attribute(
158 Attribute("stdin", "Standard input for the 'command'",
159 flags = Flags.Design))
160 cls._register_attribute(
161 Attribute("tearDown",
162 "Command to be executed just before releasing the resource",
163 flags = Flags.Design))
164 cls._register_attribute(
165 Attribute("splitStderr",
166 "requests stderr to be retrieved separately",
170 def _register_traces(cls):
172 Trace("stdout", "Standard output stream", enabled = True))
174 Trace("stderr", "Standard error stream", enabled = True))
176 def __init__(self, ec, guid):
177 super(LinuxApplication, self).__init__(ec, guid)
181 self._home = "app-{}".format(self.guid)
183 # whether the command should run in foreground attached
185 self._in_foreground = False
187 # whether to use sudo to kill the application process
188 self._sudo_kill = False
190 # keep a reference to the running process handler when
191 # the command is not executed as remote daemon in background
194 # timestamp of last state check of the application
195 self._last_state_check = tnow()
197 def log_message(self, msg):
198 return " guid {} - host {} - {} "\
199 .format(self.guid, self.node.get("hostname"), msg)
204 node = self.get_connected(LinuxNode.get_rtype())
206 msg = "Application {} guid {} NOT connected to Node"\
207 .format(self._rtype, self.guid)
208 raise RuntimeError(msg)
216 return os.path.join(self.node.exp_home, self._home)
220 return os.path.join(self.app_home, self.ec.run_id)
231 def in_foreground(self):
233 Returns True if the command needs to be executed in foreground.
234 This means that command will be executed using 'execute' instead of
235 'run' ('run' executes a command in background and detached from the
238 When using X11 forwarding option, the command can not run in background
239 and detached from a terminal, since we need to keep the terminal attached
242 return self.get("forwardX11") or self._in_foreground
244 def trace_filepath(self, filename):
245 return os.path.join(self.run_home, filename)
    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """Retrieve a remote trace file (content, path, a block, or size).

        :param name: trace name (e.g. "stdout", "stderr")
        :param attr: TraceAttr.ALL | PATH | STREAM | SIZE
        :param block: block size in bytes, used with TraceAttr.STREAM
        :param offset: block offset, used with TraceAttr.STREAM
        """
        # NOTE(review): several lines of this method were lost in extraction
        # (the return statements after each branch, guard lines, ...);
        # only the visible fragments are annotated below.
        self.info("Retrieving '{}' trace {} ".format(name, attr))

        path = self.trace_filepath(name)

        # probe for the trace file on the remote host
        command = "(test -f {} && echo 'success') || echo 'error'".format(path)
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace {} ".format(name)
            self.error(msg, out, err)

        # PATH: the caller only wants the remote file path (return lost)
        if attr == TraceAttr.PATH:

        # ALL: download the whole trace content
        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.run_home, name)

            msg = " Couldn't read trace {} ".format(name)
            self.error(msg, out, err)

        # STREAM: read one `block`-sized chunk, skipping `offset` blocks
        if attr == TraceAttr.STREAM:
            cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
        elif attr == TraceAttr.SIZE:
            # NOTE(review): the stat format sequence (e.g. %s) appears to be
            # missing from this string -- confirm against upstream.
            cmd = "stat -c {} ".format(path)

        (out, err), proc = self.node.execute(cmd)

        # NOTE(review): presumably guarded by a proc.poll() check (line lost)
        msg = " Couldn't find trace {} ".format(name)
        self.error(msg, out, err)

        # SIZE results are converted to an integer byte count
        if attr == TraceAttr.SIZE:
            out = int(out.strip())
    def do_provision(self):
        """Provision the application on the node: create ${RUN_HOME}, upload
        sources/files/binaries/libraries/code, install dependencies, run the
        build/install steps as one deploy script, and upload start.sh."""
        # NOTE(review): many lines were lost in extraction (initialization of
        # `procs` and `command`, most of the provision-method list, the
        # EC-abort check loop); fragments are kept as found.
        # take a snapshot of the system if user is root
        # to ensure that cleanProcess will not kill
        # pre-existent processes
        if self.node.get("username") == 'root':
            ps_aux = "ps aux | awk '{print $2,$11}'"
            (out, err), proc = self.node.execute(ps_aux)
            # `procs` maps pid -> command name (its initialization is on a
            # line lost in extraction)
            for line in out.strip().split("\n"):
                parts = line.strip().split(" ")
                procs[parts[0]] = parts[1]
            # persist the snapshot locally for later comparison
            with open("/tmp/save.proc", "wb") as pickle_file:
                pickle.dump(procs, pickle_file)

        # create run dir for application
        self.node.mkdir(self.run_home)

        # List of all the provision methods to invoke
        self.upload_binaries,
        self.upload_libraries,

        # install dependencies
        self.install_dependencies,

        # Since provisioning takes a long time, before
        # each step we check that the EC is still
        # NOTE(review): the string below looks truncated (unbalanced quote)
        self.debug("Interrupting provisioning. EC says 'ABORT")

        # upload deploy script
        deploy_command = ";".join(command)
        self.execute_deploy_command(deploy_command)

        # upload start script
        self.upload_start_command()

        self.info("Provisioning finished")

        super(LinuxApplication, self).do_provision()
    def upload_start_command(self, overwrite = False):
        """Upload the 'command' attribute to a remote start.sh script.

        Only done when the command can be daemonized; foreground commands
        are executed directly instead (see _run_in_foreground).
        """
        # Upload command to remote bash script
        # - only if command can be executed in background and detached
        command = self.get("command")

        if command and not self.in_foreground:
            # self.info("Uploading command '{}'".format(command))

            # replace application specific paths in the command
            command = self.replace_paths(command)

            # replace application specific paths in the environment
            env = self.get("env")
            env = env and self.replace_paths(env)

            shfile = os.path.join(self.app_home, "start.sh")

            # NOTE(review): keyword arguments of this call (presumably
            # shfile = shfile, env = env) are on lines lost in extraction.
            self.node.upload_command(command,
                overwrite = overwrite)
    def execute_deploy_command(self, command, prefix="deploy"):
        """Upload *command* to ${APP_HOME}/<prefix>.sh and run it remotely,
        blocking until it finishes.

        :param command: shell command(s); path tags are expanded first
        :param prefix: basename for the script and its pid/exitcode/output files
        """
        # replace application specific paths in the command
        command = self.replace_paths(command)

        # replace application specific paths in the environment
        env = self.get("env")
        env = env and self.replace_paths(env)

        # Upload the command to a bash script and run it
        # in background ( but wait until the command has
        # finished to continue )
        shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
        # low-level spawn tools in both sshfuncs and execfuncs
        # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
        # NOTE(review): the `else` arm of this conditional (presumably
        # `else STDOUT`) is on a line lost in extraction.
        stderr = "{}_stderr".format(prefix) \
            if self.get("splitStderr") \
        # NOTE(review): leftover debug output? Consider removing or routing
        # through self.debug() instead of printing to stdout.
        print("{} : prefix = {}, command={}, stderr={}"
            .format(self, prefix, command, stderr))
        # NOTE(review): some keyword arguments of this call (e.g. shfile,
        # env, stderr) are on lines lost in extraction.
        self.node.run_and_wait(command, self.run_home,
            pidfile = "{}_pidfile".format(prefix),
            ecodefile = "{}_exitcode".format(prefix),
            stdout = "{}_stdout".format(prefix),
    def upload_sources(self, sources = None, src_dir = None):
        """Upload application sources to ${SRC}; http(s) sources are instead
        downloaded server-side with wget.

        :param sources: semi-colon separated list of files; defaults to the
            'sources' attribute
        :param src_dir: remote destination directory; defaults to ${SRC}
        """
        # NOTE(review): guard lines (e.g. `if not sources:` / `if sources:`)
        # and parts of the wget command construction were lost in extraction.
        sources = self.get("sources")

        src_dir = self.node.src_dir

        self.info("Uploading sources ")

        sources = [str.strip(source) for source in sources.split(";")]

        # Separate sources that should be downloaded from
        # the web, from sources that should be uploaded from
        # the local machine
        for source in list(sources):
            # NOTE(review): the "https" test is redundant -- any string
            # starting with "https" also starts with "http".
            if source.startswith("http") or source.startswith("https"):
                # remove the http source from the sources list
                sources.remove(source)

                # Check if the source already exists
                " ls {src_dir}/{basename} "

                # If source doesn't exist, download it and check
                # that it downloaded ok
                " wget -c --directory-prefix={src_dir} {source} && "
                " ls {src_dir}/{basename} "

                basename = os.path.basename(source),

        command = " && ".join(command)

        # replace application specific paths in the command
        command = self.replace_paths(command)

        sources = ';'.join(sources)
        self.node.upload(sources, src_dir, overwrite = False)
453 def upload_files(self, files = None):
455 files = self.get("files")
458 self.info("Uploading files {} ".format(files))
459 self.node.upload(files, self.node.share_dir, overwrite = False)
461 def upload_libraries(self, libs = None):
463 libs = self.get("libs")
466 self.info("Uploading libraries {} ".format(libs))
467 self.node.upload(libs, self.node.lib_dir, overwrite = False)
469 def upload_binaries(self, bins = None):
471 bins = self.get("bins")
474 self.info("Uploading binaries {} ".format(bins))
475 self.node.upload(bins, self.node.bin_dir, overwrite = False)
477 def upload_code(self, code = None):
479 code = self.get("code")
482 self.info("Uploading code")
483 dst = os.path.join(self.app_home, "code")
484 self.node.upload(code, dst, overwrite = False, text = True, executable = True)
    def upload_stdin(self, stdin = None):
        """Upload the stdin input for 'command' and symlink it as
        ${APP_HOME}/stdin.

        :param stdin: local file path (or literal content); defaults to the
            'stdin' attribute
        """
        # NOTE(review): guard lines, an `else:` branch and the final
        # execute() of `command` were lost in extraction.
        stdin = self.get("stdin")

        # create dir for sources
        self.info("Uploading stdin")

        # upload stdin file to ${SHARE_DIR} directory
        if os.path.isfile(stdin):
            basename = os.path.basename(stdin)
            dst = os.path.join(self.node.share_dir, basename)
            # NOTE(review): the line below presumably belongs to a lost
            # `else:` branch (literal stdin content kept per-application)
            dst = os.path.join(self.app_home, "stdin")

        self.node.upload(stdin, dst, overwrite = False, text = True)

        # create "stdin" symlink on ${APP_HOME} directory
        command = "( cd {app_home} ; [ ! -f stdin ] && ln -s {stdin} stdin )"\
            .format(app_home = self.app_home, stdin = dst)
508 def install_dependencies(self, depends = None):
510 depends = self.get("depends")
513 self.info("Installing dependencies {}".format(depends))
514 return self.node.install_packages_command(depends)
516 def build(self, build = None):
518 build = self.get("build")
521 self.info("Building sources ")
523 # replace application specific paths in the command
524 return self.replace_paths(build)
526 def install(self, install = None):
528 install = self.get("install")
531 self.info("Installing sources ")
533 # replace application specific paths in the command
534 return self.replace_paths(install)
    # NOTE(review): the `def do_deploy(self):` header, the assignment
    # `node = self.node`, and the else branch (presumably discover()/
    # provision() before the super() call) were lost in extraction;
    # the fragment below is kept as found.
        # Wait until node is associated and deployed
        if not node or node.state < ResourceState.READY:
            self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
            self.ec.schedule(self.reschedule_delay, self.deploy)

        command = self.get("command") or ""
        self.info("Deploying command '{}' ".format(command))

        super(LinuxApplication, self).do_deploy()
    # NOTE(review): the `def do_start(self):` header and some guard lines
    # were lost in extraction (set_stopped() is presumably under an
    # `if not command:` guard, and _run_in_background() under an `else:`).
        command = self.get("command")

        self.info("Starting command '{}'".format(command))

        # If no command was given (i.e. Application was used for dependency
        # installation), then the application is directly marked as STOPPED
        super(LinuxApplication, self).set_stopped()

        if self.in_foreground:
            self._run_in_foreground()
        self._run_in_background()

        super(LinuxApplication, self).do_start()
    def _run_in_foreground(self):
        """Run 'command' attached to the terminal using the node's
        non-blocking 'execute'; the process handler is stored in self._proc
        so do_stop() can kill it."""
        command = self.get("command")
        sudo = self.get("sudo") or False
        x11 = self.get("forwardX11")
        env = self.get("env")

        # Command will be launched in foreground and attached to the
        # terminal using the node 'execute' in non blocking mode.

        # We save the reference to the process in self._proc
        # to be able to kill the process from the stop method.
        # We also set blocking = False, since we don't want the
        # thread to block until the execution finishes.
        # NOTE(review): the keyword arguments of this call (presumably sudo,
        # env, forward_x11, blocking = False) are on lines lost in extraction.
        (out, err), self._proc = self.execute_command(command,

        if self._proc.poll():
            # NOTE(review): the assignment of `msg` is on a line lost in
            # extraction.
            self.error(msg, out, err)
            raise RuntimeError(msg)
    def _run_in_background(self):
        """Launch the previously uploaded start.sh as a remote daemon and
        record the pid/ppid used later by state checks and do_stop()."""
        command = self.get("command")
        env = self.get("env")
        sudo = self.get("sudo") or False

        # low-level spawn tools in both sshfuncs and execfuncs
        # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
        # NOTE(review): the stderr filename computation around this
        # conditional is partially lost in extraction.
        if self.get("splitStderr") \

        stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \

        # Command will be run as a daemon in background and detached from any
        # terminal.
        # The command to run was previously uploaded to a bash script
        # during deployment, now we launch the remote script using 'run'
        # method from the node.
        # NOTE(review): the remaining keyword arguments of node.run()
        # are on lines lost in extraction.
        cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
        (out, err), proc = self.node.run(cmd, self.run_home,

        # check if execution errors occurred
        msg = " Failed to start command '{}' ".format(command)

        # NOTE(review): this error path was presumably guarded by a
        # proc.poll() check -- guard line lost in extraction.
        self.error(msg, out, err)
        raise RuntimeError(msg)

        # Wait for pid file to be generated
        pid, ppid = self.node.wait_pid(self.run_home)
        if pid: self._pid = int(pid)
        if ppid: self._ppid = int(ppid)

        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_errors(self.run_home,

            # Out is what was written in the stderr file
            msg = " Failed to start command '{}' ".format(command)
            self.error(msg, out, err)
            raise RuntimeError(msg)
641 """ Stops application execution
643 command = self.get('command') or ''
645 if self.state == ResourceState.STARTED:
647 self.info("Stopping command '{}' ".format(command))
649 # If the command is running in foreground (it was launched using
650 # the node 'execute' method), then we use the handler to the Popen
651 # process to kill it. Else we send a kill signal using the pid and ppid
652 # retrieved after running the command with the node 'run' method
656 # Only try to kill the process if the pid and ppid
658 if self.pid and self.ppid:
659 (out, err), proc = self.node.kill(self.pid, self.ppid,
660 sudo = self._sudo_kill)
663 # TODO: check if execution errors occurred
664 if (proc and proc.poll()) or err:
665 msg = " Failed to STOP command '{}' ".format(self.get("command"))
666 self.error(msg, out, err)
669 super(LinuxApplication, self).do_stop()
    def do_release(self):
        """Run the tearDown command (if any) and clean up remote files
        before releasing the RM."""
        self.info("Releasing resource")

        # NOTE(review): several guard lines were lost in extraction
        # (presumably `if tear_down:` before execute() and
        # `if hard_release:` before rmdir()).
        tear_down = self.get("tearDown")
        self.node.execute(tear_down)

        hard_release = self.get("hardRelease")
        self.node.rmdir(self.app_home)

        super(LinuxApplication, self).do_release()
688 """ Returns the state of the application
690 if self._state == ResourceState.STARTED:
691 if self.in_foreground:
692 # Check if the process we used to execute the command
693 # is still running ...
694 retcode = self._proc.poll()
696 # retcode == None -> running
697 # retcode > 0 -> error
698 # retcode == 0 -> finished
701 msg = " Failed to execute command '{}'".format(self.get("command"))
702 err = self._proc.stderr.read()
703 self.error(msg, out, err)
709 # We need to query the status of the command we launched in
710 # background. In order to avoid overwhelming the remote host and
711 # the local processor with too many ssh queries, the state is only
712 # requested every 'state_check_delay' seconds.
713 state_check_delay = 0.5
714 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
715 if self.pid and self.ppid:
716 # Make sure the process is still running in background
717 status = self.node.status(self.pid, self.ppid)
719 if status == ProcStatus.FINISHED:
720 # If the program finished, check if execution
723 = self.node.check_errors(self.run_home)
726 msg = "Failed to execute command '{}'"\
727 .format(self.get("command"))
728 self.error(msg, out, err)
733 self._last_state_check = tnow()
    # Prepend the formatted environment to the command, expand path tags
    # and execute it on the node (non-daemonized).
    # NOTE(review): the remainder of this signature (presumably env, sudo,
    # forward_x11, blocking and their defaults) and some keyword arguments
    # of node.execute() are on lines lost in extraction.
    def execute_command(self, command,
        # environment is inlined so it applies to this single invocation
        environ = self.node.format_environment(env, inline = True)
        command = environ + command
        command = self.replace_paths(command)

        return self.node.execute(command,
            forward_x11 = forward_x11,
    def replace_paths(self, command, node = None, app_home = None, run_home = None):
        """
        Replace all special path tags with shell-escaped actual paths.
        """
        # NOTE(review): the default-resolution guards (e.g. `if node is
        # None:`) and the opening of the return expression were lost in
        # extraction; the .replace chain below is kept as found.
        app_home = self.app_home

        run_home = self.run_home

            .replace("${USR}", node.usr_dir)
            .replace("${LIB}", node.lib_dir)
            .replace("${BIN}", node.bin_dir)
            .replace("${SRC}", node.src_dir)
            .replace("${SHARE}", node.share_dir)
            .replace("${EXP}", node.exp_dir)
            .replace("${EXP_HOME}", node.exp_home)
            .replace("${APP_HOME}", app_home)
            .replace("${RUN_HOME}", run_home)
            .replace("${NODE_HOME}", node.node_home)
            .replace("${HOME}", node.home_dir)
            # a shortcut to refer to the file uploaded as 'code = '
            .replace("${CODE}", "{}/code".format(app_home))
785 def valid_connection(self, guid):