2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
35 class LinuxApplication(ResourceManager):
37 .. class:: Class Args :
39 :param ec: The Experiment controller
40 :type ec: ExperimentController
41 :param guid: guid of the RM
46 A LinuxApplication RM represents a process that can be executed in
47 a remote Linux host using SSH.
The LinuxApplication RM takes care of uploading sources and any files
needed to run the experiment, to the remote host.
51 It also allows to provide source compilation (build) and installation
52 instructions, and takes care of automating the sources build and
53 installation tasks for the user.
55 It is important to note that files uploaded to the remote host have
56 two possible scopes: single-experiment or multi-experiment.
57 Single experiment files are those that will not be re-used by other
58 experiments. Multi-experiment files are those that will.
59 Sources and shared files are always made available to all experiments.
63 The directory structure used by LinuxApplication RM at the Linux
64 host is the following:
66 ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
68 ${LIB} |- /lib --> Base directory for libraries
69 ${BIN} |- /bin --> Base directory for binary files
70 ${SRC} |- /src --> Base directory for sources
71 ${SHARE} |- /share --> Base directory for other files
73 ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
75 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
77 ${APP_HOME} |- /<app-guid> --> Base directory for application
78 | specific files (e.g. command.sh, input)
80 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
    # Resource type name used to reference this RM class from experiment designs
    _rtype = "LinuxApplication"
    # Short human-readable description shown by RM discovery tools
    _help = "Runs an application on a Linux host with a BASH command "
    # Backend (testbed family) this RM belongs to
    _backend_type = "linux"
    def _register_attributes(cls):
        """Declare the configuration attributes exposed by this RM type.

        Each Attribute below becomes settable via ec.set(guid, name, value).
        """
        # NOTE(review): in this copy several Attribute(...) declarations are
        # truncated (missing type/flags arguments and closing parentheses);
        # restore them from upstream before running.
        command = Attribute("command", "Command to execute at application start. "
            "Note that commands will be executed in the ${RUN_HOME} directory, "
            "make sure to take this into account when using relative paths. ",
        forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
        env = Attribute("env", "Environment variables string for command execution",
        sudo = Attribute("sudo", "Run with root privileges",
        depends = Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags = Flags.Design)
        sources = Attribute("sources",
            "semi-colon separated list of regular files to be uploaded to ${SRC} "
            "directory prior to building. Archives won't be expanded automatically. "
            "Sources are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all sources). ",
            flags = Flags.Design)
        files = Attribute("files",
            "semi-colon separated list of regular miscellaneous files to be uploaded "
            "to ${SHARE} directory. "
            "Files are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all files). ",
            flags = Flags.Design)
        libs = Attribute("libs",
            "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
            "to ${LIB} directory. "
            "Libraries are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all files). ",
            flags = Flags.Design)
        bins = Attribute("bins",
            "semi-colon separated list of binary files to be uploaded "
            "to ${BIN} directory. "
            "Binaries are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all files). ",
            flags = Flags.Design)
        code = Attribute("code",
            "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
            flags = Flags.Design)
        build = Attribute("build",
            "Build commands to execute after deploying the sources. "
            "Sources are uploaded to the ${SRC} directory and code "
            "is uploaded to the ${APP_HOME} directory. \n"
            "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
            "./configure && make && make clean.\n"
            "Make sure to make the build commands return with a nonzero exit "
            flags = Flags.Design)
        install = Attribute("install",
            "Commands to transfer built files to their final destinations. "
            "Install commands are executed after build commands. ",
            flags = Flags.Design)
        stdin = Attribute("stdin", "Standard input for the 'command'",
            flags = Flags.Design)
        tear_down = Attribute("tearDown", "Command to be executed just before "
            "releasing the resource",
            flags = Flags.Design)

        # Make every declared attribute visible on instances of this RM type
        cls._register_attribute(command)
        cls._register_attribute(forward_x11)
        cls._register_attribute(env)
        cls._register_attribute(sudo)
        cls._register_attribute(depends)
        cls._register_attribute(sources)
        cls._register_attribute(code)
        cls._register_attribute(files)
        cls._register_attribute(bins)
        cls._register_attribute(libs)
        cls._register_attribute(build)
        cls._register_attribute(install)
        cls._register_attribute(stdin)
        cls._register_attribute(tear_down)
165 def _register_traces(cls):
166 stdout = Trace("stdout", "Standard output stream", enabled = True)
167 stderr = Trace("stderr", "Standard error stream", enabled = True)
169 cls._register_trace(stdout)
170 cls._register_trace(stderr)
    def __init__(self, ec, guid):
        """Initialize the application RM.

        :param ec: The Experiment controller managing this RM
        :param guid: guid of the RM
        """
        super(LinuxApplication, self).__init__(ec, guid)

        # name of the per-application directory created on the remote host
        self._home = "app-%s" % self.guid

        # whether the command should run in foreground attached
        # to a terminal
        self._in_foreground = False

        # whether to use sudo to kill the application process
        self._sudo_kill = False

        # keep a reference to the running process handler when
        # the command is not executed as remote daemon in background
        # NOTE(review): the corresponding assignment (presumably
        # self._proc = None) is missing in this copy.

        # timestamp of last state check of the application
        self._last_state_check = tnow()
193 def log_message(self, msg):
194 return " guid %d - host %s - %s " % (self.guid,
195 self.node.get("hostname"), msg)
200 node = self.get_connected(LinuxNode.get_rtype())
202 msg = "Application %s guid %d NOT connected to Node" % (
203 self._rtype, self.guid)
204 raise RuntimeError, msg
212 return os.path.join(self.node.exp_home, self._home)
216 return os.path.join(self.app_home, self.ec.run_id)
    def in_foreground(self):
        """ Returns True if the command needs to be executed in foreground.

        This means that the command will be executed using 'execute' instead
        of 'run' ('run' executes a command in background and detached from
        the terminal).

        When using the X11 forwarding option, the command can not run in
        background and detached from a terminal, since we need to keep the
        terminal attached to interact with it.
        """
        return self.get("forwardX11") or self._in_foreground
239 def trace_filepath(self, filename):
240 return os.path.join(self.run_home, filename)
    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """Retrieve trace *name* from the remote host.

        :param name: trace file name (e.g. 'stdout', 'stderr')
        :param attr: what to retrieve — TraceAttr.ALL (full content),
            PATH (remote file path), STREAM (one block of bytes) or SIZE
        :param block: block size in bytes, used when attr == STREAM
        :param offset: block offset, used when attr == STREAM
        """
        self.info("Retrieving '%s' trace %s " % (name, attr))

        path = self.trace_filepath(name)

        # Check that the trace file exists on the remote host
        command = "(test -f %s && echo 'success') || echo 'error'" % path
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
            # NOTE(review): the early return for this error branch is
            # missing in this copy.

        if attr == TraceAttr.PATH:
            # NOTE(review): the return statement for this branch
            # (presumably 'return path') is missing in this copy.

        if attr == TraceAttr.ALL:
            # Download the whole trace content
            (out, err), proc = self.node.check_output(self.run_home, name)
            # NOTE(review): the error guard and returns around the lines
            # below are missing in this copy.
            msg = " Couldn't read trace %s " % name
            self.error(msg, out, err)

        if attr == TraceAttr.STREAM:
            # Read a single 'block'-sized chunk starting at 'offset' blocks
            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
        elif attr == TraceAttr.SIZE:
            cmd = "stat -c%%s %s " % path

        (out, err), proc = self.node.execute(cmd)
        # NOTE(review): the error guard around the lines below is missing
        # in this copy.
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)

        if attr == TraceAttr.SIZE:
            out = int(out.strip())
    def do_provision(self):
        """Provision the application on the remote node: create the run
        directory, upload files/sources/binaries/libraries, install
        dependencies, build, and upload the start script."""
        # take a snapshot of the system if user is root
        # to ensure that cleanProcess will not kill
        # pre-existent processes
        if self.node.get("username") == 'root':
            # NOTE(review): the initialization of 'procs' (presumably
            # procs = dict()) is missing in this copy.
            ps_aux = "ps aux |awk '{print $2,$11}'"
            (out, err), proc = self.node.execute(ps_aux)
            for line in out.strip().split("\n"):
                parts = line.strip().split(" ")
                procs[parts[0]] = parts[1]
            pickle.dump(procs, open("/tmp/save.proc", "wb"))

        # create run dir for application
        self.node.mkdir(self.run_home)

        # List of all the provision methods to invoke
        # NOTE(review): the construction of the 'command' list and the
        # surrounding loop are partially missing in this copy.
        self.upload_binaries,
        self.upload_libraries,
        # install dependencies
        self.install_dependencies,

        # Since provisioning takes a long time, before
        # each step we check that the EC is still
        # alive, and abort otherwise
        self.debug("Interrupting provisioning. EC says 'ABORT")

        # upload deploy script
        deploy_command = ";".join(command)
        self.execute_deploy_command(deploy_command)

        # upload start script
        self.upload_start_command()

        self.info("Provisioning finished")

        super(LinuxApplication, self).do_provision()
    def upload_start_command(self, overwrite = False):
        """Upload the 'command' attribute to a start.sh script on the node.

        Only done when the command can be executed in background and
        detached from the terminal (i.e. not in foreground).

        :param overwrite: re-upload the script even if it already exists
        """
        # Upload command to remote bash script
        # - only if command can be executed in background and detached
        command = self.get("command")

        if command and not self.in_foreground:
            self.info("Uploading command '%s'" % command)

            # replace application specific paths in the command
            command = self.replace_paths(command)

            # replace application specific paths in the environment
            env = self.get("env")
            env = env and self.replace_paths(env)

            shfile = os.path.join(self.app_home, "start.sh")

            # NOTE(review): some keyword arguments of upload_command
            # (presumably shfile and env) are missing in this copy.
            self.node.upload_command(command,
                    overwrite = overwrite)
    def execute_deploy_command(self, command, prefix="deploy"):
        """Upload *command* to a <prefix>.sh script on the node and run it,
        waiting for completion.

        :param command: shell command(s) to run remotely
        :param prefix: base name for the script and its pidfile/exitcode/
            stdout/stderr files
        """
        # replace application specific paths in the command
        command = self.replace_paths(command)

        # replace application specific paths in the environment
        env = self.get("env")
        env = env and self.replace_paths(env)

        # Upload the command to a bash script and run it
        # in background ( but wait until the command has
        # finished to continue )
        shfile = os.path.join(self.app_home, "%s.sh" % prefix)

        # NOTE(review): some keyword arguments of run_and_wait (presumably
        # shfile and env) are missing in this copy.
        self.node.run_and_wait(command, self.run_home,
                pidfile = "%s_pidfile" % prefix,
                ecodefile = "%s_exitcode" % prefix,
                stdout = "%s_stdout" % prefix,
                stderr = "%s_stderr" % prefix)
    def upload_sources(self, sources = None, src_dir = None):
        """Upload source files to the ${SRC} remote directory.

        http(s) sources are not uploaded: they are downloaded directly on
        the remote host (with wget) as part of the deploy command.

        :param sources: ';'-separated list of local paths or http(s) URLs
        :param src_dir: remote destination directory
        """
        # NOTE(review): the guard lines ('if not sources:' / 'if not
        # src_dir:') are missing in this copy; as written, both arguments
        # are always clobbered.
        sources = self.get("sources")

        src_dir = self.node.src_dir

        self.info("Uploading sources ")

        sources = map(str.strip, sources.split(";"))

        # Separate sources that should be downloaded from
        # the web, from sources that should be uploaded from
        # the local machine
        for source in list(sources):
            if source.startswith("http") or source.startswith("https"):
                # remove the http source from the sources list
                sources.remove(source)

                # NOTE(review): the initialization of 'command' and parts
                # of this shell snippet (separators, dict interpolation)
                # are missing in this copy.
                command.append( " ( "
                    # Check if the source already exists
                    " ls %(src_dir)s/%(basename)s "
                    # If source doesn't exist, download it and check
                    # that it it downloaded ok
                    " wget -c --directory-prefix=%(src_dir)s %(source)s && "
                    " ls %(src_dir)s/%(basename)s "
                    "basename": os.path.basename(source),

        command = " && ".join(command)

        # replace application specific paths in the command
        command = self.replace_paths(command)

        sources = ';'.join(sources)
        self.node.upload(sources, src_dir, overwrite = False)
439 def upload_files(self, files = None):
441 files = self.get("files")
444 self.info("Uploading files %s " % files)
445 self.node.upload(files, self.node.share_dir, overwrite = False)
447 def upload_libraries(self, libs = None):
449 libs = self.get("libs")
452 self.info("Uploading libraries %s " % libaries)
453 self.node.upload(libs, self.node.lib_dir, overwrite = False)
455 def upload_binaries(self, bins = None):
457 bins = self.get("bins")
460 self.info("Uploading binaries %s " % binaries)
461 self.node.upload(bins, self.node.bin_dir, overwrite = False)
463 def upload_code(self, code = None):
465 code = self.get("code")
468 self.info("Uploading code")
470 dst = os.path.join(self.app_home, "code")
471 self.node.upload(code, dst, overwrite = False, text = True)
    def upload_stdin(self, stdin = None):
        """Upload the command's stdin file and symlink it as
        ${APP_HOME}/stdin on the remote host.

        :param stdin: local path (or literal content) for standard input
        """
        # NOTE(review): the guard lines are missing in this copy; the
        # 'stdin' argument is always clobbered by the attribute value.
        stdin = self.get("stdin")

        # create dir for sources
        self.info("Uploading stdin")

        # upload stdin file to ${SHARE_DIR} directory
        if os.path.isfile(stdin):
            basename = os.path.basename(stdin)
            dst = os.path.join(self.node.share_dir, basename)
            # NOTE(review): the else-branch header is missing in this copy;
            # the line below presumably belongs to it.
            dst = os.path.join(self.app_home, "stdin")

        self.node.upload(stdin, dst, overwrite = False, text = True)

        # create "stdin" symlink on ${APP_HOME} directory
        command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
            "app_home": self.app_home,
        # NOTE(review): the dict closing, the 'stdin' entry and the
        # node.execute(command) call are missing in this copy.
497 def install_dependencies(self, depends = None):
499 depends = self.get("depends")
502 self.info("Installing dependencies %s" % depends)
503 return self.node.install_packages_command(depends)
505 def build(self, build = None):
507 build = self.get("build")
510 self.info("Building sources ")
512 # replace application specific paths in the command
513 return self.replace_paths(build)
515 def install(self, install = None):
517 install = self.get("install")
520 self.info("Installing sources ")
522 # replace application specific paths in the command
523 return self.replace_paths(install)
526 # Wait until node is associated and deployed
528 if not node or node.state < ResourceState.READY:
529 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
530 self.ec.schedule(reschedule_delay, self.deploy)
532 command = self.get("command") or ""
533 self.info("Deploying command '%s' " % command)
537 super(LinuxApplication, self).do_deploy()
540 command = self.get("command")
542 self.info("Starting command '%s'" % command)
545 # If no command was given (i.e. Application was used for dependency
546 # installation), then the application is directly marked as STOPPED
547 super(LinuxApplication, self).set_stopped()
549 if self.in_foreground:
550 self._run_in_foreground()
552 self._run_in_background()
554 super(LinuxApplication, self).do_start()
    def _run_in_foreground(self):
        """Run the command attached to the terminal, keeping the process
        handler in self._proc so do_stop can kill it."""
        command = self.get("command")
        sudo = self.get("sudo") or False
        x11 = self.get("forwardX11")
        env = self.get("env")

        # Command will be launched in foreground and attached to the
        # terminal using the node 'execute' in non blocking mode.

        # We save the reference to the process in self._proc
        # to be able to kill the process from the stop method.
        # We also set blocking = False, since we don't want the
        # thread to block until the execution finishes.
        # NOTE(review): the keyword arguments of execute_command and the
        # definition of 'msg' are missing in this copy.
        (out, err), self._proc = self.execute_command(command,

        if self._proc.poll():
            self.error(msg, out, err)
            raise RuntimeError, msg
    def _run_in_background(self):
        """Launch the previously uploaded start.sh as a detached daemon on
        the node, then retrieve and store the process pid/ppid."""
        command = self.get("command")
        env = self.get("env")
        sudo = self.get("sudo") or False

        # Use the uploaded stdin file, if any, as the daemon's input
        stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \

        # Command will be run as a daemon in background and detached from
        # any terminal.
        # The command to run was previously uploaded to a bash script
        # during deployment, now we launch the remote script using 'run'
        # method from the node.
        cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
        # NOTE(review): several keyword arguments of node.run are missing
        # in this copy.
        (out, err), proc = self.node.run(cmd, self.run_home,

        # check if execution errors occurred
        msg = " Failed to start command '%s' " % command
        # NOTE(review): the guard around the error branch below is missing
        # in this copy.
        self.error(msg, out, err)
        raise RuntimeError, msg

        # Wait for pid file to be generated
        pid, ppid = self.node.wait_pid(self.run_home)
        if pid: self._pid = int(pid)
        if ppid: self._ppid = int(ppid)

        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_errors(self.run_home,

            # Out is what was written in the stderr file
            msg = " Failed to start command '%s' " % command
            self.error(msg, out, err)
            raise RuntimeError, msg
626 """ Stops application execution
628 command = self.get('command') or ''
630 if self.state == ResourceState.STARTED:
632 self.info("Stopping command '%s' " % command)
634 # If the command is running in foreground (it was launched using
635 # the node 'execute' method), then we use the handler to the Popen
636 # process to kill it. Else we send a kill signal using the pid and ppid
637 # retrieved after running the command with the node 'run' method
641 # Only try to kill the process if the pid and ppid
643 if self.pid and self.ppid:
644 (out, err), proc = self.node.kill(self.pid, self.ppid,
645 sudo = self._sudo_kill)
648 # TODO: check if execution errors occurred
649 if (proc and proc.poll()) or err:
650 msg = " Failed to STOP command '%s' " % self.get("command")
651 self.error(msg, out, err)
654 super(LinuxApplication, self).do_stop()
656 def do_release(self):
657 self.info("Releasing resource")
661 tear_down = self.get("tearDown")
663 self.node.execute(tear_down)
665 hard_release = self.get("hardRelease")
667 self.node.rmdir(self.app_home)
669 super(LinuxApplication, self).do_release()
        """ Returns the state of the application.
        """
        # NOTE(review): the '@property / def state(self):' header is
        # missing in this copy.
        if self._state == ResourceState.STARTED:
            if self.in_foreground:
                # Check if the process we used to execute the command
                # is still running ...
                retcode = self._proc.poll()

                # retcode == None -> running
                # retcode > 0 -> error
                # retcode == 0 -> finished
                # NOTE(review): the guard for the error case is missing
                # in this copy.
                msg = " Failed to execute command '%s'" % self.get("command")
                err = self._proc.stderr.read()
                self.error(msg, out, err)

            # We need to query the status of the command we launched in
            # background. In order to avoid overwhelming the remote host and
            # the local processor with too many ssh queries, the state is only
            # requested every 'state_check_delay' seconds.
            state_check_delay = 0.5
            if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
                if self.pid and self.ppid:
                    # Make sure the process is still running in background
                    status = self.node.status(self.pid, self.ppid)

                    if status == ProcStatus.FINISHED:
                        # If the program finished, check if execution
                        # errors occurred
                        # NOTE(review): the arguments of check_errors and
                        # the continuation of 'msg' are missing in this
                        # copy.
                        (out, err), proc = self.node.check_errors(
                        msg = "Failed to execute command '%s'" % \
                        self.error(msg, out, err)

                self._last_state_check = tnow()
    def execute_command(self, command,
        # NOTE(review): the remaining parameters of this signature
        # (presumably env, sudo, tty, forward_x11, blocking) are missing
        # in this copy.

        # Prepend the formatted environment to the command, expand the
        # special path tags and execute it on the node.
        environ = self.node.format_environment(env, inline = True)
        command = environ + command
        command = self.replace_paths(command)

        # NOTE(review): some keyword arguments of node.execute and the
        # closing parenthesis are missing in this copy.
        return self.node.execute(command,
                forward_x11 = forward_x11,
741 def replace_paths(self, command):
743 Replace all special path tags with shell-escaped actual paths.
746 .replace("${USR}", self.node.usr_dir)
747 .replace("${LIB}", self.node.lib_dir)
748 .replace("${BIN}", self.node.bin_dir)
749 .replace("${SRC}", self.node.src_dir)
750 .replace("${SHARE}", self.node.share_dir)
751 .replace("${EXP}", self.node.exp_dir)
752 .replace("${EXP_HOME}", self.node.exp_home)
753 .replace("${APP_HOME}", self.app_home)
754 .replace("${RUN_HOME}", self.run_home)
755 .replace("${NODE_HOME}", self.node.node_home)
756 .replace("${HOME}", self.node.home_dir)
759 def valid_connection(self, guid):