2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import os
import pickle

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
        ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class LinuxApplication(ResourceManager):
    """
    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM

    A LinuxApplication RM represents a process that can be executed in
    a remote Linux host using SSH.

    The LinuxApplication RM takes care of uploading sources and any files
    needed to run the experiment, to the remote host.
    It also allows to provide source compilation (build) and installation
    instructions, and takes care of automating the sources build and
    installation tasks for the user.

    It is important to note that files uploaded to the remote host have
    two possible scopes: single-experiment or multi-experiment.
    Single experiment files are those that will not be re-used by other
    experiments. Multi-experiment files are those that will.
    Sources and shared files are always made available to all experiments.

    The directory structure used by LinuxApplication RM at the Linux
    host is the following:

        ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
            ${LIB}      |- /lib   --> Base directory for libraries
            ${BIN}      |- /bin   --> Base directory for binary files
            ${SRC}      |- /src   --> Base directory for sources
            ${SHARE}    |- /share --> Base directory for other files

        ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
            ${EXP_HOME} |- /<exp-id>  --> Base directory for experiment exp-id
            ${APP_HOME}   |- /<app-guid> --> Base directory for application
                          |     specific files (e.g. command.sh, input)
            ${RUN_HOME}     |- /<run-id> --> Base directory for run specific
    """

    # Resource type name under which this RM is registered with the factory
    _rtype = "LinuxApplication"
    # Short help string shown when listing available resource types
    _help = "Runs an application on a Linux host with a BASH command "
    # Backend family this RM belongs to
    _backend_type = "linux"
89 def _register_attributes(cls):
90 command = Attribute("command", "Command to execute at application start. "
91 "Note that commands will be executed in the ${RUN_HOME} directory, "
92 "make sure to take this into account when using relative paths. ",
94 forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
96 env = Attribute("env", "Environment variables string for command execution",
98 sudo = Attribute("sudo", "Run with root privileges",
100 depends = Attribute("depends",
101 "Space-separated list of packages required to run the application",
102 flags = Flags.Design)
103 sources = Attribute("sources",
104 "semi-colon separated list of regular files to be uploaded to ${SRC} "
105 "directory prior to building. Archives won't be expanded automatically. "
106 "Sources are globally available for all experiments unless "
107 "cleanHome is set to True (This will delete all sources). ",
108 flags = Flags.Design)
109 files = Attribute("files",
110 "semi-colon separated list of regular miscellaneous files to be uploaded "
111 "to ${SHARE} directory. "
112 "Files are globally available for all experiments unless "
113 "cleanHome is set to True (This will delete all files). ",
114 flags = Flags.Design)
115 libs = Attribute("libs",
116 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
117 "to ${LIB} directory. "
118 "Libraries are globally available for all experiments unless "
119 "cleanHome is set to True (This will delete all files). ",
120 flags = Flags.Design)
121 bins = Attribute("bins",
122 "semi-colon separated list of binary files to be uploaded "
123 "to ${BIN} directory. "
124 "Binaries are globally available for all experiments unless "
125 "cleanHome is set to True (This will delete all files). ",
126 flags = Flags.Design)
127 code = Attribute("code",
128 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129 flags = Flags.Design)
130 build = Attribute("build",
131 "Build commands to execute after deploying the sources. "
132 "Sources are uploaded to the ${SRC} directory and code "
133 "is uploaded to the ${APP_HOME} directory. \n"
134 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135 "./configure && make && make clean.\n"
136 "Make sure to make the build commands return with a nonzero exit "
138 flags = Flags.Design)
139 install = Attribute("install",
140 "Commands to transfer built files to their final destinations. "
141 "Install commands are executed after build commands. ",
142 flags = Flags.Design)
143 stdin = Attribute("stdin", "Standard input for the 'command'",
144 flags = Flags.Design)
145 tear_down = Attribute("tearDown", "Command to be executed just before "
146 "releasing the resource",
147 flags = Flags.Design)
149 cls._register_attribute(command)
150 cls._register_attribute(forward_x11)
151 cls._register_attribute(env)
152 cls._register_attribute(sudo)
153 cls._register_attribute(depends)
154 cls._register_attribute(sources)
155 cls._register_attribute(code)
156 cls._register_attribute(files)
157 cls._register_attribute(bins)
158 cls._register_attribute(libs)
159 cls._register_attribute(build)
160 cls._register_attribute(install)
161 cls._register_attribute(stdin)
162 cls._register_attribute(tear_down)
165 def _register_traces(cls):
166 stdout = Trace("stdout", "Standard output stream", enabled = True)
167 stderr = Trace("stderr", "Standard error stream", enabled = True)
169 cls._register_trace(stdout)
170 cls._register_trace(stderr)
172 def __init__(self, ec, guid):
173 super(LinuxApplication, self).__init__(ec, guid)
176 self._home = "app-%s" % self.guid
177 # whether the command should run in foreground attached
179 self._in_foreground = False
181 # whether to use sudo to kill the application process
182 self._sudo_kill = False
184 # keep a reference to the running process handler when
185 # the command is not executed as remote daemon in background
188 # timestamp of last state check of the application
189 self._last_state_check = tnow()
191 def log_message(self, msg):
192 return " guid %d - host %s - %s " % (self.guid,
193 self.node.get("hostname"), msg)
197 node = self.get_connected(LinuxNode.get_rtype())
198 if node: return node[0]
199 raise RuntimeError, "Application must be connected to Node"
203 return os.path.join(self.node.exp_home, self._home)
207 return os.path.join(self.app_home, self.ec.run_id)
218 def in_foreground(self):
219 """ Returns True if the command needs to be executed in foreground.
220 This means that command will be executed using 'execute' instead of
221 'run' ('run' executes a command in background and detached from the
224 When using X11 forwarding option, the command can not run in background
225 and detached from a terminal, since we need to keep the terminal attached
228 return self.get("forwardX11") or self._in_foreground
230 def trace_filepath(self, filename):
231 return os.path.join(self.run_home, filename)
233 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
234 self.info("Retrieving '%s' trace %s " % (name, attr))
236 path = self.trace_filepath(name)
238 command = "(test -f %s && echo 'success') || echo 'error'" % path
239 (out, err), proc = self.node.execute(command)
241 if (err and proc.poll()) or out.find("error") != -1:
242 msg = " Couldn't find trace %s " % name
243 self.error(msg, out, err)
246 if attr == TraceAttr.PATH:
249 if attr == TraceAttr.ALL:
250 (out, err), proc = self.node.check_output(self.run_home, name)
253 msg = " Couldn't read trace %s " % name
254 self.error(msg, out, err)
259 if attr == TraceAttr.STREAM:
260 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
261 elif attr == TraceAttr.SIZE:
262 cmd = "stat -c%%s %s " % path
264 (out, err), proc = self.node.execute(cmd)
267 msg = " Couldn't find trace %s " % name
268 self.error(msg, out, err)
271 if attr == TraceAttr.SIZE:
272 out = int(out.strip())
276 def do_provision(self):
277 # take a snapshot of the system if user is root
278 # to ensure that cleanProcess will not kill
279 # pre-existent processes
280 if self.node.get("username") == 'root':
283 ps_aux = "ps aux |awk '{print $2,$11}'"
284 (out, err), proc = self.node.execute(ps_aux)
286 for line in out.strip().split("\n"):
287 parts = line.strip().split(" ")
288 procs[parts[0]] = parts[1]
289 pickle.dump(procs, open("/tmp/save.proc", "wb"))
291 # create run dir for application
292 self.node.mkdir(self.run_home)
294 # List of all the provision methods to invoke
301 self.upload_binaries,
303 self.upload_libraries,
308 # install dependencies
309 self.install_dependencies,
317 # Since provisioning takes a long time, before
318 # each step we check that the EC is still
321 self.debug("Interrupting provisioning. EC says 'ABORT")
328 # upload deploy script
329 deploy_command = ";".join(command)
330 self.execute_deploy_command(deploy_command)
332 # upload start script
333 self.upload_start_command()
335 self.info("Provisioning finished")
337 super(LinuxApplication, self).do_provision()
339 def upload_start_command(self, overwrite = False):
340 # Upload command to remote bash script
341 # - only if command can be executed in background and detached
342 command = self.get("command")
344 if command and not self.in_foreground:
345 self.info("Uploading command '%s'" % command)
347 # replace application specific paths in the command
348 command = self.replace_paths(command)
350 # replace application specific paths in the environment
351 env = self.get("env")
352 env = env and self.replace_paths(env)
354 shfile = os.path.join(self.app_home, "start.sh")
356 self.node.upload_command(command,
359 overwrite = overwrite)
361 def execute_deploy_command(self, command, prefix="deploy"):
363 # replace application specific paths in the command
364 command = self.replace_paths(command)
366 # replace application specific paths in the environment
367 env = self.get("env")
368 env = env and self.replace_paths(env)
370 # Upload the command to a bash script and run it
371 # in background ( but wait until the command has
372 # finished to continue )
373 shfile = os.path.join(self.app_home, "%s.sh" % prefix)
374 self.node.run_and_wait(command, self.run_home,
377 pidfile = "%s_pidfile" % prefix,
378 ecodefile = "%s_exitcode" % prefix,
379 stdout = "%s_stdout" % prefix,
380 stderr = "%s_stderr" % prefix)
382 def upload_sources(self, sources = None, src_dir = None):
384 sources = self.get("sources")
389 src_dir = self.node.src_dir
392 self.info("Uploading sources ")
394 sources = map(str.strip, sources.split(";"))
396 # Separate sources that should be downloaded from
397 # the web, from sources that should be uploaded from
400 for source in list(sources):
401 if source.startswith("http") or source.startswith("https"):
402 # remove the hhtp source from the sources list
403 sources.remove(source)
405 command.append( " ( "
406 # Check if the source already exists
407 " ls %(src_dir)s/%(basename)s "
409 # If source doesn't exist, download it and check
410 # that it it downloaded ok
411 " wget -c --directory-prefix=%(src_dir)s %(source)s && "
412 " ls %(src_dir)s/%(basename)s "
414 "basename": os.path.basename(source),
419 command = " && ".join(command)
421 # replace application specific paths in the command
422 command = self.replace_paths(command)
425 sources = ';'.join(sources)
426 self.node.upload(sources, src_dir, overwrite = False)
430 def upload_files(self, files = None):
432 files = self.get("files")
435 self.info("Uploading files %s " % files)
436 self.node.upload(files, self.node.share_dir, overwrite = False)
438 def upload_libraries(self, libs = None):
440 libs = self.get("libs")
443 self.info("Uploading libraries %s " % libaries)
444 self.node.upload(libs, self.node.lib_dir, overwrite = False)
446 def upload_binaries(self, bins = None):
448 bins = self.get("bins")
451 self.info("Uploading binaries %s " % binaries)
452 self.node.upload(bins, self.node.bin_dir, overwrite = False)
454 def upload_code(self, code = None):
456 code = self.get("code")
459 self.info("Uploading code")
461 dst = os.path.join(self.app_home, "code")
462 self.node.upload(code, dst, overwrite = False, text = True)
464 def upload_stdin(self, stdin = None):
466 stdin = self.get("stdin")
469 # create dir for sources
470 self.info("Uploading stdin")
472 # upload stdin file to ${SHARE_DIR} directory
473 if os.path.isfile(stdin):
474 basename = os.path.basename(stdin)
475 dst = os.path.join(self.node.share_dir, basename)
477 dst = os.path.join(self.app_home, "stdin")
479 self.node.upload(stdin, dst, overwrite = False, text = True)
481 # create "stdin" symlink on ${APP_HOME} directory
482 command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
483 "app_home": self.app_home,
488 def install_dependencies(self, depends = None):
490 depends = self.get("depends")
493 self.info("Installing dependencies %s" % depends)
494 return self.node.install_packages_command(depends)
496 def build(self, build = None):
498 build = self.get("build")
501 self.info("Building sources ")
503 # replace application specific paths in the command
504 return self.replace_paths(build)
506 def install(self, install = None):
508 install = self.get("install")
511 self.info("Installing sources ")
513 # replace application specific paths in the command
514 return self.replace_paths(install)
517 # Wait until node is associated and deployed
519 if not node or node.state < ResourceState.READY:
520 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
521 self.ec.schedule(reschedule_delay, self.deploy)
523 command = self.get("command") or ""
524 self.info("Deploying command '%s' " % command)
528 super(LinuxApplication, self).do_deploy()
531 command = self.get("command")
533 self.info("Starting command '%s'" % command)
536 # If no command was given (i.e. Application was used for dependency
537 # installation), then the application is directly marked as STOPPED
538 super(LinuxApplication, self).set_stopped()
540 if self.in_foreground:
541 self._run_in_foreground()
543 self._run_in_background()
545 super(LinuxApplication, self).do_start()
547 def _run_in_foreground(self):
548 command = self.get("command")
549 sudo = self.get("sudo") or False
550 x11 = self.get("forwardX11")
551 env = self.get("env")
553 # Command will be launched in foreground and attached to the
554 # terminal using the node 'execute' in non blocking mode.
556 # We save the reference to the process in self._proc
557 # to be able to kill the process from the stop method.
558 # We also set blocking = False, since we don't want the
559 # thread to block until the execution finishes.
560 (out, err), self._proc = self.execute_command(command,
566 if self._proc.poll():
567 self.error(msg, out, err)
568 raise RuntimeError, msg
570 def _run_in_background(self):
571 command = self.get("command")
572 env = self.get("env")
573 sudo = self.get("sudo") or False
577 stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
580 # Command will be run as a daemon in baground and detached from any
582 # The command to run was previously uploaded to a bash script
583 # during deployment, now we launch the remote script using 'run'
584 # method from the node.
585 cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
586 (out, err), proc = self.node.run(cmd, self.run_home,
592 # check if execution errors occurred
593 msg = " Failed to start command '%s' " % command
596 self.error(msg, out, err)
597 raise RuntimeError, msg
599 # Wait for pid file to be generated
600 pid, ppid = self.node.wait_pid(self.run_home)
601 if pid: self._pid = int(pid)
602 if ppid: self._ppid = int(ppid)
604 # If the process is not running, check for error information
605 # on the remote machine
606 if not self.pid or not self.ppid:
607 (out, err), proc = self.node.check_errors(self.run_home,
610 # Out is what was written in the stderr file
612 msg = " Failed to start command '%s' " % command
613 self.error(msg, out, err)
614 raise RuntimeError, msg
617 """ Stops application execution
619 command = self.get('command') or ''
621 if self.state == ResourceState.STARTED:
623 self.info("Stopping command '%s' " % command)
625 # If the command is running in foreground (it was launched using
626 # the node 'execute' method), then we use the handler to the Popen
627 # process to kill it. Else we send a kill signal using the pid and ppid
628 # retrieved after running the command with the node 'run' method
632 # Only try to kill the process if the pid and ppid
634 if self.pid and self.ppid:
635 (out, err), proc = self.node.kill(self.pid, self.ppid,
636 sudo = self._sudo_kill)
638 # TODO: check if execution errors occurred
639 if (proc and proc.poll()) or err:
640 msg = " Failed to STOP command '%s' " % self.get("command")
641 self.error(msg, out, err)
643 super(LinuxApplication, self).do_stop()
645 def do_release(self):
646 self.info("Releasing resource")
650 tear_down = self.get("tearDown")
652 self.node.execute(tear_down)
654 hard_release = self.get("hardRelease")
656 self.node.rmdir(self.app_home)
658 super(LinuxApplication, self).do_release()
662 """ Returns the state of the application
664 if self._state == ResourceState.STARTED:
665 if self.in_foreground:
666 # Check if the process we used to execute the command
667 # is still running ...
668 retcode = self._proc.poll()
670 # retcode == None -> running
671 # retcode > 0 -> error
672 # retcode == 0 -> finished
675 msg = " Failed to execute command '%s'" % self.get("command")
676 err = self._proc.stderr.read()
677 self.error(msg, out, err)
683 # We need to query the status of the command we launched in
684 # background. In order to avoid overwhelming the remote host and
685 # the local processor with too many ssh queries, the state is only
686 # requested every 'state_check_delay' seconds.
687 state_check_delay = 0.5
688 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
689 if self.pid and self.ppid:
690 # Make sure the process is still running in background
691 status = self.node.status(self.pid, self.ppid)
693 if status == ProcStatus.FINISHED:
694 # If the program finished, check if execution
696 (out, err), proc = self.node.check_errors(
700 msg = "Failed to execute command '%s'" % \
702 self.error(msg, out, err)
707 self._last_state_check = tnow()
711 def execute_command(self, command,
720 environ = self.node.format_environment(env, inline = True)
721 command = environ + command
722 command = self.replace_paths(command)
724 return self.node.execute(command,
727 forward_x11 = forward_x11,
730 def replace_paths(self, command):
732 Replace all special path tags with shell-escaped actual paths.
735 .replace("${USR}", self.node.usr_dir)
736 .replace("${LIB}", self.node.lib_dir)
737 .replace("${BIN}", self.node.bin_dir)
738 .replace("${SRC}", self.node.src_dir)
739 .replace("${SHARE}", self.node.share_dir)
740 .replace("${EXP}", self.node.exp_dir)
741 .replace("${EXP_HOME}", self.node.exp_home)
742 .replace("${APP_HOME}", self.app_home)
743 .replace("${RUN_HOME}", self.run_home)
744 .replace("${NODE_HOME}", self.node.node_home)
745 .replace("${HOME}", self.node.home_dir)
748 def valid_connection(self, guid):