2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
35 class LinuxApplication(ResourceManager):
37 .. class:: Class Args :
39 :param ec: The Experiment controller
40 :type ec: ExperimentController
41 :param guid: guid of the RM
46 A LinuxApplication RM represents a process that can be executed in
47 a remote Linux host using SSH.
49 The LinuxApplication RM takes care of uploadin sources and any files
50 needed to run the experiment, to the remote host.
51 It also allows to provide source compilation (build) and installation
52 instructions, and takes care of automating the sources build and
53 installation tasks for the user.
55 It is important to note that files uploaded to the remote host have
56 two possible scopes: single-experiment or multi-experiment.
57 Single experiment files are those that will not be re-used by other
58 experiments. Multi-experiment files are those that will.
59 Sources and shared files are always made available to all experiments.
63 The directory structure used by LinuxApplication RM at the Linux
64 host is the following:
66 ${HOME}/nepi-usr --> Base directory for multi-experiment files
68 ${LIB} |- /lib --> Base directory for libraries
69 ${BIN} |- /bin --> Base directory for binary files
70 ${SRC} |- /src --> Base directory for sources
71 ${SHARE} |- /share --> Base directory for other files
73 ${HOME}/nepi-exp --> Base directory for single-experiment files
75 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
77 ${APP_HOME} |- /<app-guid> --> Base directory for application
78 | specific files (e.g. command.sh, input)
80 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
84 _rtype = "LinuxApplication"
87 def _register_attributes(cls):
88 command = Attribute("command", "Command to execute at application start. "
89 "Note that commands will be executed in the ${RUN_HOME} directory, "
90 "make sure to take this into account when using relative paths. ",
91 flags = Flags.ExecReadOnly)
92 forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
93 flags = Flags.ExecReadOnly)
94 env = Attribute("env", "Environment variables string for command execution",
95 flags = Flags.ExecReadOnly)
96 sudo = Attribute("sudo", "Run with root privileges",
97 flags = Flags.ExecReadOnly)
98 depends = Attribute("depends",
99 "Space-separated list of packages required to run the application",
100 flags = Flags.ExecReadOnly)
101 sources = Attribute("sources",
102 "Space-separated list of regular files to be uploaded to ${SRC} "
103 "directory prior to building. Archives won't be expanded automatically. "
104 "Sources are globally available for all experiments unless "
105 "cleanHome is set to True (This will delete all sources). ",
106 flags = Flags.ExecReadOnly)
107 files = Attribute("files",
108 "Space-separated list of regular miscellaneous files to be uploaded "
109 "to ${SHARE} directory. "
110 "Files are globally available for all experiments unless "
111 "cleanHome is set to True (This will delete all files). ",
112 flags = Flags.ExecReadOnly)
113 libs = Attribute("libs",
114 "Space-separated list of libraries (e.g. .so files) to be uploaded "
115 "to ${LIB} directory. "
116 "Libraries are globally available for all experiments unless "
117 "cleanHome is set to True (This will delete all files). ",
118 flags = Flags.ExecReadOnly)
119 bins = Attribute("bins",
120 "Space-separated list of binary files to be uploaded "
121 "to ${BIN} directory. "
122 "Binaries are globally available for all experiments unless "
123 "cleanHome is set to True (This will delete all files). ",
124 flags = Flags.ExecReadOnly)
125 code = Attribute("code",
126 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
127 flags = Flags.ExecReadOnly)
128 build = Attribute("build",
129 "Build commands to execute after deploying the sources. "
130 "Sources are uploaded to the ${SRC} directory and code "
131 "is uploaded to the ${APP_HOME} directory. \n"
132 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
133 "./configure && make && make clean.\n"
134 "Make sure to make the build commands return with a nonzero exit "
136 flags = Flags.ReadOnly)
137 install = Attribute("install",
138 "Commands to transfer built files to their final destinations. "
139 "Install commands are executed after build commands. ",
140 flags = Flags.ReadOnly)
141 stdin = Attribute("stdin", "Standard input for the 'command'",
142 flags = Flags.ExecReadOnly)
143 tear_down = Attribute("tearDown", "Command to be executed just before "
144 "releasing the resource",
145 flags = Flags.ReadOnly)
147 cls._register_attribute(command)
148 cls._register_attribute(forward_x11)
149 cls._register_attribute(env)
150 cls._register_attribute(sudo)
151 cls._register_attribute(depends)
152 cls._register_attribute(sources)
153 cls._register_attribute(code)
154 cls._register_attribute(files)
155 cls._register_attribute(bins)
156 cls._register_attribute(libs)
157 cls._register_attribute(build)
158 cls._register_attribute(install)
159 cls._register_attribute(stdin)
160 cls._register_attribute(tear_down)
163 def _register_traces(cls):
164 stdout = Trace("stdout", "Standard output stream")
165 stderr = Trace("stderr", "Standard error stream")
167 cls._register_trace(stdout)
168 cls._register_trace(stderr)
170 def __init__(self, ec, guid):
171 super(LinuxApplication, self).__init__(ec, guid)
174 self._home = "app-%s" % self.guid
175 # whether the command should run in foreground attached
177 self._in_foreground = False
179 # whether to use sudo to kill the application process
180 self._sudo_kill = False
182 # keep a reference to the running process handler when
183 # the command is not executed as remote daemon in background
186 # timestamp of last state check of the application
187 self._last_state_check = tnow()
189 def log_message(self, msg):
190 return " guid %d - host %s - %s " % (self.guid,
191 self.node.get("hostname"), msg)
195 node = self.get_connected(LinuxNode.rtype())
196 if node: return node[0]
201 return os.path.join(self.node.exp_home, self._home)
205 return os.path.join(self.app_home, self.ec.run_id)
216 def in_foreground(self):
217 """ Returns True if the command needs to be executed in foreground.
218 This means that command will be executed using 'execute' instead of
219 'run' ('run' executes a command in background and detached from the
222 When using X11 forwarding option, the command can not run in background
223 and detached from a terminal, since we need to keep the terminal attached
226 return self.get("forwardX11") or self._in_foreground
228 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
229 self.info("Retrieving '%s' trace %s " % (name, attr))
231 path = os.path.join(self.run_home, name)
233 command = "(test -f %s && echo 'success') || echo 'error'" % path
234 (out, err), proc = self.node.execute(command)
236 if (err and proc.poll()) or out.find("error") != -1:
237 msg = " Couldn't find trace %s " % name
238 self.error(msg, out, err)
241 if attr == TraceAttr.PATH:
244 if attr == TraceAttr.ALL:
245 (out, err), proc = self.node.check_output(self.run_home, name)
248 msg = " Couldn't read trace %s " % name
249 self.error(msg, out, err)
254 if attr == TraceAttr.STREAM:
255 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
256 elif attr == TraceAttr.SIZE:
257 cmd = "stat -c%%s %s " % path
259 (out, err), proc = self.node.execute(cmd)
262 msg = " Couldn't find trace %s " % name
263 self.error(msg, out, err)
266 if attr == TraceAttr.SIZE:
267 out = int(out.strip())
272 # create run dir for application
273 self.node.mkdir(self.run_home)
275 # List of all the provision methods to invoke
282 self.upload_binaries,
284 self.upload_libraries,
289 # install dependencies
290 self.install_dependencies,
298 # Since provisioning takes a long time, before
299 # each step we check that the EC is still
302 raise RuntimeError, "EC finished"
308 # upload deploy script
309 deploy_command = ";".join(command)
310 self.execute_deploy_command(deploy_command)
312 # upload start script
313 self.upload_start_command()
315 self.info("Provisioning finished")
317 super(LinuxApplication, self).provision()
319 def upload_start_command(self):
320 # Upload command to remote bash script
321 # - only if command can be executed in background and detached
322 command = self.get("command")
324 if command and not self.in_foreground:
325 self.info("Uploading command '%s'" % command)
327 # replace application specific paths in the command
328 command = self.replace_paths(command)
330 # replace application specific paths in the environment
331 env = self.get("env")
332 env = env and self.replace_paths(env)
334 shfile = os.path.join(self.app_home, "start.sh")
336 self.node.upload_command(command,
341 def execute_deploy_command(self, command):
343 # Upload the command to a bash script and run it
344 # in background ( but wait until the command has
345 # finished to continue )
346 shfile = os.path.join(self.app_home, "deploy.sh")
347 self.node.run_and_wait(command, self.run_home,
350 pidfile = "deploy_pidfile",
351 ecodefile = "deploy_exitcode",
352 stdout = "deploy_stdout",
353 stderr = "deploy_stderr")
355 def upload_sources(self):
356 sources = self.get("sources")
361 self.info("Uploading sources ")
363 sources = sources.split(' ')
365 # Separate sources that should be downloaded from
366 # the web, from sources that should be uploaded from
369 for source in list(sources):
370 if source.startswith("http") or source.startswith("https"):
371 # remove the hhtp source from the sources list
372 sources.remove(source)
374 command.append( " ( "
375 # Check if the source already exists
376 " ls ${SRC}/%(basename)s "
378 # If source doesn't exist, download it and check
379 # that it it downloaded ok
380 " wget -c --directory-prefix=${SRC} %(source)s && "
381 " ls ${SRC}/%(basename)s "
383 "basename": os.path.basename(source),
387 command = " && ".join(command)
389 # replace application specific paths in the command
390 command = self.replace_paths(command)
393 sources = ' '.join(sources)
394 self.node.upload(sources, self.node.src_dir, overwrite = False)
398 def upload_files(self):
399 files = self.get("files")
402 self.info("Uploading files %s " % files)
403 self.node.upload(files, self.node.share_dir, overwrite = False)
405 def upload_libraries(self):
406 libs = self.get("libs")
409 self.info("Uploading libraries %s " % libaries)
410 self.node.upload(libs, self.node.lib_dir, overwrite = False)
412 def upload_binaries(self):
413 bins = self.get("bins")
416 self.info("Uploading binaries %s " % binaries)
417 self.node.upload(bins, self.node.bin_dir, overwrite = False)
419 def upload_code(self):
420 code = self.get("code")
423 self.info("Uploading code")
425 dst = os.path.join(self.app_home, "code")
426 self.node.upload(code, dst, overwrite = False, text = True)
428 def upload_stdin(self):
429 stdin = self.get("stdin")
431 # create dir for sources
432 self.info("Uploading stdin")
434 # upload stdin file to ${SHARE_DIR} directory
435 basename = os.path.basename(stdin)
436 dst = os.path.join(self.node.share_dir, basename)
437 self.node.upload(stdin, dst, overwrite = False, text = True)
439 # create "stdin" symlink on ${APP_HOME} directory
440 command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
441 "app_home": self.app_home,
446 def install_dependencies(self):
447 depends = self.get("depends")
449 self.info("Installing dependencies %s" % depends)
450 return self.node.install_packages_command(depends)
453 build = self.get("build")
456 self.info("Building sources ")
458 # replace application specific paths in the command
459 return self.replace_paths(build)
462 install = self.get("install")
465 self.info("Installing sources ")
467 # replace application specific paths in the command
468 return self.replace_paths(install)
471 # Wait until node is associated and deployed
473 if not node or node.state < ResourceState.READY:
474 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
475 self.ec.schedule(reschedule_delay, self.deploy)
478 command = self.get("command") or ""
479 self.info("Deploying command '%s' " % command)
486 super(LinuxApplication, self).deploy()
489 command = self.get("command")
491 self.info("Starting command '%s'" % command)
494 # If no command was given (i.e. Application was used for dependency
495 # installation), then the application is directly marked as FINISHED
499 if self.in_foreground:
500 self._run_in_foreground()
502 self._run_in_background()
504 super(LinuxApplication, self).start()
506 def _run_in_foreground(self):
507 command = self.get("command")
508 sudo = self.get("sudo") or False
509 x11 = self.get("forwardX11")
511 # For a command being executed in foreground, if there is stdin,
512 # it is expected to be text string not a file or pipe
513 stdin = self.get("stdin") or None
515 # Command will be launched in foreground and attached to the
516 # terminal using the node 'execute' in non blocking mode.
518 # We save the reference to the process in self._proc
519 # to be able to kill the process from the stop method.
520 # We also set blocking = False, since we don't want the
521 # thread to block until the execution finishes.
522 (out, err), self._proc = self.execute_command(self, command,
529 if self._proc.poll():
531 self.error(msg, out, err)
532 raise RuntimeError, msg
534 def _run_in_background(self):
535 command = self.get("command")
536 env = self.get("env")
537 sudo = self.get("sudo") or False
541 stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
544 # Command will be run as a daemon in baground and detached from any
546 # The command to run was previously uploaded to a bash script
547 # during deployment, now we launch the remote script using 'run'
548 # method from the node.
549 cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
550 (out, err), proc = self.node.run(cmd, self.run_home,
556 # check if execution errors occurred
557 msg = " Failed to start command '%s' " % command
561 self.error(msg, out, err)
562 raise RuntimeError, msg
564 # Wait for pid file to be generated
565 pid, ppid = self.node.wait_pid(self.run_home)
566 if pid: self._pid = int(pid)
567 if ppid: self._ppid = int(ppid)
569 # If the process is not running, check for error information
570 # on the remote machine
571 if not self.pid or not self.ppid:
572 (out, err), proc = self.node.check_errors(self.run_home,
575 # Out is what was written in the stderr file
578 msg = " Failed to start command '%s' " % command
579 self.error(msg, out, err)
580 raise RuntimeError, msg
583 """ Stops application execution
585 command = self.get('command') or ''
587 if self.state == ResourceState.STARTED:
589 self.info("Stopping command '%s' " % command)
591 # If the command is running in foreground (it was launched using
592 # the node 'execute' method), then we use the handler to the Popen
593 # process to kill it. Else we send a kill signal using the pid and ppid
594 # retrieved after running the command with the node 'run' method
598 # Only try to kill the process if the pid and ppid
600 if self.pid and self.ppid:
601 (out, err), proc = self.node.kill(self.pid, self.ppid,
602 sudo = self._sudo_kill)
604 # TODO: check if execution errors occurred
605 if proc.poll() or err:
606 msg = " Failed to STOP command '%s' " % self.get("command")
607 self.error(msg, out, err)
610 if self.state == ResourceState.STARTED:
611 super(LinuxApplication, self).stop()
614 self.info("Releasing resource")
616 tear_down = self.get("tearDown")
618 self.node.execute(tear_down)
622 if self.state != ResourceState.FAILED:
623 self.info("Resource released")
625 super(LinuxApplication, self).release()
629 """ Returns the state of the application
631 if self._state == ResourceState.STARTED:
632 if self.in_foreground:
633 # Check if the process we used to execute the command
634 # is still running ...
635 retcode = self._proc.poll()
637 # retcode == None -> running
638 # retcode > 0 -> error
639 # retcode == 0 -> finished
642 msg = " Failed to execute command '%s'" % self.get("command")
643 err = self._proc.stderr.read()
644 self.error(msg, out, err)
650 # We need to query the status of the command we launched in
651 # background. In order to avoid overwhelming the remote host and
652 # the local processor with too many ssh queries, the state is only
653 # requested every 'state_check_delay' seconds.
654 state_check_delay = 0.5
655 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
656 if self.pid and self.ppid:
657 # Make sure the process is still running in background
658 status = self.node.status(self.pid, self.ppid)
660 if status == ProcStatus.FINISHED:
661 # If the program finished, check if execution
663 (out, err), proc = self.node.check_errors(
667 msg = "Failed to execute command '%s'" % \
669 self.error(msg, out, err)
674 self._last_state_check = tnow()
678 def execute_command(self, command,
687 environ = self.node.format_environment(env, inline = True)
688 command = environ + command
689 command = self.replace_paths(command)
691 return self.node.execute(command,
694 forward_x11 = forward_x11,
697 def replace_paths(self, command):
699 Replace all special path tags with shell-escaped actual paths.
702 .replace("${USR}", self.node.usr_dir)
703 .replace("${LIB}", self.node.lib_dir)
704 .replace("${BIN}", self.node.bin_dir)
705 .replace("${SRC}", self.node.src_dir)
706 .replace("${SHARE}", self.node.share_dir)
707 .replace("${EXP}", self.node.exp_dir)
708 .replace("${EXP_HOME}", self.node.exp_home)
709 .replace("${APP_HOME}", self.app_home)
710 .replace("${RUN_HOME}", self.run_home)
711 .replace("${NODE_HOME}", self.node.node_home)
712 .replace("${HOME}", self.node.home_dir)
715 def valid_connection(self, guid):