2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
31 # TODO: Resolve wildcards in commands!!
32 # TODO: During provisioning, everything that is not scp could be
33 # uploaded to a same script, http_sources download, etc...
34 # and like that require performing less ssh connections!!!
35 # TODO: Make stdin be a symlink to the original file in ${SHARE}
36 # - later use md5sum to check wether the file needs to be re-upload
40 class LinuxApplication(ResourceManager):
42 .. class:: Class Args :
44 :param ec: The Experiment controller
45 :type ec: ExperimentController
46 :param guid: guid of the RM
51 A LinuxApplication RM represents a process that can be executed in
52 a remote Linux host using SSH.
54 The LinuxApplication RM takes care of uploadin sources and any files
55 needed to run the experiment, to the remote host.
56 It also allows to provide source compilation (build) and installation
57 instructions, and takes care of automating the sources build and
58 installation tasks for the user.
60 It is important to note that files uploaded to the remote host have
61 two possible scopes: single-experiment or multi-experiment.
62 Single experiment files are those that will not be re-used by other
63 experiments. Multi-experiment files are those that will.
64 Sources and shared files are always made available to all experiments.
68 The directory structure used by LinuxApplication RM at the Linux
69 host is the following:
71 ${HOME}/nepi-usr --> Base directory for multi-experiment files
73 ${LIB} |- /lib --> Base directory for libraries
74 ${BIN} |- /bin --> Base directory for binary files
75 ${SRC} |- /src --> Base directory for sources
76 ${SHARE} |- /share --> Base directory for other files
78 ${HOME}/nepi-exp --> Base directory for single-experiment files
80 ${EXP_HOME} |- /<exp-id> --> Base directory for experiment exp-id
82 ${APP_HOME} |- /<app-guid> --> Base directory for application
83 | specific files (e.g. command.sh, input)
85 ${RUN_HOME} |- /<run-id> --> Base directory for run specific
89 _rtype = "LinuxApplication"
92 def _register_attributes(cls):
93 command = Attribute("command", "Command to execute at application start. "
94 "Note that commands will be executed in the ${RUN_HOME} directory, "
95 "make sure to take this into account when using relative paths. ",
96 flags = Flags.ExecReadOnly)
97 forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
98 flags = Flags.ExecReadOnly)
99 env = Attribute("env", "Environment variables string for command execution",
100 flags = Flags.ExecReadOnly)
101 sudo = Attribute("sudo", "Run with root privileges",
102 flags = Flags.ExecReadOnly)
103 depends = Attribute("depends",
104 "Space-separated list of packages required to run the application",
105 flags = Flags.ExecReadOnly)
106 sources = Attribute("sources",
107 "Space-separated list of regular files to be uploaded to ${SRC} "
108 "directory prior to building. Archives won't be expanded automatically. "
109 "Sources are globally available for all experiments unless "
110 "cleanHome is set to True (This will delete all sources). ",
111 flags = Flags.ExecReadOnly)
112 files = Attribute("files",
113 "Space-separated list of regular miscellaneous files to be uploaded "
114 "to ${SHARE} directory. "
115 "Files are globally available for all experiments unless "
116 "cleanHome is set to True (This will delete all files). ",
117 flags = Flags.ExecReadOnly)
118 libs = Attribute("libs",
119 "Space-separated list of libraries (e.g. .so files) to be uploaded "
120 "to ${LIB} directory. "
121 "Libraries are globally available for all experiments unless "
122 "cleanHome is set to True (This will delete all files). ",
123 flags = Flags.ExecReadOnly)
124 bins = Attribute("bins",
125 "Space-separated list of binary files to be uploaded "
126 "to ${BIN} directory. "
127 "Binaries are globally available for all experiments unless "
128 "cleanHome is set to True (This will delete all files). ",
129 flags = Flags.ExecReadOnly)
130 code = Attribute("code",
131 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
132 flags = Flags.ExecReadOnly)
133 build = Attribute("build",
134 "Build commands to execute after deploying the sources. "
135 "Sources are uploaded to the ${SRC} directory and code "
136 "is uploaded to the ${APP_HOME} directory. \n"
137 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
138 "./configure && make && make clean.\n"
139 "Make sure to make the build commands return with a nonzero exit "
141 flags = Flags.ReadOnly)
142 install = Attribute("install",
143 "Commands to transfer built files to their final destinations. "
144 "Install commands are executed after build commands. ",
145 flags = Flags.ReadOnly)
146 stdin = Attribute("stdin", "Standard input for the 'command'",
147 flags = Flags.ExecReadOnly)
148 tear_down = Attribute("tearDown", "Command to be executed just before "
149 "releasing the resource",
150 flags = Flags.ReadOnly)
152 cls._register_attribute(command)
153 cls._register_attribute(forward_x11)
154 cls._register_attribute(env)
155 cls._register_attribute(sudo)
156 cls._register_attribute(depends)
157 cls._register_attribute(sources)
158 cls._register_attribute(code)
159 cls._register_attribute(files)
160 cls._register_attribute(bins)
161 cls._register_attribute(libs)
162 cls._register_attribute(build)
163 cls._register_attribute(install)
164 cls._register_attribute(stdin)
165 cls._register_attribute(tear_down)
168 def _register_traces(cls):
169 stdout = Trace("stdout", "Standard output stream")
170 stderr = Trace("stderr", "Standard error stream")
172 cls._register_trace(stdout)
173 cls._register_trace(stderr)
175 def __init__(self, ec, guid):
176 super(LinuxApplication, self).__init__(ec, guid)
179 self._home = "app-%s" % self.guid
180 self._in_foreground = False
182 # keep a reference to the running process handler when
183 # the command is not executed as remote daemon in background
186 # timestamp of last state check of the application
187 self._last_state_check = tnow()
189 def log_message(self, msg):
190 return " guid %d - host %s - %s " % (self.guid,
191 self.node.get("hostname"), msg)
195 node = self.get_connected(LinuxNode.rtype())
196 if node: return node[0]
201 return os.path.join(self.node.exp_home, self._home)
205 return os.path.join(self.app_home, self.ec.run_id)
216 def in_foreground(self):
217 """ Returns True if the command needs to be executed in foreground.
218 This means that command will be executed using 'execute' instead of
219 'run' ('run' executes a command in background and detached from the
222 When using X11 forwarding option, the command can not run in background
223 and detached from a terminal, since we need to keep the terminal attached
226 return self.get("forwardX11") or self._in_foreground
228 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
229 self.info("Retrieving '%s' trace %s " % (name, attr))
231 path = os.path.join(self.run_home, name)
233 command = "(test -f %s && echo 'success') || echo 'error'" % path
234 (out, err), proc = self.node.execute(command)
236 if (err and proc.poll()) or out.find("error") != -1:
237 msg = " Couldn't find trace %s " % name
238 self.error(msg, out, err)
241 if attr == TraceAttr.PATH:
244 if attr == TraceAttr.ALL:
245 (out, err), proc = self.node.check_output(self.run_home, name)
247 if err and proc.poll():
248 msg = " Couldn't read trace %s " % name
249 self.error(msg, out, err)
254 if attr == TraceAttr.STREAM:
255 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
256 elif attr == TraceAttr.SIZE:
257 cmd = "stat -c%%s %s " % path
259 (out, err), proc = self.node.execute(cmd)
261 if err and proc.poll():
262 msg = " Couldn't find trace %s " % name
263 self.error(msg, out, err)
266 if attr == TraceAttr.SIZE:
267 out = int(out.strip())
272 # create run dir for application
273 self.node.mkdir(self.run_home)
281 self.upload_binaries,
283 self.upload_libraries,
288 # install dependencies
289 self.install_dependencies,
295 # Since provisioning takes a long time, before
296 # each step we check that the EC is still
299 raise RuntimeError, "EC finished"
303 self.upload_start_command()
305 self.info("Provisioning finished")
307 super(LinuxApplication, self).provision()
309 def upload_start_command(self):
310 # Upload command to remote bash script
311 # - only if command can be executed in background and detached
312 command = self.get("command")
314 if command and not self.in_foreground:
315 self.info("Uploading command '%s'" % command)
317 # replace application specific paths in the command
318 command = self.replace_paths(command)
320 # replace application specific paths in the environment
321 env = self.get("env")
322 env = env and self.replace_paths(env)
324 shfile = os.path.join(self.app_home, "start.sh")
326 self.node.upload_command(command,
330 def upload_sources(self):
331 sources = self.get("sources")
334 self.info("Uploading sources ")
336 sources = sources.split(' ')
338 # Separate sources that should be downloaded from
339 # the web, from sources that should be uploaded from
342 for source in list(sources):
343 if source.startswith("http") or source.startswith("https"):
344 # remove the hhtp source from the sources list
345 sources.remove(source)
347 command.append( " ( "
348 # Check if the source already exists
349 " ls ${SRC}/%(basename)s "
351 # If source doesn't exist, download it and check
352 # that it it downloaded ok
353 " wget -c --directory-prefix=${SRC} %(source)s && "
354 " ls ${SRC}/%(basename)s "
356 "basename": os.path.basename(source),
361 command = " && ".join(command)
363 # replace application specific paths in the command
364 command = self.replace_paths(command)
366 # Upload the command to a bash script and run it
367 # in background ( but wait until the command has
368 # finished to continue )
369 self.node.run_and_wait(command, self.run_home,
370 shfile = os.path.join(self.app_home, "http_sources.sh"),
372 pidfile = "http_sources_pidfile",
373 ecodefile = "http_sources_exitcode",
374 stdout = "http_sources_stdout",
375 stderr = "http_sources_stderr")
378 sources = ' '.join(sources)
379 self.node.upload(sources, self.node.src_dir, overwrite = False)
381 def upload_files(self):
382 files = self.get("files")
385 self.info("Uploading files %s " % files)
386 self.node.upload(files, self.node.share_dir, overwrite = False)
388 def upload_libraries(self):
389 libs = self.get("libs")
392 self.info("Uploading libraries %s " % libaries)
393 self.node.upload(libs, self.node.lib_dir, overwrite = False)
395 def upload_binaries(self):
396 bins = self.get("bins")
399 self.info("Uploading binaries %s " % binaries)
400 self.node.upload(bins, self.node.bin_dir, overwrite = False)
402 def upload_code(self):
403 code = self.get("code")
406 self.info("Uploading code")
408 dst = os.path.join(self.app_home, "code")
409 self.node.upload(code, dst, overwrite = False, text = True)
411 def upload_stdin(self):
412 stdin = self.get("stdin")
414 # create dir for sources
415 self.info("Uploading stdin")
417 dst = os.path.join(self.app_home, "stdin")
418 self.node.upload(stdin, dst, overwrite = False, text = True)
420 def install_dependencies(self):
421 depends = self.get("depends")
423 self.info("Installing dependencies %s" % depends)
424 self.node.install_packages(depends, self.app_home, self.run_home)
427 build = self.get("build")
430 self.info("Building sources ")
432 # replace application specific paths in the command
433 command = self.replace_paths(build)
435 # Upload the command to a bash script and run it
436 # in background ( but wait until the command has
437 # finished to continue )
438 self.node.run_and_wait(command, self.run_home,
439 shfile = os.path.join(self.app_home, "build.sh"),
441 pidfile = "build_pidfile",
442 ecodefile = "build_exitcode",
443 stdout = "build_stdout",
444 stderr = "build_stderr")
447 install = self.get("install")
450 self.info("Installing sources ")
452 # replace application specific paths in the command
453 command = self.replace_paths(install)
455 # Upload the command to a bash script and run it
456 # in background ( but wait until the command has
457 # finished to continue )
458 self.node.run_and_wait(command, self.run_home,
459 shfile = os.path.join(self.app_home, "install.sh"),
461 pidfile = "install_pidfile",
462 ecodefile = "install_exitcode",
463 stdout = "install_stdout",
464 stderr = "install_stderr")
467 # Wait until node is associated and deployed
469 if not node or node.state < ResourceState.READY:
470 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
471 self.ec.schedule(reschedule_delay, self.deploy)
474 command = self.get("command") or ""
475 self.info("Deploying command '%s' " % command)
479 self._state = ResourceState.FAILED
482 super(LinuxApplication, self).deploy()
485 command = self.get("command")
487 self.info("Starting command '%s'" % command)
490 # If no command was given (i.e. Application was used for dependency
491 # installation), then the application is directly marked as FINISHED
492 self._state = ResourceState.FINISHED
495 if self.in_foreground:
496 self._start_in_foreground()
498 self._start_in_background()
500 super(LinuxApplication, self).start()
502 def _start_in_foreground(self):
503 command = self.get("command")
504 sudo = self.get("sudo") or False
505 x11 = self.get("forwardX11")
507 # For a command being executed in foreground, if there is stdin,
508 # it is expected to be text string not a file or pipe
509 stdin = self.get("stdin") or None
511 # Command will be launched in foreground and attached to the
512 # terminal using the node 'execute' in non blocking mode.
515 env = self.get("env")
516 environ = self.node.format_environment(env, inline = True)
517 command = environ + command
518 command = self.replace_paths(command)
520 # We save the reference to the process in self._proc
521 # to be able to kill the process from the stop method.
522 # We also set blocking = False, since we don't want the
523 # thread to block until the execution finishes.
524 (out, err), self._proc = self.node.execute(command,
530 if self._proc.poll():
531 self._state = ResourceState.FAILED
532 self.error(msg, out, err)
533 raise RuntimeError, msg
535 def _start_in_background(self):
536 command = self.get("command")
537 env = self.get("env")
538 sudo = self.get("sudo") or False
542 stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
545 # Command will be run as a daemon in baground and detached from any
547 # The command to run was previously uploaded to a bash script
548 # during deployment, now we launch the remote script using 'run'
549 # method from the node.
550 cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
551 (out, err), proc = self.node.run(cmd, self.run_home,
557 # check if execution errors occurred
558 msg = " Failed to start command '%s' " % command
561 self._state = ResourceState.FAILED
562 self.error(msg, out, err)
563 raise RuntimeError, msg
565 # Wait for pid file to be generated
566 pid, ppid = self.node.wait_pid(self.run_home)
567 if pid: self._pid = int(pid)
568 if ppid: self._ppid = int(ppid)
570 # If the process is not running, check for error information
571 # on the remote machine
572 if not self.pid or not self.ppid:
573 (out, err), proc = self.node.check_errors(self.run_home,
576 # Out is what was written in the stderr file
578 self._state = ResourceState.FAILED
579 msg = " Failed to start command '%s' " % command
580 self.error(msg, out, err)
581 raise RuntimeError, msg
584 """ Stops application execution
586 command = self.get('command') or ''
588 if self.state == ResourceState.STARTED:
591 self.info("Stopping command '%s'" % command)
593 # If the command is running in foreground (it was launched using
594 # the node 'execute' method), then we use the handler to the Popen
595 # process to kill it. Else we send a kill signal using the pid and ppid
596 # retrieved after running the command with the node 'run' method
601 # Only try to kill the process if the pid and ppid
603 if self.pid and self.ppid:
604 (out, err), proc = self.node.kill(self.pid, self.ppid)
607 # check if execution errors occurred
608 msg = " Failed to STOP command '%s' " % self.get("command")
609 self.error(msg, out, err)
610 self._state = ResourceState.FAILED
614 super(LinuxApplication, self).stop()
617 self.info("Releasing resource")
619 tear_down = self.get("tearDown")
621 self.node.execute(tear_down)
625 if self.state == ResourceState.STOPPED:
626 super(LinuxApplication, self).release()
630 """ Returns the state of the application
632 if self._state == ResourceState.STARTED:
633 if self.in_foreground:
634 # Check if the process we used to execute the command
635 # is still running ...
636 retcode = self._proc.poll()
638 # retcode == None -> running
639 # retcode > 0 -> error
640 # retcode == 0 -> finished
643 msg = " Failed to execute command '%s'" % self.get("command")
644 err = self._proc.stderr.read()
645 self.error(msg, out, err)
646 self._state = ResourceState.FAILED
648 self._state = ResourceState.FINISHED
651 # We need to query the status of the command we launched in
652 # background. In oredr to avoid overwhelming the remote host and
653 # the local processor with too many ssh queries, the state is only
654 # requested every 'state_check_delay' seconds.
655 state_check_delay = 0.5
656 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
657 # check if execution errors occurred
658 (out, err), proc = self.node.check_errors(self.run_home)
661 msg = " Failed to execute command '%s'" % self.get("command")
662 self.error(msg, out, err)
663 self._state = ResourceState.FAILED
665 elif self.pid and self.ppid:
666 # No execution errors occurred. Make sure the background
667 # process with the recorded pid is still running.
668 status = self.node.status(self.pid, self.ppid)
670 if status == ProcStatus.FINISHED:
671 self._state = ResourceState.FINISHED
673 self._last_state_check = tnow()
677 def replace_paths(self, command):
679 Replace all special path tags with shell-escaped actual paths.
682 .replace("${USR}", self.node.usr_dir)
683 .replace("${LIB}", self.node.lib_dir)
684 .replace("${BIN}", self.node.bin_dir)
685 .replace("${SRC}", self.node.src_dir)
686 .replace("${SHARE}", self.node.share_dir)
687 .replace("${EXP}", self.node.exp_dir)
688 .replace("${EXP_HOME}", self.node.exp_home)
689 .replace("${APP_HOME}", self.app_home)
690 .replace("${RUN_HOME}", self.run_home)
691 .replace("${NODE_HOME}", self.node.node_home)
692 .replace("${HOME}", self.node.home_dir)
695 def valid_connection(self, guid):