2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.trace import Trace, TraceAttr
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import tnow, tdiffsec
30 # TODO: Resolve wildcards in commands!!
31 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
34 class LinuxApplication(ResourceManager):
36 .. class:: Class Args :
38 :param ec: The Experiment controller
39 :type ec: ExperimentController
40 :param guid: guid of the RM
45 A LinuxApplication RM represents a process that can be executed in
46 a remote Linux host using SSH.
48 The LinuxApplication RM takes care of uploadin sources and any files
49 needed to run the experiment, to the remote host.
50 It also allows to provide source compilation (build) and installation
51 instructions, and takes care of automating the sources build and
52 installation tasks for the user.
54 It is important to note that files uploaded to the remote host have
55 two possible scopes: single-experiment or multi-experiment.
56 Single experiment files are those that will not be re-used by other
57 experiments. Multi-experiment files are those that will.
58 Sources and shared files are always made available to all experiments.
62 The directory structure used by LinuxApplication RM at the Linux
63 host is the following:
65 ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
67 ${LIB} |- /lib --> Base directory for libraries
68 ${BIN} |- /bin --> Base directory for binary files
69 ${SRC} |- /src --> Base directory for sources
70 ${SHARE} |- /share --> Base directory for other files
72 ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
74 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
76 ${APP_HOME} |- /<app-guid> --> Base directory for application
77 | specific files (e.g. command.sh, input)
79 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
    # Resource type name under which this RM is registered in the factory,
    # and the human-readable help string shown to users.
    _rtype = "linux::Application"
    _help = "Runs an application on a Linux host with a BASH command "
88 def _register_attributes(cls):
89 cls._register_attribute(
90 Attribute("command", "Command to execute at application start. "
91 "Note that commands will be executed in the ${RUN_HOME} directory, "
92 "make sure to take this into account when using relative paths. ",
93 flags = Flags.Design))
94 cls._register_attribute(
95 Attribute("forwardX11",
96 "Enables X11 forwarding for SSH connections",
97 flags = Flags.Design))
98 cls._register_attribute(
100 "Environment variables string for command execution",
101 flags = Flags.Design))
102 cls._register_attribute(
104 "Run with root privileges",
105 flags = Flags.Design))
106 cls._register_attribute(
108 "Space-separated list of packages required to run the application",
109 flags = Flags.Design))
110 cls._register_attribute(
112 "semi-colon separated list of regular files to be uploaded to ${SRC} "
113 "directory prior to building. Archives won't be expanded automatically. "
114 "Sources are globally available for all experiments unless "
115 "cleanHome is set to True (This will delete all sources). ",
116 flags = Flags.Design))
117 cls._register_attribute(
119 "semi-colon separated list of regular miscellaneous files to be uploaded "
120 "to ${SHARE} directory. "
121 "Files are globally available for all experiments unless "
122 "cleanHome is set to True (This will delete all files). ",
123 flags = Flags.Design))
124 cls._register_attribute(
126 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
127 "to ${LIB} directory. "
128 "Libraries are globally available for all experiments unless "
129 "cleanHome is set to True (This will delete all files). ",
130 flags = Flags.Design))
131 cls._register_attribute(
133 "semi-colon separated list of binary files to be uploaded "
134 "to ${BIN} directory. "
135 "Binaries are globally available for all experiments unless "
136 "cleanHome is set to True (This will delete all files). ",
137 flags = Flags.Design))
138 cls._register_attribute(
140 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
141 flags = Flags.Design))
142 cls._register_attribute(
144 "Build commands to execute after deploying the sources. "
145 "Sources are uploaded to the ${SRC} directory and code "
146 "is uploaded to the ${APP_HOME} directory. \n"
147 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
148 "./configure && make && make clean.\n"
149 "Make sure to make the build commands return with a nonzero exit "
151 flags = Flags.Design))
152 cls._register_attribute(
154 "Commands to transfer built files to their final destinations. "
155 "Install commands are executed after build commands. ",
156 flags = Flags.Design))
157 cls._register_attribute(
158 Attribute("stdin", "Standard input for the 'command'",
159 flags = Flags.Design))
160 cls._register_attribute(
161 Attribute("tearDown",
162 "Command to be executed just before releasing the resource",
163 flags = Flags.Design))
166 def _register_traces(cls):
167 stdout = Trace("stdout", "Standard output stream", enabled = True)
168 stderr = Trace("stderr", "Standard error stream", enabled = True)
170 cls._register_trace(stdout)
171 cls._register_trace(stderr)
    def __init__(self, ec, guid):
        # ec: the ExperimentController; guid: global unique id of this RM.
        super(LinuxApplication, self).__init__(ec, guid)

        # name of the per-application directory on the remote host
        self._home = "app-{}".format(self.guid)

        # whether the command should run in foreground attached
        self._in_foreground = False

        # whether to use sudo to kill the application process
        self._sudo_kill = False

        # keep a reference to the running process handler when
        # the command is not executed as remote daemon in background
        # NOTE(review): the assignments this comment refers to are missing
        # in this copy — presumably self._proc (read by _run_in_foreground
        # and state) and the pid/ppid bookkeeping read by
        # _run_in_background/do_stop. Restore from upstream.

        # timestamp of last state check of the application
        self._last_state_check = tnow()
194 def log_message(self, msg):
195 return " guid {} - host {} - {} "\
196 .format(self.guid, self.node.get("hostname"), msg)
        # NOTE(review): the lines below are the bodies of what appear to be
        # three read-only accessors (node, app_home, run_home) whose
        # "def"/"@property" header lines — and the "if not node:" guard
        # around the RuntimeError — are missing from this copy. Restore the
        # headers before this file can be parsed.
        node = self.get_connected(LinuxNode.get_rtype())
        msg = "Application {} guid {} NOT connected to Node"\
            .format(self._rtype, self.guid)
        raise RuntimeError(msg)
        # app_home: per-application directory under the node's exp_home
        return os.path.join(self.node.exp_home, self._home)
        # run_home: per-run directory under app_home, keyed by the EC run id
        return os.path.join(self.app_home, self.ec.run_id)
    # NOTE(review): presumably a read-only @property upstream — the rest of
    # this class uses "self.in_foreground" without calling it; the decorator
    # line is missing from this copy. Confirm and restore.
    def in_foreground(self):
        """
        Returns True if the command needs to be executed in foreground.
        This means that command will be executed using 'execute' instead of
        'run' ('run' executes a command in background and detached from the
        terminal).

        When using X11 forwarding option, the command can not run in background
        and detached from a terminal, since we need to keep the terminal attached
        to interact with it.
        """
        return self.get("forwardX11") or self._in_foreground
241 def trace_filepath(self, filename):
242 return os.path.join(self.run_home, filename)
    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """Retrieve trace *name* from the remote host.

        :param name: trace name (e.g. "stdout", "stderr")
        :param attr: TraceAttr.ALL (full content), PATH, STREAM (one
            *block*-sized chunk at *offset*) or SIZE

        NOTE(review): several lines are missing from this copy — the early
        returns for the PATH/ALL cases, the ``proc.poll()`` error guards
        before the error reports, and the final return of ``out``. Restore
        from upstream before use.
        """
        self.info("Retrieving '{}' trace {} ".format(name, attr))

        path = self.trace_filepath(name)

        # probe whether the trace file exists on the remote host
        command = "(test -f {} && echo 'success') || echo 'error'".format(path)
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace {} ".format(name)
            self.error(msg, out, err)

        if attr == TraceAttr.PATH:
        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.run_home, name)
            msg = " Couldn't read trace {} ".format(name)
            self.error(msg, out, err)

        if attr == TraceAttr.STREAM:
            # read 1 block of 'block' bytes, skipping 'offset' blocks
            cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
        elif attr == TraceAttr.SIZE:
            # NOTE(review): the stat format looks truncated — presumably
            # "stat -c %s {}" upstream (file size in bytes); confirm.
            cmd = "stat -c {} ".format(path)

        (out, err), proc = self.node.execute(cmd)
            msg = " Couldn't find trace {} ".format(name)
            self.error(msg, out, err)

        if attr == TraceAttr.SIZE:
            out = int(out.strip())
    def do_provision(self):
        """Upload files/sources, install dependencies and build/install the
        application on the remote host.

        NOTE(review): lines are missing from this copy — the ``procs``
        dict initialization, the assembly of the provision-steps list
        (only fragments remain below) and the loop that runs each step
        while checking the EC state. Also ``pickle`` is used here but no
        import is visible in this chunk. Restore from upstream.
        """
        # take a snapshot of the system if user is root
        # to ensure that cleanProcess will not kill
        # pre-existent processes
        if self.node.get("username") == 'root':
            ps_aux = "ps aux | awk '{print $2,$11}'"
            (out, err), proc = self.node.execute(ps_aux)
            for line in out.strip().split("\n"):
                parts = line.strip().split(" ")
                procs[parts[0]] = parts[1]
            with open("/tmp/save.proc", "wb") as pickle_file:
                pickle.dump(procs, pickle_file)

        # create run dir for application
        self.node.mkdir(self.run_home)

        # List of all the provision methods to invoke
                self.upload_binaries,
                self.upload_libraries,
                # install dependencies
                self.install_dependencies,

        # Since provisioning takes a long time, before
        # each step we check that the EC is still
            self.debug("Interrupting provisioning. EC says 'ABORT")

        # upload deploy script
        deploy_command = ";".join(command)
        self.execute_deploy_command(deploy_command)

        # upload start script
        self.upload_start_command()

        self.info("Provisioning finished")

        super(LinuxApplication, self).do_provision()
    def upload_start_command(self, overwrite = False):
        """Upload the command as a start.sh bash script to ${APP_HOME},
        only when the command can run in background and detached.

        NOTE(review): the keyword arguments of node.upload_command (at
        least ``shfile = shfile`` and presumably ``env = env``) are
        missing from this copy — restore from upstream.
        """
        # Upload command to remote bash script
        # - only if command can be executed in background and detached
        command = self.get("command")

        if command and not self.in_foreground:
            # self.info("Uploading command '{}'".format(command))

            # replace application specific paths in the command
            command = self.replace_paths(command)
            # replace application specific paths in the environment
            env = self.get("env")
            env = env and self.replace_paths(env)

            shfile = os.path.join(self.app_home, "start.sh")

            self.node.upload_command(command,
                overwrite = overwrite)
    def execute_deploy_command(self, command, prefix="deploy"):
        """Upload *command* to a <prefix>.sh script on the node and run it,
        blocking until it finishes; its pid/exitcode/stdout/stderr are
        stored in <prefix>_* files inside ${RUN_HOME}.

        NOTE(review): the ``shfile = shfile``/``env = env`` keyword lines
        of node.run_and_wait appear to be missing from this copy —
        ``shfile`` is computed below but never used. Confirm upstream.
        """
        # replace application specific paths in the command
        command = self.replace_paths(command)

        # replace application specific paths in the environment
        env = self.get("env")
        env = env and self.replace_paths(env)

        # Upload the command to a bash script and run it
        # in background ( but wait until the command has
        # finished to continue )
        shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
        self.node.run_and_wait(command, self.run_home,
            pidfile = "{}_pidfile".format(prefix),
            ecodefile = "{}_exitcode".format(prefix),
            stdout = "{}_stdout".format(prefix),
            stderr = "{}_stderr".format(prefix))
    def upload_sources(self, sources = None, src_dir = None):
        """Upload (or remotely download) the application sources to ${SRC}.

        NOTE(review): lines are missing from this copy — the guards that
        default 'sources'/'src_dir' only when the arguments are None and
        skip empty input, the initialization of the ``command`` list, and
        the ``command.append(...)`` wrapping the wget/ls template below.
        Restore from upstream.
        """
        sources = self.get("sources")

            src_dir = self.node.src_dir

        self.info("Uploading sources ")

        sources = [str.strip(source) for source in sources.split(";")]

        # Separate sources that should be downloaded from
        # the web, from sources that should be uploaded from
        for source in list(sources):
            if source.startswith("http") or source.startswith("https"):
                # remove the http source from the sources list
                sources.remove(source)

                    # Check if the source already exists
                    " ls {src_dir}/{basename} "
                    # If source doesn't exist, download it and check
                    # that it downloaded ok
                    " wget -c --directory-prefix={src_dir} {source} && "
                    " ls {src_dir}/{basename} "
                    basename = os.path.basename(source),

            command = " && ".join(command)

            # replace application specific paths in the command
            command = self.replace_paths(command)

            sources = ';'.join(sources)
            self.node.upload(sources, src_dir, overwrite = False)
    def upload_files(self, files = None):
        """Upload miscellaneous files to the shared ${SHARE} directory.

        NOTE(review): the guard lines (defaulting 'files' only when the
        argument is None; skipping the upload when empty) are missing
        from this copy — restore from upstream.
        """
        files = self.get("files")

        self.info("Uploading files {} ".format(files))
        self.node.upload(files, self.node.share_dir, overwrite = False)
    def upload_libraries(self, libs = None):
        """Upload library files to the shared ${LIB} directory.

        NOTE(review): the guard lines (defaulting 'libs' only when the
        argument is None; skipping the upload when empty) are missing
        from this copy — restore from upstream.
        """
        libs = self.get("libs")

        self.info("Uploading libraries {} ".format(libs))
        self.node.upload(libs, self.node.lib_dir, overwrite = False)
    def upload_binaries(self, bins = None):
        """Upload binary files to the shared ${BIN} directory.

        NOTE(review): the guard lines (defaulting 'bins' only when the
        argument is None; skipping the upload when empty) are missing
        from this copy — restore from upstream.
        """
        bins = self.get("bins")

        self.info("Uploading binaries {} ".format(bins))
        self.node.upload(bins, self.node.bin_dir, overwrite = False)
    def upload_code(self, code = None):
        """Upload plain-text source code to ${APP_HOME}/code as an
        executable text file.

        NOTE(review): the guard lines (defaulting 'code' only when the
        argument is None; skipping when empty) are missing from this
        copy — restore from upstream.
        """
        code = self.get("code")

        self.info("Uploading code")

        dst = os.path.join(self.app_home, "code")
        self.node.upload(code, dst, overwrite = False, text = True, executable = True)
    def upload_stdin(self, stdin = None):
        """Upload the stdin file and symlink it as ${APP_HOME}/stdin.

        NOTE(review): lines are missing from this copy — the guard that
        defaults/skips 'stdin', the ``else:`` branch before the second
        ``dst`` assignment (plain-text stdin content goes directly to
        ${APP_HOME}/stdin), and the node.execute() call that runs the
        symlink command built below. Restore from upstream.
        """
        stdin = self.get("stdin")

        # create dir for sources
        self.info("Uploading stdin")

        # upload stdin file to ${SHARE_DIR} directory
        if os.path.isfile(stdin):
            basename = os.path.basename(stdin)
            dst = os.path.join(self.node.share_dir, basename)
            dst = os.path.join(self.app_home, "stdin")

        self.node.upload(stdin, dst, overwrite = False, text = True)

        # create "stdin" symlink on ${APP_HOME} directory
        command = "( cd {app_home} ; [ ! -f stdin ] && ln -s {stdin} stdin )"\
            .format(app_home = self.app_home, stdin = dst)
    def install_dependencies(self, depends = None):
        """Return the shell command that installs the required packages.

        NOTE(review): the guard lines (defaulting 'depends' only when the
        argument is None; skipping when empty) are missing from this
        copy — restore from upstream.
        """
        depends = self.get("depends")

        self.info("Installing dependencies {}".format(depends))
        return self.node.install_packages_command(depends)
    def build(self, build = None):
        """Return the user's build command with path tags expanded.

        NOTE(review): the guard lines (defaulting 'build' only when the
        argument is None; skipping when empty) are missing from this
        copy — restore from upstream.
        """
        build = self.get("build")

        self.info("Building sources ")

        # replace application specific paths in the command
        return self.replace_paths(build)
    def install(self, install = None):
        """Return the user's install command with path tags expanded.

        NOTE(review): the guard lines (defaulting 'install' only when the
        argument is None; skipping when empty) are missing from this
        copy — restore from upstream.
        """
        install = self.get("install")

        self.info("Installing sources ")

        # replace application specific paths in the command
        return self.replace_paths(install)
        # NOTE(review): this is the body of do_deploy(); its "def" line,
        # the "node = self.node" assignment, and the "else:" branch that
        # performs discover/provision are missing from this copy —
        # restore from upstream. ResourceState comes from the truncated
        # nepi.execution.resource import at the top of the file.

        # Wait until node is associated and deployed
        if not node or node.state < ResourceState.READY:
            self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
            # node not ready yet: retry the deploy later
            self.ec.schedule(self.reschedule_delay, self.deploy)

        command = self.get("command") or ""
        self.info("Deploying command '{}' ".format(command))

        super(LinuxApplication, self).do_deploy()
        # NOTE(review): this is the body of do_start(); its "def" line, the
        # "if not command:" guard around set_stopped(), and the "else:"
        # before _run_in_background() are missing from this copy — as
        # written both foreground and background paths would run. Restore
        # from upstream.
        command = self.get("command")

        self.info("Starting command '{}'".format(command))

        # If no command was given (i.e. Application was used for dependency
        # installation), then the application is directly marked as STOPPED
        super(LinuxApplication, self).set_stopped()

        if self.in_foreground:
            self._run_in_foreground()
        self._run_in_background()

        super(LinuxApplication, self).do_start()
    def _run_in_foreground(self):
        """Launch the command attached to the terminal via the node's
        'execute' in non-blocking mode, keeping the Popen handle in
        self._proc so do_stop can kill it.

        NOTE(review): the keyword-argument lines of execute_command
        (presumably env/sudo/forward_x11/blocking) and the assignment of
        ``msg`` used in the error path below are missing from this copy —
        restore from upstream.
        """
        command = self.get("command")
        sudo = self.get("sudo") or False
        x11 = self.get("forwardX11")
        env = self.get("env")

        # Command will be launched in foreground and attached to the
        # terminal using the node 'execute' in non blocking mode.

        # We save the reference to the process in self._proc
        # to be able to kill the process from the stop method.
        # We also set blocking = False, since we don't want the
        # thread to block until the execution finishes.
        (out, err), self._proc = self.execute_command(command,

        if self._proc.poll():
            self.error(msg, out, err)
            raise RuntimeError(msg)
    def _run_in_background(self):
        """Launch the previously-uploaded start.sh as a detached daemon on
        the node and record its pid/ppid.

        NOTE(review): lines are missing from this copy — the "else None"
        of the stdin conditional, the keyword arguments of node.run()
        (env/sudo/stdin/pidfile/...), and the ``if proc.poll():`` guard
        around the first error/raise (as written it raises
        unconditionally). Restore from upstream.
        """
        command = self.get("command")
        env = self.get("env")
        sudo = self.get("sudo") or False

        # ${APP_HOME}/stdin is the symlink created by upload_stdin
        stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \

        # Command will be run as a daemon in background and detached from any
        # terminal.
        # The command to run was previously uploaded to a bash script
        # during deployment, now we launch the remote script using 'run'
        # method from the node.
        cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
        (out, err), proc = self.node.run(cmd, self.run_home,

        # check if execution errors occurred
        msg = " Failed to start command '{}' ".format(command)
        self.error(msg, out, err)
        raise RuntimeError(msg)

        # Wait for pid file to be generated
        pid, ppid = self.node.wait_pid(self.run_home)
        if pid: self._pid = int(pid)
        if ppid: self._ppid = int(ppid)

        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_errors(self.run_home,

            # Out is what was written in the stderr file
            msg = " Failed to start command '{}' ".format(command)
            self.error(msg, out, err)
            raise RuntimeError(msg)
627 """ Stops application execution
629 command = self.get('command') or ''
631 if self.state == ResourceState.STARTED:
633 self.info("Stopping command '{}' ".format(command))
635 # If the command is running in foreground (it was launched using
636 # the node 'execute' method), then we use the handler to the Popen
637 # process to kill it. Else we send a kill signal using the pid and ppid
638 # retrieved after running the command with the node 'run' method
642 # Only try to kill the process if the pid and ppid
644 if self.pid and self.ppid:
645 (out, err), proc = self.node.kill(self.pid, self.ppid,
646 sudo = self._sudo_kill)
649 # TODO: check if execution errors occurred
650 if (proc and proc.poll()) or err:
651 msg = " Failed to STOP command '{}' ".format(self.get("command"))
652 self.error(msg, out, err)
655 super(LinuxApplication, self).do_stop()
    def do_release(self):
        """Run the tearDown command (if any) and clean up the app's remote
        directory before releasing the resource.

        NOTE(review): guard lines are missing from this copy — presumably
        ``if tear_down:`` before node.execute (as written it would execute
        None) and ``if hard_release:`` before node.rmdir. Restore from
        upstream.
        """
        self.info("Releasing resource")

        tear_down = self.get("tearDown")
        self.node.execute(tear_down)

        hard_release = self.get("hardRelease")
        self.node.rmdir(self.app_home)

        super(LinuxApplication, self).do_release()
674 """ Returns the state of the application
676 if self._state == ResourceState.STARTED:
677 if self.in_foreground:
678 # Check if the process we used to execute the command
679 # is still running ...
680 retcode = self._proc.poll()
682 # retcode == None -> running
683 # retcode > 0 -> error
684 # retcode == 0 -> finished
687 msg = " Failed to execute command '{}'".format(self.get("command"))
688 err = self._proc.stderr.read()
689 self.error(msg, out, err)
695 # We need to query the status of the command we launched in
696 # background. In order to avoid overwhelming the remote host and
697 # the local processor with too many ssh queries, the state is only
698 # requested every 'state_check_delay' seconds.
699 state_check_delay = 0.5
700 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
701 if self.pid and self.ppid:
702 # Make sure the process is still running in background
703 status = self.node.status(self.pid, self.ppid)
705 if status == ProcStatus.FINISHED:
706 # If the program finished, check if execution
709 = self.node.check_errors(self.run_home)
712 msg = "Failed to execute command '{}'"\
713 .format(self.get("command"))
714 self.error(msg, out, err)
719 self._last_state_check = tnow()
    def execute_command(self, command,
        """Prefix *command* with the inline environment, expand path tags
        and execute it on the node.

        NOTE(review): the remaining parameter lines of the signature
        (presumably env/sudo/stdin/forward_x11/blocking) and the matching
        keyword arguments of node.execute are missing from this copy —
        restore from upstream.
        """
        environ = self.node.format_environment(env, inline = True)
        command = environ + command
        command = self.replace_paths(command)

        return self.node.execute(command,
            forward_x11 = forward_x11,
    def replace_paths(self, command, node = None, app_home = None, run_home = None):
        """
        Replace all special path tags with shell-escaped actual paths.
        """
        # NOTE(review): lines are missing from this copy — the defaulting
        # of 'node' (presumably "if node is None: node = self.node"), the
        # "is None" guards around the two assignments below (as written
        # the keyword arguments are always clobbered), and the opening
        # "return (command" of the chained-replace expression. Restore
        # from upstream.
        app_home = self.app_home
        run_home = self.run_home

            .replace("${USR}", node.usr_dir)
            .replace("${LIB}", node.lib_dir)
            .replace("${BIN}", node.bin_dir)
            .replace("${SRC}", node.src_dir)
            .replace("${SHARE}", node.share_dir)
            .replace("${EXP}", node.exp_dir)
            .replace("${EXP_HOME}", node.exp_home)
            .replace("${APP_HOME}", app_home)
            .replace("${RUN_HOME}", run_home)
            .replace("${NODE_HOME}", node.node_home)
            .replace("${HOME}", node.home_dir)
            # a shortcut to refer to the file uploaded as 'code = '
            .replace("${CODE}", "{}/code".format(app_home))
771 def valid_connection(self, guid):