2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import os

20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
29 # TODO: Resolve wildcards in commands!!
33 class LinuxApplication(ResourceManager):
# Resource manager that runs a command on the LinuxNode it is connected to,
# optionally deploying sources/code, installing package dependencies and
# running build/install steps before the command is started.
# NOTE(review): _rtype is presumably the resource type name under which the
# ResourceManager machinery registers this class - confirm in the base class.
34 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare the configurable attributes of a Linux application.

    All Attribute objects are built first (in the list below) and then
    registered on the class, one by one, in list order.
    """
    attributes = [
        Attribute("command", "Command to execute",
            flags = Flags.ExecReadOnly),
        Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
            flags = Flags.ExecReadOnly),
        Attribute("env", "Environment variables string for command execution",
            flags = Flags.ExecReadOnly),
        Attribute("sudo", "Run with root privileges",
            flags = Flags.ExecReadOnly),
        Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags = Flags.ExecReadOnly),
        Attribute("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            flags = Flags.ExecReadOnly),
        Attribute("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            flags = Flags.ExecReadOnly),
        Attribute("build",
            "Build commands to execute after deploying the sources. "
            "Sources will be in the ${SOURCES} folder. "
            "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
            "Try to make the commands return with a nonzero exit code on error.\n"
            "Also, do not install any programs here, use the 'install' attribute. This will "
            "help keep the built files constrained to the build folder (which may "
            "not be the home folder), and will result in faster deployment. Also, "
            "make sure to clean up temporary files, to reduce bandwidth usage between "
            "nodes when transferring built packages.",
            flags = Flags.ReadOnly),
        Attribute("install",
            "Commands to transfer built files to their final destinations. "
            "Sources will be in the initial working folder, and a special "
            "tag ${SOURCES} can be used to reference the experiment's "
            "home folder (where the application commands will run).\n"
            "ALL sources and targets needed for execution must be copied there, "
            "if building has been enabled.\n"
            "That is, 'slave' nodes will not automatically get any source files. "
            "'slave' nodes don't get build dependencies either, so if you need "
            "make and other tools to install, be sure to provide them as "
            "actual dependencies instead.",
            flags = Flags.ReadOnly),
        Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly),
        Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly),
        Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly),
        Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = Flags.ReadOnly),
    ]

    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Register the traces exposed by every Linux application.

    Two traces are declared, in this order: the command's standard output
    ("stdout") and its standard error ("stderr").
    """
    for trace in (Trace("stdout", "Standard output stream"),
                  Trace("stderr", "Standard error stream")):
        cls._register_trace(trace)
def __init__(self, ec, guid):
    """Create the application resource.

    :param ec: experiment controller, forwarded to ResourceManager
    :param guid: unique id of this resource, forwarded to ResourceManager
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # Ids of the remote process (and its parent) when the command runs as
    # a background daemon; they are only set after node.wait_pid() returns
    # them, so they must start as None for the pid/ppid checks to work.
    self._pid = None
    self._ppid = None

    # Name of the application's home directory, relative to the node's
    # experiment home.
    self._home = "app-%s" % self.guid

    # keep a reference to the running process handler when
    # the command is not executed as remote daemon in background
    self._proc = None

    # timestamp of last state check of the application
    self._last_state_check = strfnow()
def log_message(self, msg):
    """Prefix msg with this resource's guid and its node's hostname.

    :param msg: message text to decorate
    :returns: the decorated message string

    When no LinuxNode is connected yet (self.node is falsy), the hostname
    is reported as None instead of raising AttributeError, so messages
    can still be formatted during early lifecycle stages.
    """
    node = self.node
    hostname = node.get("hostname") if node else None
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
# NOTE(review): the decorator and `def` lines of the accessor owning the next
# two statements (presumably a `node` property) are missing from this file,
# as is its fallthrough return (callers test `if not node`, so presumably
# None). It yields the first connected resource whose type is LinuxNode.
128 node = self.get_connected(LinuxNode.rtype())
129 if node: return node[0]
# NOTE(review): body of a path accessor whose `def` line is missing
# (presumably `app_home`): the application's directory inside the node's
# experiment home.
134 return os.path.join(self.node.exp_home, self._home)
# NOTE(review): presumably `src_dir` - the 'src' subfolder of the application
# home, where sources and code are uploaded.
138 return os.path.join(self.app_home, 'src')
# NOTE(review): presumably `build_dir` - the 'build' subfolder of the
# application home.
142 return os.path.join(self.app_home, 'build')
@property
def in_foreground(self):
    """ Returns True if the command needs to be executed in foreground.

    This means that the command will be executed using the node 'execute'
    method instead of 'run' ('run' launches the command as a daemon,
    detached from the terminal).

    When using X11 forwarding option, the command can not run in background
    and detached from a terminal in the remote host, since we need to keep
    the SSH connection to receive graphical data.

    Exposed as a property because callers test it as an attribute
    (``if self.in_foreground``); without the decorator the bound method
    would always be truthy.
    """
    return self.get("forwardX11") or False
# Retrieve information about the trace file `name` stored in app_home,
# selected by `attr`: its path (TraceAttr.PATH), its contents via
# node.check_output (TraceAttr.ALL), one `block`-byte chunk located
# `offset` blocks into the file (TraceAttr.STREAM), or its size in
# bytes as an int (TraceAttr.SIZE).
# NOTE(review): several lines of this method are missing from this file,
# presumably including the `return` after each error report and the
# per-attribute returns (e.g. `return path` for PATH); confirm upstream.
164 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
165 self.info("Retrieving '%s' trace %s " % (name, attr))
167 path = os.path.join(self.app_home, name)
# Ask the node whether the trace file exists before reading it.
169 command = "(test -f %s && echo 'success') || echo 'error'" % path
170 (out, err), proc = self.node.execute(command)
172 if (err and proc.poll()) or out.find("error") != -1:
173 msg = " Couldn't find trace %s " % name
174 self.error(msg, out, err)
177 if attr == TraceAttr.PATH:
180 if attr == TraceAttr.ALL:
181 (out, err), proc = self.node.check_output(self.app_home, name)
183 if err and proc.poll():
184 msg = " Couldn't read trace %s " % name
185 self.error(msg, out, err)
# dd's skip counts blocks of size bs, so `offset` is a block index.
# NOTE(review): for an attr other than STREAM/SIZE, `cmd` is never assigned.
190 if attr == TraceAttr.STREAM:
191 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
192 elif attr == TraceAttr.SIZE:
# '%%' escapes '%' so the remote command is `stat -c%s <path>` (size in bytes).
193 cmd = "stat -c%%s %s " % path
195 (out, err), proc = self.node.execute(cmd)
197 if err and proc.poll():
198 msg = " Couldn't find trace %s " % name
199 self.error(msg, out, err)
202 if attr == TraceAttr.SIZE:
203 out = int(out.strip())
# NOTE(review): the `def` line of this method is missing from this file;
# presumably `provision` (it calls the base-class provision() at the end).
# It prepares the remote application home: creates it, uploads sources,
# installs dependencies and uploads the command script.
# NOTE(review): the calls between the steps below (presumably upload_code,
# upload_stdin, build and install) are missing from this file.
208 # create home dir for application
209 self.node.mkdir(self.app_home)
212 self.upload_sources()
220 # install dependencies
221 self.install_dependencies()
229 # Upload command to remote bash script
230 # - only if command can be executed in background and detached
231 command = self.get("command")
233 if command and not self.in_foreground:
234 self.info("Uploading command '%s'" % command)
236 # replace application specific paths in the command
237 command = self.replace_paths(command)
239 # replace application specific paths in the environment
240 env = self.get("env")
241 env = env and self.replace_paths(env)
# NOTE(review): the remaining arguments of this call are missing from this
# file (start() later runs "bash ./app.sh", so presumably shfile="app.sh").
243 self.node.upload_command(command, self.app_home,
247 self.info("Provisioning finished")
249 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the files listed in the 'sources' attribute into src_dir.

    'sources' is a space-separated list. Entries that are http:// or
    https:// URLs are downloaded directly on the remote host with wget
    (into ${SOURCES}), followed by an `ls` of each file so a failed
    download makes the script fail; every other entry is treated as a
    local file and uploaded to src_dir. Does nothing when 'sources' is
    not set.
    """
    sources = self.get("sources")
    if not sources:
        return

    self.info("Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() rather than split(' ') so repeated spaces yield no empty entries
    sources = sources.split()

    url_prefixes = ("http://", "https://")
    http_sources = [s for s in sources if s.startswith(url_prefixes)]
    sources = [s for s in sources if not s.startswith(url_prefixes)]

    # Download http sources remotely
    if http_sources:
        command = [" wget -c --directory-prefix=${SOURCES} "]
        check = []

        for source in http_sources:
            command.append(" %s " % (source))
            check.append(" ls ${SOURCES}/%s " % os.path.basename(source))

        command = " ".join(command)
        check = " ; ".join(check)

        # Append the command to check that the sources were downloaded
        command += " ; %s " % check

        # replace application specific paths in the command
        command = self.replace_paths(command)

        # Upload the command to a bash script and run it
        # in background ( but wait until the command has
        # finished to continue )
        self.node.run_and_wait(command, self.app_home,
                shfile = "http_sources.sh",
                pidfile = "http_sources_pidfile",
                ecodefile = "http_sources_exitcode",
                stdout = "http_sources_stdout",
                stderr = "http_sources_stderr")

    # Upload the remaining (local) sources
    if sources:
        self.node.upload(sources, self.src_dir)
def upload_code(self):
    """Upload the plain-text 'code' attribute to ${SOURCES}/code.

    Does nothing when 'code' is not set.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info("Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the attribute's text itself; text = True presumably tells the
    # node that the first argument is content rather than a local path.
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the text of the 'stdin' attribute to <app_home>/stdin.

    When the 'stdin' attribute is set, the start logic passes this file
    as the command's standard input. Does nothing when 'stdin' is not set.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    # The file goes directly in app_home (created during provisioning).
    dst = os.path.join(self.app_home, "stdin")
    self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
    """Install the packages listed in the 'depends' attribute on the node.

    Does nothing when 'depends' is not set, so no empty/None package list
    is handed to the node's package installer.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.info("Installing dependencies %s" % depends)
    self.node.install_packages(depends, self.app_home)
# NOTE(review): the `def` line of this method (presumably `build`) and an
# `if build:` style guard are missing from this file.
# Runs the 'build' attribute's commands on the node, from a script run in
# background and waited for, after creating build_dir.
325 build = self.get("build")
327 self.info("Building sources ")
329 # create dir for build
330 self.node.mkdir(self.build_dir)
332 # replace application specific paths in the command
333 command = self.replace_paths(build)
335 # Upload the command to a bash script and run it
336 # in background ( but wait until the command has
337 # finished to continue )
# NOTE(review): the `shfile` argument line of this call is missing from this
# file (presumably "build.sh", by analogy with install.sh).
338 self.node.run_and_wait(command, self.app_home,
340 pidfile = "build_pidfile",
341 ecodefile = "build_exitcode",
342 stdout = "build_stdout",
343 stderr = "build_stderr")
# NOTE(review): the `def` line of this method (presumably `install`) and an
# `if install:` style guard are missing from this file.
# Runs the 'install' attribute's commands on the node through install.sh,
# run in background and waited for.
346 install = self.get("install")
348 self.info("Installing sources ")
350 # replace application specific paths in the command
351 command = self.replace_paths(install)
353 # Upload the command to a bash script and run it
354 # in background ( but wait until the command has
355 # finished to continue )
356 self.node.run_and_wait(command, self.app_home,
357 shfile = "install.sh",
358 pidfile = "install_pidfile",
359 ecodefile = "install_exitcode",
360 stdout = "install_stdout",
361 stderr = "install_stderr")
# NOTE(review): the `def` line of this method (presumably `deploy`, since it
# reschedules self.deploy) is missing, as are the assignment of `node`, the
# else-branch header and the try/except lines around the deploy step.
# While the connected node is absent or not yet READY, the method schedules
# itself again after 0.5s; otherwise it deploys the command and calls the
# base-class deploy(). The FAILED assignment below presumably sits in the
# missing except clause.
364 # Wait until node is associated and deployed
366 if not node or node.state < ResourceState.READY:
# NOTE(review): when no node is connected, self.node.state below would raise.
367 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
369 reschedule_delay = "0.5s"
370 self.ec.schedule(reschedule_delay, self.deploy)
373 command = self.get("command") or ""
374 self.info("Deploying command '%s' " % command)
378 self._state = ResourceState.FAILED
381 super(LinuxApplication, self).deploy()
# NOTE(review): the `def` line of this method (presumably `start`) and many
# control-flow lines (if/else headers, continuation lines of calls) are
# missing from this file.
# Starts the command: in foreground through node.execute when X11
# forwarding is enabled, otherwise by running the previously uploaded
# app.sh as a daemon through node.run; with no command the resource is
# marked FINISHED directly.
384 command = self.get("command")
385 env = self.get("env")
386 stdin = "stdin" if self.get("stdin") else None
# NOTE(review): both branches of the next two expressions yield the same value.
387 stdout = "stdout" if self.get("stdout") else "stdout"
388 stderr = "stderr" if self.get("stderr") else "stderr"
389 sudo = self.get("sudo") or False
392 self.info("Starting command '%s'" % command)
394 if self.in_foreground:
395 # If command should be ran in foreground, we invoke
396 # the node 'execute' method
398 msg = "No command is defined but X11 forwarding has been set"
400 self._state = ResourceState.FAILED
401 raise RuntimeError, msg
# Turns each space-separated env entry into an "export" line prepended to
# the command (the continuation of this statement is missing from this file).
404 environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
407 command = environ + command
408 command = self.replace_paths(command)
410 x11 = self.get("forwardX11")
412 # We save the reference to the process in self._proc
413 # to be able to kill the process from the stop method
414 (out, err), self._proc = self.node.execute(command,
# NOTE(review): `msg` does not appear to be assigned on this failure path in
# the visible lines; confirm it is defined before self.error/raise.
420 if self._proc.poll():
422 err = self._proc.stderr.read()
423 self._state = ResourceState.FAILED
424 self.error(msg, out, err)
425 raise RuntimeError, msg
427 super(LinuxApplication, self).start()
430 # If command is set (i.e. application not used only for dependency
431 # installation), and it does not need to run in foreground, we use
432 # the 'run' method of the node to launch the application as a daemon
434 # The real command to execute was previously uploaded to a remote bash
435 # script during deployment, now run the remote script using 'run' method
437 cmd = "bash ./app.sh"
438 (out, err), proc = self.node.run(cmd, self.app_home,
444 # check if execution errors occurred
445 msg = " Failed to start command '%s' " % command
447 if proc.poll() and err:
448 self.error(msg, out, err)
449 raise RuntimeError, msg
451 # Check status of process running in background
452 pid, ppid = self.node.wait_pid(self.app_home)
453 if pid: self._pid = int(pid)
454 if ppid: self._ppid = int(ppid)
456 # If the process is not running, check for error information
457 # on the remote machine
458 if not self.pid or not self.ppid:
459 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
460 self.error(msg, out, err)
462 msg2 = " Setting state to Failed"
464 self._state = ResourceState.FAILED
466 raise RuntimeError, msg
468 super(LinuxApplication, self).start()
471 # If no command was given (i.e. Application was used for dependency
472 # installation), then the application is directly marked as FINISHED
473 self._state = ResourceState.FINISHED
# NOTE(review): the `def` line (presumably `stop`), the end of its docstring,
# the assignment of `state` and the foreground/background branch headers
# are missing from this file.
# When the resource is STARTED, kills the running command (via the node
# kill() with the stored pid/ppid for daemons), marks the resource FAILED if
# that reports an error, and calls the base-class stop().
476 """ Stops application execution
478 command = self.get('command') or ''
481 if state == ResourceState.STARTED:
484 self.info("Stopping command '%s'" % command)
486 # If the command is running in foreground (it was launched using
487 # the node 'execute' method), then we use the handler to the Popen
488 # process to kill it. Else we send a kill signal using the pid and ppid
489 # retrieved after running the command with the node 'run' method
494 (out, err), proc = self.node.kill(self.pid, self.ppid)
497 # check if execution errors occurred
498 msg = " Failed to STOP command '%s' " % self.get("command")
499 self.error(msg, out, err)
500 self._state = ResourceState.FAILED
504 super(LinuxApplication, self).stop()
# NOTE(review): the `def` line (presumably `release`) and the guard around
# the tear-down execution (presumably `if tear_down:`) are missing, as are
# the lines between the tear-down and the state check.
# Runs the 'tearDown' script on the node, then calls the base-class
# release() only if the resource is in the STOPPED state.
507 self.info("Releasing resource")
509 tear_down = self.get("tearDown")
511 self.node.execute(tear_down)
514 if self.state == ResourceState.STOPPED:
515 super(LinuxApplication, self).release()
# NOTE(review): the decorator/`def` lines (presumably a `state` property), the
# branch headers and the final return are missing from this file.
# While STARTED, refreshes the resource state: a foreground command is polled
# through its Popen handle; a background command is checked on the node at
# most every state_check_delay seconds, for errors and then via pid/ppid.
519 if self._state == ResourceState.STARTED:
520 if self.in_foreground:
521 retcode = self._proc.poll()
523 # retcode == None -> running
524 # retcode > 0 -> error
525 # retcode == 0 -> finished
# NOTE(review): `msg` and `out` are not assigned on this path in the visible
# lines; confirm they are defined before self.error.
528 err = self._proc.stderr.read()
529 self._state = ResourceState.FAILED
530 self.error(msg, out, err)
532 self._state = ResourceState.FINISHED
535 # To avoid overwhelming the remote hosts and the local processor
536 # with too many ssh queries, the state is only requested
537 # every 'state_check_delay' seconds.
538 state_check_delay = 0.5
539 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
540 # check if execution errors occurred
541 (out, err), proc = self.node.check_errors(self.app_home)
544 if err.find("No such file or directory") >= 0 :
545 # The resource is marked as started, but the
546 # command was not yet executed
547 return ResourceState.READY
549 msg = " Failed to execute command '%s'" % self.get("command")
550 self.error(msg, out, err)
551 self._state = ResourceState.FAILED
553 elif self.pid and self.ppid:
554 status = self.node.status(self.pid, self.ppid)
556 if status == ProcStatus.FINISHED:
557 self._state = ResourceState.FINISHED
# Remember when the node was last queried, to throttle the next check.
559 self._last_state_check = strfnow()
def replace_paths(self, command):
    """
    Replace all special path tags in `command` with actual paths.

    Supported tags: ${SOURCES} (src_dir), ${BUILD} (build_dir),
    ${APP_HOME} (app_home), ${NODE_HOME} (the node's node_home) and
    ${EXP_HOME} (the node's exp_home). A relative directory is made
    absolute by prefixing it with the literal text "${HOME}", which is
    left for the remote shell to expand.

    The paths are NOT shell-escaped, so they must not contain whitespace
    or shell metacharacters.

    :param command: command (or environment) string containing tags
    :returns: the string with every tag occurrence substituted
    """
    def absolute_dir(d):
        return d if d.startswith("/") else os.path.join("${HOME}", d)

    return ( command
        .replace("${SOURCES}", absolute_dir(self.src_dir))
        .replace("${BUILD}", absolute_dir(self.build_dir))
        .replace("${APP_HOME}", absolute_dir(self.app_home))
        .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
        .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
    )
578 def valid_connection(self, guid):