2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
35 class LinuxApplication(ResourceManager):
37 .. class:: Class Args :
39 :param ec: The Experiment controller
40 :type ec: ExperimentController
41 :param guid: guid of the RM
46 A LinuxApplication RM represents a process that can be executed in
47 a remote Linux host using SSH.
49 The LinuxApplication RM takes care of uploading sources and any files
50 needed to run the experiment, to the remote host.
51 It also allows to provide source compilation (build) and installation
52 instructions, and takes care of automating the sources build and
53 installation tasks for the user.
55 It is important to note that files uploaded to the remote host have
56 two possible scopes: single-experiment or multi-experiment.
57 Single experiment files are those that will not be re-used by other
58 experiments. Multi-experiment files are those that will.
59 Sources and shared files are always made available to all experiments.
63 The directory structure used by LinuxApplication RM at the Linux
64 host is the following:
66 ${HOME}/nepi-usr --> Base directory for multi-experiment files
68 ${LIB} |- /lib --> Base directory for libraries
69 ${BIN} |- /bin --> Base directory for binary files
70 ${SRC} |- /src --> Base directory for sources
71 ${SHARE} |- /share --> Base directory for other files
73 ${HOME}/nepi-exp --> Base directory for single-experiment files
75 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
77 ${APP_HOME} |- /<app-guid> --> Base directory for application
78 | specific files (e.g. command.sh, input)
80 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
84 _rtype = "LinuxApplication"
85 _help = "Runs an application on a Linux host with a BASH command "
86 _backend_type = "linux"
90 def _register_attributes(cls):
91 command = Attribute("command", "Command to execute at application start. "
92 "Note that commands will be executed in the ${RUN_HOME} directory, "
93 "make sure to take this into account when using relative paths. ",
94 flags = Flags.ExecReadOnly)
95 forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
96 flags = Flags.ExecReadOnly)
97 env = Attribute("env", "Environment variables string for command execution",
98 flags = Flags.ExecReadOnly)
99 sudo = Attribute("sudo", "Run with root privileges",
100 flags = Flags.ExecReadOnly)
101 depends = Attribute("depends",
102 "Space-separated list of packages required to run the application",
103 flags = Flags.ExecReadOnly)
104 sources = Attribute("sources",
105 "Space-separated list of regular files to be uploaded to ${SRC} "
106 "directory prior to building. Archives won't be expanded automatically. "
107 "Sources are globally available for all experiments unless "
108 "cleanHome is set to True (This will delete all sources). ",
109 flags = Flags.ExecReadOnly)
110 files = Attribute("files",
111 "Space-separated list of regular miscellaneous files to be uploaded "
112 "to ${SHARE} directory. "
113 "Files are globally available for all experiments unless "
114 "cleanHome is set to True (This will delete all files). ",
115 flags = Flags.ExecReadOnly)
116 libs = Attribute("libs",
117 "Space-separated list of libraries (e.g. .so files) to be uploaded "
118 "to ${LIB} directory. "
119 "Libraries are globally available for all experiments unless "
120 "cleanHome is set to True (This will delete all files). ",
121 flags = Flags.ExecReadOnly)
122 bins = Attribute("bins",
123 "Space-separated list of binary files to be uploaded "
124 "to ${BIN} directory. "
125 "Binaries are globally available for all experiments unless "
126 "cleanHome is set to True (This will delete all files). ",
127 flags = Flags.ExecReadOnly)
128 code = Attribute("code",
129 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
130 flags = Flags.ExecReadOnly)
131 build = Attribute("build",
132 "Build commands to execute after deploying the sources. "
133 "Sources are uploaded to the ${SRC} directory and code "
134 "is uploaded to the ${APP_HOME} directory. \n"
135 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
136 "./configure && make && make clean.\n"
137 "Make sure to make the build commands return with a nonzero exit "
139 flags = Flags.ReadOnly)
140 install = Attribute("install",
141 "Commands to transfer built files to their final destinations. "
142 "Install commands are executed after build commands. ",
143 flags = Flags.ReadOnly)
144 stdin = Attribute("stdin", "Standard input for the 'command'",
145 flags = Flags.ExecReadOnly)
146 tear_down = Attribute("tearDown", "Command to be executed just before "
147 "releasing the resource",
148 flags = Flags.ReadOnly)
150 cls._register_attribute(command)
151 cls._register_attribute(forward_x11)
152 cls._register_attribute(env)
153 cls._register_attribute(sudo)
154 cls._register_attribute(depends)
155 cls._register_attribute(sources)
156 cls._register_attribute(code)
157 cls._register_attribute(files)
158 cls._register_attribute(bins)
159 cls._register_attribute(libs)
160 cls._register_attribute(build)
161 cls._register_attribute(install)
162 cls._register_attribute(stdin)
163 cls._register_attribute(tear_down)
def _register_traces(cls):
    """ Register the traces exposed by this resource: 'stdout' and 'stderr'.
    """
    traces = (
        Trace("stdout", "Standard output stream"),
        Trace("stderr", "Standard error stream"),
    )

    for trace in traces:
        cls._register_trace(trace)
173 def __init__(self, ec, guid):
174 super(LinuxApplication, self).__init__(ec, guid)
177 self._home = "app-%s" % self.guid
178 # whether the command should run in foreground attached
180 self._in_foreground = False
182 # whether to use sudo to kill the application process
183 self._sudo_kill = False
185 # keep a reference to the running process handler when
186 # the command is not executed as remote daemon in background
189 # timestamp of last state check of the application
190 self._last_state_check = tnow()
def log_message(self, msg):
    """ Return 'msg' framed with the guid of this RM and the hostname
    of the node it is connected to.
    """
    guid = self.guid
    hostname = self.node.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
198 node = self.get_connected(LinuxNode.rtype())
199 if node: return node[0]
204 return os.path.join(self.node.exp_home, self._home)
208 return os.path.join(self.app_home, self.ec.run_id)
def in_foreground(self):
    """ Tell whether the command has to be executed in foreground.

    A foreground command is launched with 'execute' instead of 'run'
    ('run' executes the command in background).

    X11 forwarding forces foreground execution: such a command can not
    run in background and detached from a terminal.

    Returns the value of the 'forwardX11' attribute when it is truthy,
    otherwise the internal '_in_foreground' flag.
    """
    x11 = self.get("forwardX11")
    if x11:
        return x11

    return self._in_foreground
231 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
232 self.info("Retrieving '%s' trace %s " % (name, attr))
234 path = os.path.join(self.run_home, name)
236 command = "(test -f %s && echo 'success') || echo 'error'" % path
237 (out, err), proc = self.node.execute(command)
239 if (err and proc.poll()) or out.find("error") != -1:
240 msg = " Couldn't find trace %s " % name
241 self.error(msg, out, err)
244 if attr == TraceAttr.PATH:
247 if attr == TraceAttr.ALL:
248 (out, err), proc = self.node.check_output(self.run_home, name)
251 msg = " Couldn't read trace %s " % name
252 self.error(msg, out, err)
257 if attr == TraceAttr.STREAM:
258 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
259 elif attr == TraceAttr.SIZE:
260 cmd = "stat -c%%s %s " % path
262 (out, err), proc = self.node.execute(cmd)
265 msg = " Couldn't find trace %s " % name
266 self.error(msg, out, err)
269 if attr == TraceAttr.SIZE:
270 out = int(out.strip())
275 # create run dir for application
276 self.node.mkdir(self.run_home)
278 # List of all the provision methods to invoke
285 self.upload_binaries,
287 self.upload_libraries,
292 # install dependencies
293 self.install_dependencies,
301 # Since provisioning takes a long time, before
302 # each step we check that the EC is still
305 raise RuntimeError, "EC finished"
311 # upload deploy script
312 deploy_command = ";".join(command)
313 self.execute_deploy_command(deploy_command)
315 # upload start script
316 self.upload_start_command()
318 self.info("Provisioning finished")
320 super(LinuxApplication, self).provision()
322 def upload_start_command(self):
323 # Upload command to remote bash script
324 # - only if command can be executed in background and detached
325 command = self.get("command")
327 if command and not self.in_foreground:
328 self.info("Uploading command '%s'" % command)
330 # replace application specific paths in the command
331 command = self.replace_paths(command)
333 # replace application specific paths in the environment
334 env = self.get("env")
335 env = env and self.replace_paths(env)
337 shfile = os.path.join(self.app_home, "start.sh")
339 self.node.upload_command(command,
344 def execute_deploy_command(self, command):
346 # Upload the command to a bash script and run it
347 # in background ( but wait until the command has
348 # finished to continue )
349 shfile = os.path.join(self.app_home, "deploy.sh")
350 self.node.run_and_wait(command, self.run_home,
353 pidfile = "deploy_pidfile",
354 ecodefile = "deploy_exitcode",
355 stdout = "deploy_stdout",
356 stderr = "deploy_stderr")
358 def upload_sources(self):
359 sources = self.get("sources")
364 self.info("Uploading sources ")
366 sources = sources.split(' ')
368 # Separate sources that should be downloaded from
369 # the web, from sources that should be uploaded from
372 for source in list(sources):
373 if source.startswith("http") or source.startswith("https"):
374 # remove the http source from the sources list
375 sources.remove(source)
377 command.append( " ( "
378 # Check if the source already exists
379 " ls ${SRC}/%(basename)s "
381 # If source doesn't exist, download it and check
382 # that it downloaded ok
383 " wget -c --directory-prefix=${SRC} %(source)s && "
384 " ls ${SRC}/%(basename)s "
386 "basename": os.path.basename(source),
390 command = " && ".join(command)
392 # replace application specific paths in the command
393 command = self.replace_paths(command)
396 sources = ' '.join(sources)
397 self.node.upload(sources, self.node.src_dir, overwrite = False)
def upload_files(self):
    """ Upload the files listed in the 'files' attribute to the share
    directory of the node.

    Nothing is logged or uploaded when the attribute is unset or empty.
    Existing remote files are kept (overwrite = False).
    """
    files = self.get("files")
    if not files:
        return

    self.info("Uploading files %s " % files)
    self.node.upload(files, self.node.share_dir, overwrite = False)
def upload_libraries(self):
    """ Upload the libraries listed in the 'libs' attribute to the lib
    directory of the node.

    Nothing is logged or uploaded when the attribute is unset or empty.
    Existing remote files are kept (overwrite = False).
    """
    libs = self.get("libs")
    if not libs:
        return

    # The log line used to reference the undefined name 'libaries',
    # which raised a NameError before anything was uploaded.
    self.info("Uploading libraries %s " % libs)
    self.node.upload(libs, self.node.lib_dir, overwrite = False)
def upload_binaries(self):
    """ Upload the binaries listed in the 'bins' attribute to the bin
    directory of the node.

    Nothing is logged or uploaded when the attribute is unset or empty.
    Existing remote files are kept (overwrite = False).
    """
    bins = self.get("bins")
    if not bins:
        return

    # The log line used to reference the undefined name 'binaries',
    # which raised a NameError before anything was uploaded.
    self.info("Uploading binaries %s " % bins)
    self.node.upload(bins, self.node.bin_dir, overwrite = False)
def upload_code(self):
    """ Upload the text held in the 'code' attribute to a file named
    'code' under the application home directory.

    Nothing is logged or uploaded when the attribute is unset or empty.
    The content is uploaded as text and an existing remote file is kept
    (overwrite = False).
    """
    code = self.get("code")
    if not code:
        return

    self.info("Uploading code")

    dst = os.path.join(self.app_home, "code")
    self.node.upload(code, dst, overwrite = False, text = True)
431 def upload_stdin(self):
432 stdin = self.get("stdin")
434 # create dir for sources
435 self.info("Uploading stdin")
437 # upload stdin file to ${SHARE_DIR} directory
438 basename = os.path.basename(stdin)
439 dst = os.path.join(self.node.share_dir, basename)
440 self.node.upload(stdin, dst, overwrite = False, text = True)
442 # create "stdin" symlink on ${APP_HOME} directory
443 command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
444 "app_home": self.app_home,
def install_dependencies(self):
    """ Build the command that installs the packages listed in the
    'depends' attribute.

    Returns whatever the node's install_packages_command produces for
    those packages, or None when the attribute is unset or empty (the
    node is not queried in that case).
    """
    depends = self.get("depends")
    if not depends:
        return None

    self.info("Installing dependencies %s" % depends)
    return self.node.install_packages_command(depends)
456 build = self.get("build")
459 self.info("Building sources ")
461 # replace application specific paths in the command
462 return self.replace_paths(build)
465 install = self.get("install")
468 self.info("Installing sources ")
470 # replace application specific paths in the command
471 return self.replace_paths(install)
474 # Wait until node is associated and deployed
476 if not node or node.state < ResourceState.READY:
477 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
478 self.ec.schedule(reschedule_delay, self.deploy)
481 command = self.get("command") or ""
482 self.info("Deploying command '%s' " % command)
489 super(LinuxApplication, self).deploy()
492 command = self.get("command")
494 self.info("Starting command '%s'" % command)
497 # If no command was given (i.e. Application was used for dependency
498 # installation), then the application is directly marked as FINISHED
503 if self.in_foreground:
504 self._run_in_foreground()
506 self._run_in_background()
511 super(LinuxApplication, self).start()
# Launch the 'command' attribute through execute_command and keep the
# returned process handle in self._proc (see the original comments below).
# NOTE(review): several lines of this method are missing from this listing
# (the remaining arguments of the execute_command call and, presumably, the
# assignment of 'msg'); the notes below only cover what is visible.
513 def _run_in_foreground(self):
514 command = self.get("command")
515 sudo = self.get("sudo") or False
516 x11 = self.get("forwardX11")
518 # For a command being executed in foreground, if there is stdin,
519 # it is expected to be text string not a file or pipe
520 stdin = self.get("stdin") or None
522 # Command will be launched in foreground and attached to the
523 # terminal using the node 'execute' in non blocking mode.
525 # We save the reference to the process in self._proc
526 # to be able to kill the process from the stop method.
527 # We also set blocking = False, since we don't want the
528 # thread to block until the execution finishes.
# NOTE(review): execute_command is called as a bound method AND receives
# 'self' again as first positional argument. Given its signature
# 'def execute_command(self, command, ...', the RM instance lands in the
# 'command' parameter and the real command shifts one position to the right.
# This looks like a bug; the call should most likely be
# self.execute_command(command, ...).
529 (out, err), self._proc = self.execute_command(self, command,
# NOTE(review): a truthy poll() result is treated as a failed launch here;
# presumably poll() returns the exit code as with subprocess.Popen - confirm.
536 if self._proc.poll():
# NOTE(review): no assignment to 'msg' is visible in this listing - confirm
# it is defined before this point, otherwise this line raises NameError.
537 self.error(msg, out, err)
# NOTE(review): 'raise RuntimeError, msg' is Python 2-only syntax.
538 raise RuntimeError, msg
540 def _run_in_background(self):
541 command = self.get("command")
542 env = self.get("env")
543 sudo = self.get("sudo") or False
547 stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
550 # Command will be run as a daemon in background and detached from any
552 # The command to run was previously uploaded to a bash script
553 # during deployment, now we launch the remote script using 'run'
554 # method from the node.
555 cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
556 (out, err), proc = self.node.run(cmd, self.run_home,
562 # check if execution errors occurred
563 msg = " Failed to start command '%s' " % command
566 self.error(msg, out, err)
567 raise RuntimeError, msg
569 # Wait for pid file to be generated
570 pid, ppid = self.node.wait_pid(self.run_home)
571 if pid: self._pid = int(pid)
572 if ppid: self._ppid = int(ppid)
574 # If the process is not running, check for error information
575 # on the remote machine
576 if not self.pid or not self.ppid:
577 (out, err), proc = self.node.check_errors(self.run_home,
580 # Out is what was written in the stderr file
582 msg = " Failed to start command '%s' " % command
583 self.error(msg, out, err)
584 raise RuntimeError, msg
587 """ Stops application execution
589 command = self.get('command') or ''
591 if self.state == ResourceState.STARTED:
593 self.info("Stopping command '%s' " % command)
595 # If the command is running in foreground (it was launched using
596 # the node 'execute' method), then we use the handler to the Popen
597 # process to kill it. Else we send a kill signal using the pid and ppid
598 # retrieved after running the command with the node 'run' method
602 # Only try to kill the process if the pid and ppid
604 if self.pid and self.ppid:
605 (out, err), proc = self.node.kill(self.pid, self.ppid,
606 sudo = self._sudo_kill)
608 # TODO: check if execution errors occurred
609 if proc.poll() or err:
610 msg = " Failed to STOP command '%s' " % self.get("command")
611 self.error(msg, out, err)
615 super(LinuxApplication, self).stop()
618 self.info("Releasing resource")
620 tear_down = self.get("tearDown")
622 self.node.execute(tear_down)
626 super(LinuxApplication, self).release()
630 """ Returns the state of the application
632 if self._state == ResourceState.STARTED:
633 if self.in_foreground:
634 # Check if the process we used to execute the command
635 # is still running ...
636 retcode = self._proc.poll()
638 # retcode == None -> running
639 # retcode > 0 -> error
640 # retcode == 0 -> finished
643 msg = " Failed to execute command '%s'" % self.get("command")
644 err = self._proc.stderr.read()
645 self.error(msg, out, err)
651 # We need to query the status of the command we launched in
652 # background. In order to avoid overwhelming the remote host and
653 # the local processor with too many ssh queries, the state is only
654 # requested every 'state_check_delay' seconds.
655 state_check_delay = 0.5
656 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
657 if self.pid and self.ppid:
658 # Make sure the process is still running in background
659 status = self.node.status(self.pid, self.ppid)
661 if status == ProcStatus.FINISHED:
662 # If the program finished, check if execution
664 (out, err), proc = self.node.check_errors(
668 msg = "Failed to execute command '%s'" % \
670 self.error(msg, out, err)
675 self._last_state_check = tnow()
679 def execute_command(self, command,
688 environ = self.node.format_environment(env, inline = True)
689 command = environ + command
690 command = self.replace_paths(command)
692 return self.node.execute(command,
695 forward_x11 = forward_x11,
def replace_paths(self, command):
    """
    Replace all special path tags in 'command' with the actual paths.

    Tags are substituted one after the other, in the order listed
    below, so a tag contained in an already substituted path (e.g. a
    ${HOME} prefix) is resolved by a later substitution. Paths are
    inserted as provided by the node and the application; this method
    itself applies no escaping.

    :param command: text containing ${...} path tags
    :type command: str
    :return: the text with every known tag replaced
    """
    node = self.node

    # Order matters: ${HOME} is handled last.
    substitutions = (
        ("${USR}", node.usr_dir),
        ("${LIB}", node.lib_dir),
        ("${BIN}", node.bin_dir),
        ("${SRC}", node.src_dir),
        ("${SHARE}", node.share_dir),
        ("${EXP}", node.exp_dir),
        ("${EXP_HOME}", node.exp_home),
        ("${APP_HOME}", self.app_home),
        ("${RUN_HOME}", self.run_home),
        ("${NODE_HOME}", node.node_home),
        ("${HOME}", node.home_dir),
    )

    for tag, path in substitutions:
        command = command.replace(tag, path)

    return command
716 def valid_connection(self, guid):