2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.trace import Trace, TraceAttr
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import tnow, tdiffsec
30 # TODO: Resolve wildcards in commands!!
31 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
34 class LinuxApplication(ResourceManager):
36 .. class:: Class Args :
38 :param ec: The Experiment controller
39 :type ec: ExperimentController
40 :param guid: guid of the RM
45 A LinuxApplication RM represents a process that can be executed in
46 a remote Linux host using SSH.
The LinuxApplication RM takes care of uploading sources and any files
49 needed to run the experiment, to the remote host.
50 It also allows to provide source compilation (build) and installation
51 instructions, and takes care of automating the sources build and
52 installation tasks for the user.
54 It is important to note that files uploaded to the remote host have
55 two possible scopes: single-experiment or multi-experiment.
56 Single experiment files are those that will not be re-used by other
57 experiments. Multi-experiment files are those that will.
58 Sources and shared files are always made available to all experiments.
62 The directory structure used by LinuxApplication RM at the Linux
63 host is the following:
65 ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
67 ${LIB} |- /lib --> Base directory for libraries
68 ${BIN} |- /bin --> Base directory for binary files
69 ${SRC} |- /src --> Base directory for sources
70 ${SHARE} |- /share --> Base directory for other files
72 ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
74 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
76 ${APP_HOME} |- /<app-guid> --> Base directory for application
77 | specific files (e.g. command.sh, input)
79 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
83 _rtype = "linux::Application"
84 _help = "Runs an application on a Linux host with a BASH command "
    def _register_attributes(cls):
        """ Declare the attributes exposed by a LinuxApplication RM:
        the command to run, its environment, software dependencies,
        files/sources/libraries/binaries to upload, and the
        build/install instructions.

        NOTE(review): several Attribute(...) calls below (command,
        forwardX11, env, sudo) are missing their trailing arguments and
        closing parentheses in this chunk of the file -- compare against
        the complete source before editing.
        """
        command = Attribute("command", "Command to execute at application start. "
            "Note that commands will be executed in the ${RUN_HOME} directory, "
            "make sure to take this into account when using relative paths. ",
        forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
        env = Attribute("env", "Environment variables string for command execution",
        sudo = Attribute("sudo", "Run with root privileges",
        depends = Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags = Flags.Design)
        sources = Attribute("sources",
            "semi-colon separated list of regular files to be uploaded to ${SRC} "
            "directory prior to building. Archives won't be expanded automatically. "
            "Sources are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all sources). ",
            flags = Flags.Design)
        files = Attribute("files",
            "semi-colon separated list of regular miscellaneous files to be uploaded "
            "to ${SHARE} directory. "
            "Files are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all files). ",
            flags = Flags.Design)
        libs = Attribute("libs",
            "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
            "to ${LIB} directory. "
            "Libraries are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all files). ",
            flags = Flags.Design)
        bins = Attribute("bins",
            "semi-colon separated list of binary files to be uploaded "
            "to ${BIN} directory. "
            "Binaries are globally available for all experiments unless "
            "cleanHome is set to True (This will delete all files). ",
            flags = Flags.Design)
        code = Attribute("code",
            "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
            flags = Flags.Design)
        build = Attribute("build",
            "Build commands to execute after deploying the sources. "
            "Sources are uploaded to the ${SRC} directory and code "
            "is uploaded to the ${APP_HOME} directory. \n"
            "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
            "./configure && make && make clean.\n"
            "Make sure to make the build commands return with a nonzero exit "
            flags = Flags.Design)
        install = Attribute("install",
            "Commands to transfer built files to their final destinations. "
            "Install commands are executed after build commands. ",
            flags = Flags.Design)
        stdin = Attribute("stdin", "Standard input for the 'command'",
            flags = Flags.Design)
        tear_down = Attribute("tearDown", "Command to be executed just before "
            "releasing the resource",
            flags = Flags.Design)
        # register everything so the EC can get/set these attributes
        cls._register_attribute(command)
        cls._register_attribute(forward_x11)
        cls._register_attribute(env)
        cls._register_attribute(sudo)
        cls._register_attribute(depends)
        cls._register_attribute(sources)
        cls._register_attribute(code)
        cls._register_attribute(files)
        cls._register_attribute(bins)
        cls._register_attribute(libs)
        cls._register_attribute(build)
        cls._register_attribute(install)
        cls._register_attribute(stdin)
        cls._register_attribute(tear_down)
164 def _register_traces(cls):
165 stdout = Trace("stdout", "Standard output stream", enabled = True)
166 stderr = Trace("stderr", "Standard error stream", enabled = True)
168 cls._register_trace(stdout)
169 cls._register_trace(stderr)
    def __init__(self, ec, guid):
        """ RM constructor.

        :param ec: The Experiment controller owning this RM
        :type ec: ExperimentController
        :param guid: guid of this RM

        NOTE(review): several instance initializations (e.g. _pid,
        _ppid, _proc) appear to be missing from this chunk -- the
        orphan comment below about the "running process handler"
        refers to one of the dropped assignments.
        """
        super(LinuxApplication, self).__init__(ec, guid)
        # name of the per-application directory under ${EXP_HOME}
        self._home = "app-%s" % self.guid
        # whether the command should run in foreground attached
        self._in_foreground = False
        # whether to use sudo to kill the application process
        self._sudo_kill = False
        # keep a reference to the running process handler when
        # the command is not executed as remote daemon in background
        # timestamp of last state check of the application
        self._last_state_check = tnow()
192 def log_message(self, msg):
193 return " guid %d - host %s - %s " % (self.guid,
194 self.node.get("hostname"), msg)
        # NOTE(review): the lines below are the bodies of three
        # @property definitions (node, app_home, run_home) whose
        # 'def' lines are missing from this chunk of the file.
        node = self.get_connected(LinuxNode.get_rtype())
        msg = "Application %s guid %d NOT connected to Node" % (
            self._rtype, self.guid)
        raise RuntimeError, msg
        return os.path.join(self.node.exp_home, self._home)
        return os.path.join(self.app_home, self.ec.run_id)
    def in_foreground(self):
        # NOTE(review): the @property decorator and the closing quotes
        # of the docstring below are missing from this chunk.
        """ Returns True if the command needs to be executed in foreground.
        This means that command will be executed using 'execute' instead of
        'run' ('run' executes a command in background and detached from the
        When using X11 forwarding option, the command can not run in background
        and detached from a terminal, since we need to keep the terminal attached
        return self.get("forwardX11") or self._in_foreground
238 def trace_filepath(self, filename):
239 return os.path.join(self.run_home, filename)
    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Retrieve content or metadata of a trace file from the
        remote host.

        :param name: trace name (e.g. 'stdout', 'stderr')
        :param attr: what to retrieve -- TraceAttr.ALL (full content),
            PATH, STREAM (one block of bytes) or SIZE
        :param block: block size in bytes for STREAM reads
        :param offset: block offset for STREAM reads

        NOTE(review): the 'return' statements of several branches
        (e.g. PATH and ALL) are missing from this chunk.
        """
        self.info("Retrieving '%s' trace %s " % (name, attr))
        path = self.trace_filepath(name)
        # probe for the trace file on the remote node before reading it
        command = "(test -f %s && echo 'success') || echo 'error'" % path
        (out, err), proc = self.node.execute(command)
        if (err and proc.poll()) or out.find("error") != -1:
            msg = " Couldn't find trace %s " % name
            self.error(msg, out, err)
        if attr == TraceAttr.PATH:
        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.run_home, name)
            msg = " Couldn't read trace %s " % name
            self.error(msg, out, err)
        if attr == TraceAttr.STREAM:
            # read one 'block'-sized chunk, 'offset' blocks into the file
            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
        elif attr == TraceAttr.SIZE:
            cmd = "stat -c%%s %s " % path
        (out, err), proc = self.node.execute(cmd)
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        if attr == TraceAttr.SIZE:
            out = int(out.strip())
    def do_provision(self):
        """ Provision the application on the remote host: upload all
        needed files (sources, binaries, libraries, code, stdin),
        install dependencies, build and install sources, and upload
        the start script.

        NOTE(review): the creation of the 'procs' dict, the full list
        of provision steps and the EC-abort checking loop are missing
        from this chunk.
        """
        # take a snapshot of the system if user is root
        # to ensure that cleanProcess will not kill
        # pre-existent processes
        if self.node.get("username") == 'root':
            ps_aux = "ps aux |awk '{print $2,$11}'"
            (out, err), proc = self.node.execute(ps_aux)
            for line in out.strip().split("\n"):
                parts = line.strip().split(" ")
                procs[parts[0]] = parts[1]
            # NOTE(review): 'procs' has no visible definition here, and
            # dumping to a fixed /tmp path leaves the file handle open
            # and is racy across concurrent runs -- confirm against the
            # full file.
            pickle.dump(procs, open("/tmp/save.proc", "wb"))
        # create run dir for application
        self.node.mkdir(self.run_home)
        # List of all the provision methods to invoke
        self.upload_binaries,
        self.upload_libraries,
        # install dependencies
        self.install_dependencies,
        # Since provisioning takes a long time, before
        # each step we check that the EC is still
        self.debug("Interrupting provisioning. EC says 'ABORT")
        # upload deploy script
        deploy_command = ";".join(command)
        self.execute_deploy_command(deploy_command)
        # upload start script
        self.upload_start_command()
        self.info("Provisioning finished")
        super(LinuxApplication, self).do_provision()
    def upload_start_command(self, overwrite = False):
        """ Upload the application 'command' to a start.sh script on
        the remote host so it can later be launched as a detached
        background daemon.

        :param overwrite: re-upload the script even if it exists

        NOTE(review): the trailing keyword arguments of the
        node.upload_command(...) call (e.g. shfile, env) are missing
        from this chunk.
        """
        # Upload command to remote bash script
        # - only if command can be executed in background and detached
        command = self.get("command")
        if command and not self.in_foreground:
            self.info("Uploading command '%s'" % command)
            # replace application specific paths in the command
            command = self.replace_paths(command)
            # replace application specific paths in the environment
            env = self.get("env")
            env = env and self.replace_paths(env)
            shfile = os.path.join(self.app_home, "start.sh")
            self.node.upload_command(command,
                overwrite = overwrite)
    def execute_deploy_command(self, command, prefix="deploy"):
        """ Upload *command* to a <prefix>.sh script on the remote host
        and run it, blocking until the command has finished.

        :param command: shell command(s) to run remotely
        :param prefix: basename for the script and its
            pidfile/exitcode/stdout/stderr companion files

        NOTE(review): some keyword arguments of node.run_and_wait(...)
        (e.g. shfile, env) are missing from this chunk.
        """
        # replace application specific paths in the command
        command = self.replace_paths(command)
        # replace application specific paths in the environment
        env = self.get("env")
        env = env and self.replace_paths(env)
        # Upload the command to a bash script and run it
        # in background ( but wait until the command has
        # finished to continue )
        shfile = os.path.join(self.app_home, "%s.sh" % prefix)
        self.node.run_and_wait(command, self.run_home,
            pidfile = "%s_pidfile" % prefix,
            ecodefile = "%s_exitcode" % prefix,
            stdout = "%s_stdout" % prefix,
            stderr = "%s_stderr" % prefix)
    def upload_sources(self, sources = None, src_dir = None):
        """ Upload source files to the remote ${SRC} directory.
        Entries that are http(s) URLs are not uploaded; instead they
        are downloaded directly on the remote node with wget.

        :param sources: semicolon-separated source paths (defaults to
            the 'sources' attribute)
        :param src_dir: remote destination directory (defaults to
            node.src_dir)

        NOTE(review): the guards around the parameter defaults, part
        of the wget command template and the execution of the download
        command are missing from this chunk.
        """
        sources = self.get("sources")
        src_dir = self.node.src_dir
        self.info("Uploading sources ")
        sources = map(str.strip, sources.split(";"))
        # Separate sources that should be downloaded from
        # the web, from sources that should be uploaded from
        for source in list(sources):
            # NOTE(review): "https".startswith("http") is True, so the
            # second test below is redundant -- confirm intent.
            if source.startswith("http") or source.startswith("https"):
                # remove the http source from the sources list
                sources.remove(source)
                command.append( " ( "
                    # Check if the source already exists
                    " ls %(src_dir)s/%(basename)s "
                    # If source doesn't exist, download it and check
                    # that it downloaded ok
                    " wget -c --directory-prefix=%(src_dir)s %(source)s && "
                    " ls %(src_dir)s/%(basename)s "
                    "basename": os.path.basename(source),
        command = " && ".join(command)
        # replace application specific paths in the command
        command = self.replace_paths(command)
        sources = ';'.join(sources)
        self.node.upload(sources, src_dir, overwrite = False)
437 def upload_files(self, files = None):
439 files = self.get("files")
442 self.info("Uploading files %s " % files)
443 self.node.upload(files, self.node.share_dir, overwrite = False)
445 def upload_libraries(self, libs = None):
447 libs = self.get("libs")
450 self.info("Uploading libraries %s " % libaries)
451 self.node.upload(libs, self.node.lib_dir, overwrite = False)
453 def upload_binaries(self, bins = None):
455 bins = self.get("bins")
458 self.info("Uploading binaries %s " % binaries)
459 self.node.upload(bins, self.node.bin_dir, overwrite = False)
461 def upload_code(self, code = None):
463 code = self.get("code")
466 self.info("Uploading code")
468 dst = os.path.join(self.app_home, "code")
469 self.node.upload(code, dst, overwrite = False, text = True)
    def upload_stdin(self, stdin = None):
        """ Upload the application's standard-input content to the
        remote host and link it as ${APP_HOME}/stdin.

        NOTE(review): the 'else:' branch between the two 'dst ='
        assignments, the closing of the 'command' template dict and
        the execution of that command are missing from this chunk.
        """
        stdin = self.get("stdin")
        # create dir for sources
        self.info("Uploading stdin")
        # upload stdin file to ${SHARE_DIR} directory
        if os.path.isfile(stdin):
            basename = os.path.basename(stdin)
            dst = os.path.join(self.node.share_dir, basename)
        dst = os.path.join(self.app_home, "stdin")
        self.node.upload(stdin, dst, overwrite = False, text = True)
        # create "stdin" symlink on ${APP_HOME} directory
        command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
            "app_home": self.app_home,
495 def install_dependencies(self, depends = None):
497 depends = self.get("depends")
500 self.info("Installing dependencies %s" % depends)
501 return self.node.install_packages_command(depends)
503 def build(self, build = None):
505 build = self.get("build")
508 self.info("Building sources ")
510 # replace application specific paths in the command
511 return self.replace_paths(build)
513 def install(self, install = None):
515 install = self.get("install")
518 self.info("Installing sources ")
520 # replace application specific paths in the command
521 return self.replace_paths(install)
        # NOTE(review): the 'def do_deploy(self):' line and parts of
        # its body (including the 'else:' around the provision path)
        # are missing from this chunk; 'node' below is presumably the
        # connected LinuxNode -- confirm against the full file.
        # Wait until node is associated and deployed
        if not node or node.state < ResourceState.READY:
            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
            self.ec.schedule(self.reschedule_delay, self.deploy)
        command = self.get("command") or ""
        self.info("Deploying command '%s' " % command)
        super(LinuxApplication, self).do_deploy()
        # NOTE(review): the 'def do_start(self):' line and the guards
        # around the foreground/background branches (e.g. the 'else:'
        # before _run_in_background) are missing from this chunk.
        command = self.get("command")
        self.info("Starting command '%s'" % command)
        # If no command was given (i.e. Application was used for dependency
        # installation), then the application is directly marked as STOPPED
        super(LinuxApplication, self).set_stopped()
        if self.in_foreground:
            self._run_in_foreground()
            self._run_in_background()
        super(LinuxApplication, self).do_start()
    def _run_in_foreground(self):
        """ Launch the command attached to the terminal using the node
        'execute' method in non-blocking mode; required when X11
        forwarding is enabled.

        NOTE(review): the keyword arguments of execute_command(...)
        and the assignment of 'msg' used in the error branch are
        missing from this chunk.
        """
        command = self.get("command")
        sudo = self.get("sudo") or False
        x11 = self.get("forwardX11")
        env = self.get("env")
        # Command will be launched in foreground and attached to the
        # terminal using the node 'execute' in non blocking mode.
        # We save the reference to the process in self._proc
        # to be able to kill the process from the stop method.
        # We also set blocking = False, since we don't want the
        # thread to block until the execution finishes.
        (out, err), self._proc = self.execute_command(command,
        if self._proc.poll():
            self.error(msg, out, err)
            raise RuntimeError, msg
    def _run_in_background(self):
        """ Launch the previously-uploaded start.sh as a detached
        daemon on the remote host via the node 'run' method, then
        retrieve and store the remote pid/ppid.

        NOTE(review): several lines are missing from this chunk: the
        alternative of the 'stdin' conditional expression, the keyword
        arguments of node.run(...), and the condition guarding the
        first error branch.
        """
        command = self.get("command")
        env = self.get("env")
        sudo = self.get("sudo") or False
        stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
        # Command will be run as a daemon in background and detached from any
        # The command to run was previously uploaded to a bash script
        # during deployment, now we launch the remote script using 'run'
        # method from the node.
        cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
        (out, err), proc = self.node.run(cmd, self.run_home,
        # check if execution errors occurred
        msg = " Failed to start command '%s' " % command
        self.error(msg, out, err)
        raise RuntimeError, msg
        # Wait for pid file to be generated
        pid, ppid = self.node.wait_pid(self.run_home)
        if pid: self._pid = int(pid)
        if ppid: self._ppid = int(ppid)
        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_errors(self.run_home,
            # Out is what was written in the stderr file
            msg = " Failed to start command '%s' " % command
            self.error(msg, out, err)
            raise RuntimeError, msg
        # NOTE(review): the 'def do_stop(self):' line and the closing
        # quotes of the docstring below are missing from this chunk.
        """ Stops application execution
        command = self.get('command') or ''
        if self.state == ResourceState.STARTED:
            self.info("Stopping command '%s' " % command)
            # If the command is running in foreground (it was launched using
            # the node 'execute' method), then we use the handler to the Popen
            # process to kill it. Else we send a kill signal using the pid and ppid
            # retrieved after running the command with the node 'run' method
            # Only try to kill the process if the pid and ppid
            if self.pid and self.ppid:
                (out, err), proc = self.node.kill(self.pid, self.ppid,
                    sudo = self._sudo_kill)
                # TODO: check if execution errors occurred
                if (proc and proc.poll()) or err:
                    msg = " Failed to STOP command '%s' " % self.get("command")
                    self.error(msg, out, err)
        super(LinuxApplication, self).do_stop()
654 def do_release(self):
655 self.info("Releasing resource")
659 tear_down = self.get("tearDown")
661 self.node.execute(tear_down)
663 hard_release = self.get("hardRelease")
665 self.node.rmdir(self.app_home)
667 super(LinuxApplication, self).do_release()
        # NOTE(review): the '@property' / 'def state(self):' lines and
        # the closing quotes of the docstring below are missing from
        # this chunk, along with several branch bodies and 'return'
        # statements.
        """ Returns the state of the application
        if self._state == ResourceState.STARTED:
            if self.in_foreground:
                # Check if the process we used to execute the command
                # is still running ...
                retcode = self._proc.poll()
                # retcode == None -> running
                # retcode > 0 -> error
                # retcode == 0 -> finished
                msg = " Failed to execute command '%s'" % self.get("command")
                err = self._proc.stderr.read()
                self.error(msg, out, err)
                # We need to query the status of the command we launched in
                # background. In order to avoid overwhelming the remote host and
                # the local processor with too many ssh queries, the state is only
                # requested every 'state_check_delay' seconds.
                state_check_delay = 0.5
                if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
                    if self.pid and self.ppid:
                        # Make sure the process is still running in background
                        status = self.node.status(self.pid, self.ppid)
                        if status == ProcStatus.FINISHED:
                            # If the program finished, check if execution
                            (out, err), proc = self.node.check_errors(
                            msg = "Failed to execute command '%s'" % \
                            self.error(msg, out, err)
                    self._last_state_check = tnow()
    def execute_command(self, command,
        # NOTE(review): the remaining parameters of this signature
        # (presumably env, sudo, stdin, forward_x11, blocking) and the
        # trailing arguments of the node.execute(...) call are missing
        # from this chunk. The visible body prefixes the command with
        # the inline-formatted environment, expands path tags, and
        # delegates execution to the node.
        environ = self.node.format_environment(env, inline=True)
        command = environ + command
        command = self.replace_paths(command)
        return self.node.execute(command,
            forward_x11=forward_x11,
    def replace_paths(self, command, node=None, app_home=None, run_home=None):
        # NOTE(review): the opening quotes of the docstring below, the
        # default-resolution guard for 'node', and the 'return (command'
        # line that heads the .replace() chain are missing from this
        # chunk. Each .replace() maps one ${TAG} to the corresponding
        # remote directory of the target node.
        Replace all special path tags with shell-escaped actual paths.
        app_home=self.app_home
        run_home = self.run_home
        .replace("${USR}", node.usr_dir)
        .replace("${LIB}", node.lib_dir)
        .replace("${BIN}", node.bin_dir)
        .replace("${SRC}", node.src_dir)
        .replace("${SHARE}", node.share_dir)
        .replace("${EXP}", node.exp_dir)
        .replace("${EXP_HOME}", node.exp_home)
        .replace("${APP_HOME}", app_home)
        .replace("${RUN_HOME}", run_home)
        .replace("${NODE_HOME}", node.node_home)
        .replace("${HOME}", node.home_dir)
766 def valid_connection(self, guid):