2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
34 class LinuxApplication(ResourceManager):
36 .. class:: Class Args :
38 :param ec: The Experiment controller
39 :type ec: ExperimentController
40 :param guid: guid of the RM
45 A LinuxApplication RM represents a process that can be executed in
46 a remote Linux host using SSH.
48 The LinuxApplication RM takes care of uploading sources and any files
49 needed to run the experiment, to the remote host.
50 It also allows to provide source compilation (build) and installation
51 instructions, and takes care of automating the sources build and
52 installation tasks for the user.
54 It is important to note that files uploaded to the remote host have
55 two possible scopes: single-experiment or multi-experiment.
56 Single experiment files are those that will not be re-used by other
57 experiments. Multi-experiment files are those that will.
58 Sources and shared files are always made available to all experiments.
62 The directory structure used by LinuxApplication RM at the Linux
63 host is the following:
65 ${HOME}/nepi-usr --> Base directory for multi-experiment files
67 ${LIB} |- /lib --> Base directory for libraries
68 ${BIN} |- /bin --> Base directory for binary files
69 ${SRC} |- /src --> Base directory for sources
70 ${SHARE} |- /share --> Base directory for other files
72 ${HOME}/nepi-exp --> Base directory for single-experiment files
74 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
76 ${APP_HOME} |- /<app-guid> --> Base directory for application
77 | specific files (e.g. command.sh, input)
79 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
83 _rtype = "LinuxApplication"
86 def _register_attributes(cls):
87 command = Attribute("command", "Command to execute at application start. "
88 "Note that commands will be executed in the ${RUN_HOME} directory, "
89 "make sure to take this into account when using relative paths. ",
90 flags = Flags.ExecReadOnly)
91 forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
92 flags = Flags.ExecReadOnly)
93 env = Attribute("env", "Environment variables string for command execution",
94 flags = Flags.ExecReadOnly)
95 sudo = Attribute("sudo", "Run with root privileges",
96 flags = Flags.ExecReadOnly)
97 depends = Attribute("depends",
98 "Space-separated list of packages required to run the application",
99 flags = Flags.ExecReadOnly)
100 sources = Attribute("sources",
101 "Space-separated list of regular files to be uploaded to ${SRC} "
102 "directory prior to building. Archives won't be expanded automatically. "
103 "Sources are globally available for all experiments unless "
104 "cleanHome is set to True (This will delete all sources). ",
105 flags = Flags.ExecReadOnly)
106 files = Attribute("files",
107 "Space-separated list of regular miscellaneous files to be uploaded "
108 "to ${SHARE} directory. "
109 "Files are globally available for all experiments unless "
110 "cleanHome is set to True (This will delete all files). ",
111 flags = Flags.ExecReadOnly)
112 libs = Attribute("libs",
113 "Space-separated list of libraries (e.g. .so files) to be uploaded "
114 "to ${LIB} directory. "
115 "Libraries are globally available for all experiments unless "
116 "cleanHome is set to True (This will delete all files). ",
117 flags = Flags.ExecReadOnly)
118 bins = Attribute("bins",
119 "Space-separated list of binary files to be uploaded "
120 "to ${BIN} directory. "
121 "Binaries are globally available for all experiments unless "
122 "cleanHome is set to True (This will delete all files). ",
123 flags = Flags.ExecReadOnly)
124 code = Attribute("code",
125 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
126 flags = Flags.ExecReadOnly)
127 build = Attribute("build",
128 "Build commands to execute after deploying the sources. "
129 "Sources are uploaded to the ${SRC} directory and code "
130 "is uploaded to the ${APP_HOME} directory. \n"
131 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
132 "./configure && make && make clean.\n"
133 "Make sure to make the build commands return with a nonzero exit "
135 flags = Flags.ReadOnly)
136 install = Attribute("install",
137 "Commands to transfer built files to their final destinations. "
138 "Install commands are executed after build commands. ",
139 flags = Flags.ReadOnly)
140 stdin = Attribute("stdin", "Standard input for the 'command'",
141 flags = Flags.ExecReadOnly)
142 tear_down = Attribute("tearDown", "Command to be executed just before "
143 "releasing the resource",
144 flags = Flags.ReadOnly)
146 cls._register_attribute(command)
147 cls._register_attribute(forward_x11)
148 cls._register_attribute(env)
149 cls._register_attribute(sudo)
150 cls._register_attribute(depends)
151 cls._register_attribute(sources)
152 cls._register_attribute(code)
153 cls._register_attribute(files)
154 cls._register_attribute(bins)
155 cls._register_attribute(libs)
156 cls._register_attribute(build)
157 cls._register_attribute(install)
158 cls._register_attribute(stdin)
159 cls._register_attribute(tear_down)
def _register_traces(cls):
    """Register the output traces exposed by this resource.

    One trace is declared for each standard stream ("stdout" and
    "stderr") and registered on ``cls`` through ``cls._register_trace``.
    """
    stream_specs = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
    )

    # Build every trace first, then register them in declaration order
    traces = [Trace(name, description) for name, description in stream_specs]
    for stream_trace in traces:
        cls._register_trace(stream_trace)
def __init__(self, ec, guid):
    """Create the application RM and reset its runtime bookkeeping.

    :param ec: The Experiment controller
    :type ec: ExperimentController
    :param guid: guid of the RM
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # pid and ppid of the remote process. They are only assigned once
    # the command was launched in background and its pid file was read,
    # so they must exist (as None) before that happens.
    self._pid = None
    self._ppid = None

    # name of the application directory, joined to the node exp_home
    # to build the application home
    self._home = "app-%s" % self.guid

    # whether the command should run in foreground attached
    # to a terminal
    self._in_foreground = False

    # whether to use sudo to kill the application process
    self._sudo_kill = False

    # keep a reference to the running process handler when
    # the command is not executed as remote daemon in background
    self._proc = None

    # timestamp of last state check of the application
    self._last_state_check = tnow()
def log_message(self, msg):
    """Return ``msg`` decorated with the RM guid and the node hostname.

    :param msg: text of the log entry
    :return: string of the form " guid <guid> - host <hostname> - <msg> "
    """
    guid = self.guid
    hostname = self.node.get("hostname")
    fields = (guid, hostname, msg)
    return " guid %d - host %s - %s " % fields
194 node = self.get_connected(LinuxNode.rtype())
195 if node: return node[0]
200 return os.path.join(self.node.exp_home, self._home)
204 return os.path.join(self.app_home, self.ec.run_id)
def in_foreground(self):
    """Tell whether the command needs to be executed in foreground.

    A foreground command is executed using 'execute' instead of 'run'
    ('run' executes a command in background and detached from the
    terminal).

    Foreground execution is selected when the "forwardX11" attribute is
    set, since with X11 forwarding the terminal has to stay attached,
    or when the internal ``_in_foreground`` flag is set.

    :return: the "forwardX11" value when it is truthy, otherwise the
        value of ``_in_foreground``
    """
    forward_x11 = self.get("forwardX11")
    if forward_x11:
        return forward_x11

    return self._in_foreground
227 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
228 self.info("Retrieving '%s' trace %s " % (name, attr))
230 path = os.path.join(self.run_home, name)
232 command = "(test -f %s && echo 'success') || echo 'error'" % path
233 (out, err), proc = self.node.execute(command)
235 if (err and proc.poll()) or out.find("error") != -1:
236 msg = " Couldn't find trace %s " % name
237 self.error(msg, out, err)
240 if attr == TraceAttr.PATH:
243 if attr == TraceAttr.ALL:
244 (out, err), proc = self.node.check_output(self.run_home, name)
247 msg = " Couldn't read trace %s " % name
248 self.error(msg, out, err)
253 if attr == TraceAttr.STREAM:
254 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
255 elif attr == TraceAttr.SIZE:
256 cmd = "stat -c%%s %s " % path
258 (out, err), proc = self.node.execute(cmd)
261 msg = " Couldn't find trace %s " % name
262 self.error(msg, out, err)
265 if attr == TraceAttr.SIZE:
266 out = int(out.strip())
271 # create run dir for application
272 self.node.mkdir(self.run_home)
274 # List of all the provision methods to invoke
281 self.upload_binaries,
283 self.upload_libraries,
288 # install dependencies
289 self.install_dependencies,
297 # Since provisioning takes a long time, before
298 # each step we check that the EC is still
301 raise RuntimeError, "EC finished"
307 # upload deploy script
308 deploy_command = ";".join(command)
309 self.execute_deploy_command(deploy_command)
311 # upload start script
312 self.upload_start_command()
314 self.info("Provisioning finished")
316 super(LinuxApplication, self).provision()
318 def upload_start_command(self):
319 # Upload command to remote bash script
320 # - only if command can be executed in background and detached
321 command = self.get("command")
323 if command and not self.in_foreground:
324 self.info("Uploading command '%s'" % command)
326 # replace application specific paths in the command
327 command = self.replace_paths(command)
329 # replace application specific paths in the environment
330 env = self.get("env")
331 env = env and self.replace_paths(env)
333 shfile = os.path.join(self.app_home, "start.sh")
335 self.node.upload_command(command,
340 def execute_deploy_command(self, command):
342 # Upload the command to a bash script and run it
343 # in background ( but wait until the command has
344 # finished to continue )
345 shfile = os.path.join(self.app_home, "deploy.sh")
346 self.node.run_and_wait(command, self.run_home,
349 pidfile = "deploy_pidfile",
350 ecodefile = "deploy_exitcode",
351 stdout = "deploy_stdout",
352 stderr = "deploy_stderr")
354 def upload_sources(self):
355 sources = self.get("sources")
360 self.info("Uploading sources ")
362 sources = sources.split(' ')
364 # Separate sources that should be downloaded from
365 # the web, from sources that should be uploaded from
368 for source in list(sources):
369 if source.startswith("http") or source.startswith("https"):
370 # remove the http source from the sources list
371 sources.remove(source)
373 command.append( " ( "
374 # Check if the source already exists
375 " ls ${SRC}/%(basename)s "
377 # If source doesn't exist, download it and check
378 # that it downloaded ok
379 " wget -c --directory-prefix=${SRC} %(source)s && "
380 " ls ${SRC}/%(basename)s "
382 "basename": os.path.basename(source),
386 command = " && ".join(command)
388 # replace application specific paths in the command
389 command = self.replace_paths(command)
392 sources = ' '.join(sources)
393 self.node.upload(sources, self.node.src_dir, overwrite = False)
def upload_files(self):
    """Upload the files listed in the "files" attribute to the node.

    The attribute value is handed as-is to the node ``upload`` method,
    with the node ``share_dir`` as destination and ``overwrite = False``.
    Nothing is logged or uploaded when the attribute is not set.
    """
    files = self.get("files")
    if not files:
        # no miscellaneous files were requested for this application
        return

    self.info("Uploading files %s " % files)
    self.node.upload(files, self.node.share_dir, overwrite = False)
def upload_libraries(self):
    """Upload the libraries listed in the "libs" attribute to the node.

    The attribute value is handed as-is to the node ``upload`` method,
    with the node ``lib_dir`` as destination and ``overwrite = False``.
    Nothing is logged or uploaded when the attribute is not set.
    """
    libs = self.get("libs")
    if not libs:
        # no libraries were requested for this application
        return

    # The message must be built from 'libs': it is the only name bound
    # in this method
    self.info("Uploading libraries %s " % libs)
    self.node.upload(libs, self.node.lib_dir, overwrite = False)
def upload_binaries(self):
    """Upload the binaries listed in the "bins" attribute to the node.

    The attribute value is handed as-is to the node ``upload`` method,
    with the node ``bin_dir`` as destination and ``overwrite = False``.
    Nothing is logged or uploaded when the attribute is not set.
    """
    bins = self.get("bins")
    if not bins:
        # no binary files were requested for this application
        return

    # The message must be built from 'bins': it is the only name bound
    # in this method
    self.info("Uploading binaries %s " % bins)
    self.node.upload(bins, self.node.bin_dir, overwrite = False)
def upload_code(self):
    """Upload the text held by the "code" attribute to the node.

    The text is uploaded to a file named "code" inside the application
    home, using the node ``upload`` method with ``overwrite = False``
    and ``text = True``.
    Nothing is logged or uploaded when the attribute is not set.
    """
    code = self.get("code")
    if not code:
        # no inline source code was given for this application
        return

    self.info("Uploading code")

    dst = os.path.join(self.app_home, "code")
    self.node.upload(code, dst, overwrite = False, text = True)
427 def upload_stdin(self):
428 stdin = self.get("stdin")
430 # create dir for sources
431 self.info("Uploading stdin")
433 # upload stdin file to ${SHARE_DIR} directory
434 basename = os.path.basename(stdin)
435 dst = os.path.join(self.node.share_dir, basename)
436 self.node.upload(stdin, dst, overwrite = False, text = True)
438 # create "stdin" symlink on ${APP_HOME} directory
439 command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
440 "app_home": self.app_home,
def install_dependencies(self):
    """Build the command that installs the "depends" packages.

    :return: the value returned by the node ``install_packages_command``
        method for the "depends" attribute, or None when the attribute
        is not set (nothing is logged in that case)
    """
    depends = self.get("depends")
    if not depends:
        # no packages were requested, so there is no command to build
        return None

    self.info("Installing dependencies %s" % depends)
    return self.node.install_packages_command(depends)
452 build = self.get("build")
455 self.info("Building sources ")
457 # replace application specific paths in the command
458 return self.replace_paths(build)
461 install = self.get("install")
464 self.info("Installing sources ")
466 # replace application specific paths in the command
467 return self.replace_paths(install)
470 # Wait until node is associated and deployed
472 if not node or node.state < ResourceState.READY:
473 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
474 self.ec.schedule(reschedule_delay, self.deploy)
477 command = self.get("command") or ""
478 self.info("Deploying command '%s' " % command)
485 super(LinuxApplication, self).deploy()
488 command = self.get("command")
490 self.info("Starting command '%s'" % command)
493 # If no command was given (i.e. Application was used for dependency
494 # installation), then the application is directly marked as FINISHED
495 self._state = ResourceState.FINISHED
498 if self.in_foreground:
499 self._run_in_foreground()
501 self._run_in_background()
503 super(LinuxApplication, self).start()
505 def _run_in_foreground(self):
506 command = self.get("command")
507 sudo = self.get("sudo") or False
508 x11 = self.get("forwardX11")
510 # For a command being executed in foreground, if there is stdin,
511 # it is expected to be text string not a file or pipe
512 stdin = self.get("stdin") or None
514 # Command will be launched in foreground and attached to the
515 # terminal using the node 'execute' in non blocking mode.
517 # We save the reference to the process in self._proc
518 # to be able to kill the process from the stop method.
519 # We also set blocking = False, since we don't want the
520 # thread to block until the execution finishes.
521 (out, err), self._proc = self.execute_command(self, command,
528 if self._proc.poll():
530 self.error(msg, out, err)
531 raise RuntimeError, msg
533 def _run_in_background(self):
534 command = self.get("command")
535 env = self.get("env")
536 sudo = self.get("sudo") or False
540 stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
543 # Command will be run as a daemon in background and detached from any
545 # The command to run was previously uploaded to a bash script
546 # during deployment, now we launch the remote script using 'run'
547 # method from the node.
548 cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
549 (out, err), proc = self.node.run(cmd, self.run_home,
555 # check if execution errors occurred
556 msg = " Failed to start command '%s' " % command
560 self.error(msg, out, err)
561 raise RuntimeError, msg
563 # Wait for pid file to be generated
564 pid, ppid = self.node.wait_pid(self.run_home)
565 if pid: self._pid = int(pid)
566 if ppid: self._ppid = int(ppid)
568 # If the process is not running, check for error information
569 # on the remote machine
570 if not self.pid or not self.ppid:
571 (out, err), proc = self.node.check_errors(self.run_home,
574 # Out is what was written in the stderr file
577 msg = " Failed to start command '%s' " % command
578 self.error(msg, out, err)
579 raise RuntimeError, msg
582 """ Stops application execution
584 command = self.get('command') or ''
586 if self.state == ResourceState.STARTED:
588 self.info("Stopping command '%s'" % command)
590 # If the command is running in foreground (it was launched using
591 # the node 'execute' method), then we use the handler to the Popen
592 # process to kill it. Else we send a kill signal using the pid and ppid
593 # retrieved after running the command with the node 'run' method
599 # Only try to kill the process if the pid and ppid
601 if self.pid and self.ppid:
602 (out, err), proc = self.node.kill(self.pid, self.ppid,
603 sudo = self._sudo_kill)
605 if proc.poll() or err:
606 # check if execution errors occurred
607 msg = " Failed to STOP command '%s' " % self.get("command")
608 self.error(msg, out, err)
611 if self.state == ResourceState.STARTED:
612 super(LinuxApplication, self).stop()
615 self.info("Releasing resource")
617 tear_down = self.get("tearDown")
619 self.node.execute(tear_down)
623 if self.state == ResourceState.STOPPED:
624 self.info("Resource released")
626 super(LinuxApplication, self).release()
630 """ Returns the state of the application
632 if self._state == ResourceState.STARTED:
633 if self.in_foreground:
634 # Check if the process we used to execute the command
635 # is still running ...
636 retcode = self._proc.poll()
638 # retcode == None -> running
639 # retcode > 0 -> error
640 # retcode == 0 -> finished
643 msg = " Failed to execute command '%s'" % self.get("command")
644 err = self._proc.stderr.read()
645 self.error(msg, out, err)
648 self._state = ResourceState.FINISHED
651 # We need to query the status of the command we launched in
652 # background. In order to avoid overwhelming the remote host and
653 # the local processor with too many ssh queries, the state is only
654 # requested every 'state_check_delay' seconds.
655 state_check_delay = 0.5
656 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
657 if self.pid and self.ppid:
658 # Make sure the process is still running in background
659 status = self.node.status(self.pid, self.ppid)
661 if status == ProcStatus.FINISHED:
662 # If the program finished, check if execution
664 (out, err), proc = self.node.check_errors(
668 msg = " Failed to execute command '%s'" % \
670 self.error(msg, out, err)
673 self._state = ResourceState.FINISHED
675 self._last_state_check = tnow()
679 def execute_command(self, command,
688 environ = self.node.format_environment(env, inline = True)
689 command = environ + command
690 command = self.replace_paths(command)
692 return self.node.execute(command,
695 forward_x11 = forward_x11,
def replace_paths(self, command):
    """
    Replace all special path tags in ``command`` with the actual paths.

    The tags are substituted one after the other, in the order of the
    table below, by plain string replacement: each path is inserted
    verbatim.

    :param command: string that may contain ${...} path tags
    :return: copy of ``command`` with every known tag substituted
    """
    node = self.node

    # (tag, replacement) pairs, applied from first to last
    substitutions = (
        ("${USR}", node.usr_dir),
        ("${LIB}", node.lib_dir),
        ("${BIN}", node.bin_dir),
        ("${SRC}", node.src_dir),
        ("${SHARE}", node.share_dir),
        ("${EXP}", node.exp_dir),
        ("${EXP_HOME}", node.exp_home),
        ("${APP_HOME}", self.app_home),
        ("${RUN_HOME}", self.run_home),
        ("${NODE_HOME}", node.node_home),
        ("${HOME}", node.home_dir),
    )

    expanded = command
    for tag, path in substitutions:
        expanded = expanded.replace(tag, path)

    return expanded
716 def valid_connection(self, guid):