2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
32 # TODO: During provisioning, everything that is not scp could be
33 # uploaded to a same script, http_sources download, etc...
34 # and like that require performing less ssh connections!!!
38 class LinuxApplication(ResourceManager):
40 .. class:: Class Args :
42 :param ec: The Experiment controller
43 :type ec: ExperimentController
44 :param guid: guid of the RM
49 A LinuxApplication RM represents a process that can be executed in
50 a remote Linux host using SSH.
52 The LinuxApplication RM takes care of uploading sources and any files
53 needed to run the experiment, to the remote host.
54 It also allows to provide source compilation (build) and installation
55 instructions, and takes care of automating the sources build and
56 installation tasks for the user.
58 It is important to note that files uploaded to the remote host have
59 two possible scopes: single-experiment or multi-experiment.
60 Single experiment files are those that will not be re-used by other
61 experiments. Multi-experiment files are those that will.
62 Sources and shared files are always made available to all experiments.
66 The directory structure used by LinuxApplication RM at the Linux
67 host is the following:
69 ${HOME}/nepi-usr --> Base directory for multi-experiment files
71 ${LIB} |- /lib --> Base directory for libraries
72 ${BIN} |- /bin --> Base directory for binary files
73 ${SRC} |- /src --> Base directory for sources
74 ${SHARE} |- /share --> Base directory for other files
76 ${HOME}/nepi-exp --> Base directory for single-experiment files
78 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
80 ${APP_HOME} |- /<app-guid> --> Base directory for application
81 | specific files (e.g. command.sh, input)
83 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
87 _rtype = "LinuxApplication"
90 def _register_attributes(cls):
91 command = Attribute("command", "Command to execute at application start. "
92 "Note that commands will be executed in the ${RUN_HOME} directory, "
93 "make sure to take this into account when using relative paths. ",
94 flags = Flags.ExecReadOnly)
95 forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
96 flags = Flags.ExecReadOnly)
97 env = Attribute("env", "Environment variables string for command execution",
98 flags = Flags.ExecReadOnly)
99 sudo = Attribute("sudo", "Run with root privileges",
100 flags = Flags.ExecReadOnly)
101 depends = Attribute("depends",
102 "Space-separated list of packages required to run the application",
103 flags = Flags.ExecReadOnly)
104 sources = Attribute("sources",
105 "Space-separated list of regular files to be uploaded to ${SRC} "
106 "directory prior to building. Archives won't be expanded automatically. "
107 "Sources are globally available for all experiments unless "
108 "cleanHome is set to True (This will delete all sources). ",
109 flags = Flags.ExecReadOnly)
110 files = Attribute("files",
111 "Space-separated list of regular miscellaneous files to be uploaded "
112 "to ${SHARE} directory. "
113 "Files are globally available for all experiments unless "
114 "cleanHome is set to True (This will delete all files). ",
115 flags = Flags.ExecReadOnly)
116 libs = Attribute("libs",
117 "Space-separated list of libraries (e.g. .so files) to be uploaded "
118 "to ${LIB} directory. "
119 "Libraries are globally available for all experiments unless "
120 "cleanHome is set to True (This will delete all files). ",
121 flags = Flags.ExecReadOnly)
122 bins = Attribute("bins",
123 "Space-separated list of binary files to be uploaded "
124 "to ${BIN} directory. "
125 "Binaries are globally available for all experiments unless "
126 "cleanHome is set to True (This will delete all files). ",
127 flags = Flags.ExecReadOnly)
128 code = Attribute("code",
129 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
130 flags = Flags.ExecReadOnly)
131 build = Attribute("build",
132 "Build commands to execute after deploying the sources. "
133 "Sources are uploaded to the ${SRC} directory and code "
134 "is uploaded to the ${APP_HOME} directory. \n"
135 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
136 "./configure && make && make clean.\n"
137 "Make sure to make the build commands return with a nonzero exit "
139 flags = Flags.ReadOnly)
140 install = Attribute("install",
141 "Commands to transfer built files to their final destinations. "
142 "Install commands are executed after build commands. ",
143 flags = Flags.ReadOnly)
144 stdin = Attribute("stdin", "Standard input for the 'command'",
145 flags = Flags.ExecReadOnly)
146 tear_down = Attribute("tearDown", "Command to be executed just before "
147 "releasing the resource",
148 flags = Flags.ReadOnly)
150 cls._register_attribute(command)
151 cls._register_attribute(forward_x11)
152 cls._register_attribute(env)
153 cls._register_attribute(sudo)
154 cls._register_attribute(depends)
155 cls._register_attribute(sources)
156 cls._register_attribute(code)
157 cls._register_attribute(files)
158 cls._register_attribute(bins)
159 cls._register_attribute(libs)
160 cls._register_attribute(build)
161 cls._register_attribute(install)
162 cls._register_attribute(stdin)
163 cls._register_attribute(tear_down)
def _register_traces(cls):
    """ Registers the traces exposed by this resource type: the
    standard output and the standard error streams.
    """
    trace_specs = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
    )

    # Build every Trace first, then register them in declaration order
    traces = [Trace(name, help_text) for name, help_text in trace_specs]
    for trace in traces:
        cls._register_trace(trace)
173 def __init__(self, ec, guid):
174 super(LinuxApplication, self).__init__(ec, guid)
177 self._home = "app-%s" % self.guid
178 self._in_foreground = False
180 # keep a reference to the running process handler when
181 # the command is not executed as remote daemon in background
184 # timestamp of last state check of the application
185 self._last_state_check = tnow()
def log_message(self, msg):
    """ Returns msg decorated with the guid of this RM and the
    hostname of the node it is attached to.
    """
    hostname = self.node.get("hostname")
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
193 node = self.get_connected(LinuxNode.rtype())
194 if node: return node[0]
199 return os.path.join(self.node.exp_home, self._home)
203 return os.path.join(self.app_home, self.ec.run_id)
def in_foreground(self):
    """ Tells whether the command has to be executed in foreground.

    A foreground command is launched with the node 'execute' method
    instead of 'run' ('run' launches the command in background,
    detached from the terminal).

    X11 forwarding requires the terminal to stay attached, so the
    command can not run in background when the 'forwardX11' attribute
    is set.
    """
    x11 = self.get("forwardX11")
    if x11:
        return x11

    return self._in_foreground
226 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
227 self.info("Retrieving '%s' trace %s " % (name, attr))
229 path = os.path.join(self.run_home, name)
231 command = "(test -f %s && echo 'success') || echo 'error'" % path
232 (out, err), proc = self.node.execute(command)
234 if (err and proc.poll()) or out.find("error") != -1:
235 msg = " Couldn't find trace %s " % name
236 self.error(msg, out, err)
239 if attr == TraceAttr.PATH:
242 if attr == TraceAttr.ALL:
243 (out, err), proc = self.node.check_output(self.run_home, name)
245 if err and proc.poll():
246 msg = " Couldn't read trace %s " % name
247 self.error(msg, out, err)
252 if attr == TraceAttr.STREAM:
253 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
254 elif attr == TraceAttr.SIZE:
255 cmd = "stat -c%%s %s " % path
257 (out, err), proc = self.node.execute(cmd)
259 if err and proc.poll():
260 msg = " Couldn't find trace %s " % name
261 self.error(msg, out, err)
264 if attr == TraceAttr.SIZE:
265 out = int(out.strip())
270 # create run dir for application
271 self.node.mkdir(self.run_home)
279 self.upload_binaries,
281 self.upload_libraries,
286 # install dependencies
287 self.install_dependencies,
293 # Since provisioning takes a long time, before
294 # each step we check that the EC is still
297 raise RuntimeError, "EC finished"
301 # Upload command to remote bash script
302 # - only if command can be executed in background and detached
303 command = self.get("command")
305 if command and not self.in_foreground:
306 self.info("Uploading command '%s'" % command)
308 # replace application specific paths in the command
309 command = self.replace_paths(command)
311 # replace application specific paths in the environment
312 env = self.get("env")
313 env = env and self.replace_paths(env)
315 shfile = os.path.join(self.app_home, "app.sh")
317 self.node.upload_command(command,
321 self.info("Provisioning finished")
323 super(LinuxApplication, self).provision()
325 def upload_sources(self):
326 sources = self.get("sources")
329 self.info("Uploading sources ")
331 sources = sources.split(' ')
333 # Separate sources that should be downloaded from
334 # the web, from sources that should be uploaded from
337 for source in list(sources):
338 if source.startswith("http") or source.startswith("https"):
339 # remove the http source from the sources list
340 sources.remove(source)
342 command.append( " ( "
343 # Check if the source already exists
344 " ls ${SRC}/%(basename)s "
346 # If source doesn't exist, download it and check
347 # that it was downloaded ok
348 " wget -c --directory-prefix=${SRC} %(source)s && "
349 " ls ${SRC}/%(basename)s "
351 "basename": os.path.basename(source),
356 command = " && ".join(command)
358 # replace application specific paths in the command
359 command = self.replace_paths(command)
361 # Upload the command to a bash script and run it
362 # in background ( but wait until the command has
363 # finished to continue )
364 self.node.run_and_wait(command, self.run_home,
365 shfile = os.path.join(self.app_home, "http_sources.sh"),
367 pidfile = "http_sources_pidfile",
368 ecodefile = "http_sources_exitcode",
369 stdout = "http_sources_stdout",
370 stderr = "http_sources_stderr")
373 sources = ' '.join(sources)
374 self.node.upload(sources, self.node.src_dir, overwrite = False)
def upload_files(self):
    """ Uploads the files listed in the 'files' attribute to the
    share directory of the node.

    Nothing is uploaded when the attribute is unset or empty.
    Files already present at the destination are not overwritten.
    """
    files = self.get("files")

    # The attribute is optional: never hand an empty value to the node
    if not files:
        return

    self.info("Uploading files %s " % files)
    self.node.upload(files, self.node.share_dir, overwrite = False)
def upload_libraries(self):
    """ Uploads the libraries listed in the 'libs' attribute to the
    lib directory of the node.

    Nothing is uploaded when the attribute is unset or empty.
    Libraries already present at the destination are not overwritten.
    """
    libs = self.get("libs")

    # The attribute is optional: never hand an empty value to the node
    if not libs:
        return

    # Log the value actually read from the attribute (the previous
    # message referenced an undefined name and raised NameError)
    self.info("Uploading libraries %s " % libs)
    self.node.upload(libs, self.node.lib_dir, overwrite = False)
def upload_binaries(self):
    """ Uploads the binary files listed in the 'bins' attribute to
    the bin directory of the node.

    Nothing is uploaded when the attribute is unset or empty.
    Binaries already present at the destination are not overwritten.
    """
    bins = self.get("bins")

    # The attribute is optional: never hand an empty value to the node
    if not bins:
        return

    # Log the value actually read from the attribute (the previous
    # message referenced an undefined name and raised NameError)
    self.info("Uploading binaries %s " % bins)
    self.node.upload(bins, self.node.bin_dir, overwrite = False)
def upload_code(self):
    """ Uploads the plain text held by the 'code' attribute to a
    file named 'code' inside the application home directory.

    Nothing is uploaded when the attribute is unset or empty.
    An already existing destination file is not overwritten.
    """
    code = self.get("code")

    # The attribute is optional: never hand an empty value to the node
    if not code:
        return

    self.info("Uploading code")

    dst = os.path.join(self.app_home, "code")
    # text = True: 'code' is the content to write, not a local path
    self.node.upload(code, dst, overwrite = False, text = True)
def upload_stdin(self):
    """ Uploads the text held by the 'stdin' attribute to a file
    named 'stdin' inside the application home directory.

    Nothing is uploaded when the attribute is unset or empty.
    An already existing destination file is not overwritten.
    """
    stdin = self.get("stdin")

    # The attribute is optional: never hand an empty value to the node
    if not stdin:
        return

    self.info("Uploading stdin")

    dst = os.path.join(self.app_home, "stdin")
    # text = True: 'stdin' is the content to write, not a local path
    self.node.upload(stdin, dst, overwrite = False, text = True)
def install_dependencies(self):
    """ Installs on the node the packages listed in the 'depends'
    attribute.

    Nothing is installed when the attribute is unset or empty.
    """
    depends = self.get("depends")

    # The attribute is optional: never hand an empty value to the node
    if not depends:
        return

    self.info("Installing dependencies %s" % depends)
    self.node.install_packages(depends, self.app_home, self.run_home)
422 build = self.get("build")
425 self.info("Building sources ")
427 # replace application specific paths in the command
428 command = self.replace_paths(build)
430 # Upload the command to a bash script and run it
431 # in background ( but wait until the command has
432 # finished to continue )
433 self.node.run_and_wait(command, self.run_home,
434 shfile = os.path.join(self.app_home, "build.sh"),
436 pidfile = "build_pidfile",
437 ecodefile = "build_exitcode",
438 stdout = "build_stdout",
439 stderr = "build_stderr")
442 install = self.get("install")
445 self.info("Installing sources ")
447 # replace application specific paths in the command
448 command = self.replace_paths(install)
450 # Upload the command to a bash script and run it
451 # in background ( but wait until the command has
452 # finished to continue )
453 self.node.run_and_wait(command, self.run_home,
454 shfile = os.path.join(self.app_home, "install.sh"),
456 pidfile = "install_pidfile",
457 ecodefile = "install_exitcode",
458 stdout = "install_stdout",
459 stderr = "install_stderr")
462 # Wait until node is associated and deployed
464 if not node or node.state < ResourceState.READY:
465 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
466 self.ec.schedule(reschedule_delay, self.deploy)
469 command = self.get("command") or ""
470 self.info("Deploying command '%s' " % command)
474 self._state = ResourceState.FAILED
477 super(LinuxApplication, self).deploy()
480 command = self.get("command")
482 self.info("Starting command '%s'" % command)
485 # If no command was given (i.e. Application was used for dependency
486 # installation), then the application is directly marked as FINISHED
487 self._state = ResourceState.FINISHED
490 if self.in_foreground:
491 self._start_in_foreground()
493 self._start_in_background()
495 super(LinuxApplication, self).start()
497 def _start_in_foreground(self):
498 command = self.get("command")
499 sudo = self.get("sudo") or False
500 x11 = self.get("forwardX11")
502 # For a command being executed in foreground, if there is stdin,
503 # it is expected to be text string not a file or pipe
504 stdin = self.get("stdin") or None
506 # Command will be launched in foreground and attached to the
507 # terminal using the node 'execute' in non blocking mode.
510 env = self.get("env")
511 environ = self.node.format_environment(env, inline = True)
512 command = environ + command
513 command = self.replace_paths(command)
515 # We save the reference to the process in self._proc
516 # to be able to kill the process from the stop method.
517 # We also set blocking = False, since we don't want the
518 # thread to block until the execution finishes.
519 (out, err), self._proc = self.node.execute(command,
525 if self._proc.poll():
526 self._state = ResourceState.FAILED
527 self.error(msg, out, err)
528 raise RuntimeError, msg
530 def _start_in_background(self):
531 command = self.get("command")
532 env = self.get("env")
533 sudo = self.get("sudo") or False
537 stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
540 # Command will be run as a daemon in background and detached from any
542 # The command to run was previously uploaded to a bash script
543 # during deployment, now we launch the remote script using 'run'
544 # method from the node.
545 cmd = "bash %s" % os.path.join(self.app_home, "app.sh")
546 (out, err), proc = self.node.run(cmd, self.run_home,
552 # check if execution errors occurred
553 msg = " Failed to start command '%s' " % command
556 self._state = ResourceState.FAILED
557 self.error(msg, out, err)
558 raise RuntimeError, msg
560 # Wait for pid file to be generated
561 pid, ppid = self.node.wait_pid(self.run_home)
562 if pid: self._pid = int(pid)
563 if ppid: self._ppid = int(ppid)
565 # If the process is not running, check for error information
566 # on the remote machine
567 if not self.pid or not self.ppid:
568 (out, err), proc = self.node.check_errors(self.run_home,
571 # Out is what was written in the stderr file
573 self._state = ResourceState.FAILED
574 msg = " Failed to start command '%s' " % command
575 self.error(msg, out, err)
576 raise RuntimeError, msg
579 """ Stops application execution
581 command = self.get('command') or ''
583 if self.state == ResourceState.STARTED:
586 self.info("Stopping command '%s'" % command)
588 # If the command is running in foreground (it was launched using
589 # the node 'execute' method), then we use the handler to the Popen
590 # process to kill it. Else we send a kill signal using the pid and ppid
591 # retrieved after running the command with the node 'run' method
596 # Only try to kill the process if the pid and ppid
598 if self.pid and self.ppid:
599 (out, err), proc = self.node.kill(self.pid, self.ppid)
602 # check if execution errors occurred
603 msg = " Failed to STOP command '%s' " % self.get("command")
604 self.error(msg, out, err)
605 self._state = ResourceState.FAILED
609 super(LinuxApplication, self).stop()
612 self.info("Releasing resource")
614 tear_down = self.get("tearDown")
616 self.node.execute(tear_down)
620 if self.state == ResourceState.STOPPED:
621 super(LinuxApplication, self).release()
625 """ Returns the state of the application
627 if self._state == ResourceState.STARTED:
628 if self.in_foreground:
629 # Check if the process we used to execute the command
630 # is still running ...
631 retcode = self._proc.poll()
633 # retcode == None -> running
634 # retcode > 0 -> error
635 # retcode == 0 -> finished
638 msg = " Failed to execute command '%s'" % self.get("command")
639 err = self._proc.stderr.read()
640 self.error(msg, out, err)
641 self._state = ResourceState.FAILED
643 self._state = ResourceState.FINISHED
646 # We need to query the status of the command we launched in
647 # background. In order to avoid overwhelming the remote host and
648 # the local processor with too many ssh queries, the state is only
649 # requested every 'state_check_delay' seconds.
650 state_check_delay = 0.5
651 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
652 # check if execution errors occurred
653 (out, err), proc = self.node.check_errors(self.run_home)
656 msg = " Failed to execute command '%s'" % self.get("command")
657 self.error(msg, out, err)
658 self._state = ResourceState.FAILED
660 elif self.pid and self.ppid:
661 # No execution errors occurred. Make sure the background
662 # process with the recorded pid is still running.
663 status = self.node.status(self.pid, self.ppid)
665 if status == ProcStatus.FINISHED:
666 self._state = ResourceState.FINISHED
668 self._last_state_check = tnow()
def replace_paths(self, command):
    """
    Replace all special path tags in command with the actual paths.

    The recognized tags are ${USR}, ${LIB}, ${BIN}, ${SRC}, ${SHARE},
    ${EXP}, ${EXP_HOME}, ${APP_HOME}, ${RUN_HOME}, ${NODE_HOME} and
    ${HOME}. Paths are inserted verbatim: no shell quoting or
    escaping is applied to them.

    :param command: Text in which the tags are replaced
    :type command: str
    :return: The command with every known tag replaced
    """
    node = self.node

    # Substitutions are applied one after the other, in this order
    substitutions = (
        ("${USR}", node.usr_dir),
        ("${LIB}", node.lib_dir),
        ("${BIN}", node.bin_dir),
        ("${SRC}", node.src_dir),
        ("${SHARE}", node.share_dir),
        ("${EXP}", node.exp_dir),
        ("${EXP_HOME}", node.exp_home),
        ("${APP_HOME}", self.app_home),
        ("${RUN_HOME}", self.run_home),
        ("${NODE_HOME}", node.node_home),
        ("${HOME}", node.home_dir),
    )

    for tag, path in substitutions:
        command = command.replace(tag, path)

    return command
690 def valid_connection(self, guid):