2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
29 # TODO: Resolve wildcards in commands!!
class LinuxApplication(ResourceManager):
    """Resource manager that runs a command on a connected LinuxNode.

    The node is the first LinuxNode connected to this resource. All
    application files live under ``<node.exp_home>/app-<guid>``.

    Provisioning can:
      - upload ``sources``, ``code`` and ``stdin``;
      - install the ``depends`` packages;
      - run the ``build`` and ``install`` scripts.

    The ``command`` is then started in one of two ways:
      - in foreground through the node's ``execute`` method, when X11
        forwarding or foreground mode is requested;
      - otherwise in background, by running the uploaded ``app.sh``
        script through the node's ``run`` method.

    The ``stdout`` and ``stderr`` traces are exposed.
    """
    _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare the user-facing attributes of a Linux application.

    The attributes fall into four groups:
      - the command itself: ``command``, ``forwardX11``, ``env``, ``sudo``;
      - its I/O: ``stdin``, ``stdout``, ``stderr``;
      - provisioning: ``depends``, ``sources``, ``code``, ``build``,
        ``install``;
      - clean-up: ``tearDown``.

    Every attribute is created first. They are then passed to
    ``cls._register_attribute`` one by one, in declaration order.

    NOTE(review): ``build``, ``install`` and ``tearDown`` are flagged
    ``Flags.ReadOnly``, while every other attribute uses
    ``Flags.ExecReadOnly``. Confirm that this difference is intended.
    """
    exec_read_only = Flags.ExecReadOnly
    read_only = Flags.ReadOnly

    attributes = (
        Attribute("command", "Command to execute",
            flags = exec_read_only),
        Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
            flags = exec_read_only),
        Attribute("env", "Environment variables string for command execution",
            flags = exec_read_only),
        Attribute("sudo", "Run with root privileges",
            flags = exec_read_only),
        Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags = exec_read_only),
        Attribute("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            flags = exec_read_only),
        Attribute("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            flags = exec_read_only),
        Attribute("build",
            "Build commands to execute after deploying the sources. "
            "Sources will be in the ${SOURCES} folder. "
            "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
            "Try to make the commands return with a nonzero exit code on error.\n"
            "Also, do not install any programs here, use the 'install' attribute. This will "
            "help keep the built files constrained to the build folder (which may "
            "not be the home folder), and will result in faster deployment. Also, "
            "make sure to clean up temporary files, to reduce bandwidth usage between "
            "nodes when transferring built packages.",
            flags = read_only),
        Attribute("install",
            "Commands to transfer built files to their final destinations. "
            "Sources will be in the initial working folder, and a special "
            "tag ${SOURCES} can be used to reference the experiment's "
            "home folder (where the application commands will run).\n"
            "ALL sources and targets needed for execution must be copied there, "
            "if building has been enabled.\n"
            "That is, 'slave' nodes will not automatically get any source files. "
            "'slave' nodes don't get build dependencies either, so if you need "
            "make and other tools to install, be sure to provide them as "
            "actual dependencies instead.",
            flags = read_only),
        Attribute("stdin", "Standard input", flags = exec_read_only),
        Attribute("stdout", "Standard output", flags = exec_read_only),
        Attribute("stderr", "Standard error", flags = exec_read_only),
        Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = read_only),
    )

    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Declare the two traces of a Linux application.

    The traces are named ``stdout`` and ``stderr``, for the command's
    standard output and standard error streams. Both traces are created
    before either is passed to ``cls._register_trace``.
    """
    traces = (
        Trace("stdout", "Standard output stream"),
        Trace("stderr", "Standard error stream"),
    )
    for trace in traces:
        cls._register_trace(trace)
109 def __init__(self, ec, guid):
110 super(LinuxApplication, self).__init__(ec, guid)
113 self._home = "app-%s" % self.guid
114 self._in_foreground = False
116 # keep a reference to the running process handler when
117 # the command is not executed as remote daemon in background
120 # timestamp of last state check of the application
121 self._last_state_check = strfnow()
def log_message(self, msg):
    """Decorate a log message with this application's guid and host.

    :param msg: text to log.
    :returns: ``" guid <guid> - host <hostname> - <msg> "``.

    ``self.node`` is None until a LinuxNode is connected. In that case the
    host is reported as ``None`` instead of raising AttributeError, so
    logging keeps working before the node is attached.
    """
    node = self.node
    hostname = node.get("hostname") if node is not None else None
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
129 node = self.get_connected(LinuxNode.rtype())
130 if node: return node[0]
135 return os.path.join(self.node.exp_home, self._home)
139 return os.path.join(self.app_home, 'src')
143 return os.path.join(self.app_home, 'build')
def in_foreground(self):
    """Tell whether the command must run attached to a terminal.

    A foreground command is launched with the node's ``execute`` method.
    A background command uses ``run``, which starts it detached from any
    terminal.

    When X11 forwarding (``forwardX11``) is enabled, the command cannot
    run detached, because the terminal has to stay attached. Otherwise
    the ``_in_foreground`` flag decides.

    :returns: the ``forwardX11`` value when it is truthy, otherwise
        ``self._in_foreground``.
    """
    forward_x11 = self.get("forwardX11")
    if forward_x11:
        return forward_x11
    return self._in_foreground
166 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
167 self.info("Retrieving '%s' trace %s " % (name, attr))
169 path = os.path.join(self.app_home, name)
171 command = "(test -f %s && echo 'success') || echo 'error'" % path
172 (out, err), proc = self.node.execute(command)
174 if (err and proc.poll()) or out.find("error") != -1:
175 msg = " Couldn't find trace %s " % name
176 self.error(msg, out, err)
179 if attr == TraceAttr.PATH:
182 if attr == TraceAttr.ALL:
183 (out, err), proc = self.node.check_output(self.app_home, name)
185 if err and proc.poll():
186 msg = " Couldn't read trace %s " % name
187 self.error(msg, out, err)
192 if attr == TraceAttr.STREAM:
193 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
194 elif attr == TraceAttr.SIZE:
195 cmd = "stat -c%%s %s " % path
197 (out, err), proc = self.node.execute(cmd)
199 if err and proc.poll():
200 msg = " Couldn't find trace %s " % name
201 self.error(msg, out, err)
204 if attr == TraceAttr.SIZE:
205 out = int(out.strip())
210 # create home dir for application
211 self.node.mkdir(self.app_home)
214 self.upload_sources()
222 # install dependencies
223 self.install_dependencies()
231 # Upload command to remote bash script
232 # - only if command can be executed in background and detached
233 command = self.get("command")
235 if command and not self.in_foreground:
236 self.info("Uploading command '%s'" % command)
238 # replace application specific paths in the command
239 command = self.replace_paths(command)
241 # replace application specific paths in the environment
242 env = self.get("env")
243 env = env and self.replace_paths(env)
245 self.node.upload_command(command, self.app_home,
249 self.info("Provisioning finished")
251 super(LinuxApplication, self).provision()
253 def upload_sources(self):
254 sources = self.get("sources")
256 self.info("Uploading sources ")
258 # create dir for sources
259 self.node.mkdir(self.src_dir)
261 sources = sources.split(' ')
263 http_sources = list()
264 for source in list(sources):
265 if source.startswith("http") or source.startswith("https"):
266 http_sources.append(source)
267 sources.remove(source)
269 # Download http sources remotely
271 command = [" wget -c --directory-prefix=${SOURCES} "]
274 for source in http_sources:
275 command.append(" %s " % (source))
276 check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
278 command = " ".join(command)
279 check = " ; ".join(check)
281 # Append the command to check that the sources were downloaded
282 command += " ; %s " % check
284 # replace application specific paths in the command
285 command = self.replace_paths(command)
287 # Upload the command to a bash script and run it
288 # in background ( but wait until the command has
289 # finished to continue )
290 self.node.run_and_wait(command, self.app_home,
291 shfile = "http_sources.sh",
292 pidfile = "http_sources_pidfile",
293 ecodefile = "http_sources_exitcode",
294 stdout = "http_sources_stdout",
295 stderr = "http_sources_stderr")
298 self.node.upload(sources, self.src_dir)
def upload_code(self):
    """Upload the inline ``code`` attribute as the file ``<src_dir>/code``.

    Does nothing when the ``code`` attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info("Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the attribute's content. The previous version passed the
    # undefined name ``sources`` here, which raised NameError.
    # text=True presumably tells node.upload that ``code`` is literal
    # content rather than a local file path (same usage as upload_stdin).
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the ``stdin`` attribute as the file ``<app_home>/stdin``.

    Both start paths pass ``stdin="stdin"`` to the node whenever this
    attribute is set. The uploaded file is therefore presumably what the
    command reads as its standard input.

    Does nothing when the attribute is unset or empty.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    dst = os.path.join(self.app_home, "stdin")

    # TODO: check whether the file already exists and, if it does,
    # whether it is identical to the content being uploaded, to avoid
    # uploading it again.
    self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
    """Install the packages listed in the ``depends`` attribute.

    ``depends`` is a space-separated package list, per the attribute's
    description. It is passed unchanged to ``self.node.install_packages``,
    together with the application home directory.

    Does nothing when the attribute is unset or empty. Without this guard,
    the method would log "Installing dependencies None" and ask the node
    to install nothing.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.info("Installing dependencies %s" % depends)
    self.node.install_packages(depends, self.app_home)
333 build = self.get("build")
335 self.info("Building sources ")
337 # create dir for build
338 self.node.mkdir(self.build_dir)
340 # replace application specific paths in the command
341 command = self.replace_paths(build)
343 # Upload the command to a bash script and run it
344 # in background ( but wait until the command has
345 # finished to continue )
346 self.node.run_and_wait(command, self.app_home,
348 pidfile = "build_pidfile",
349 ecodefile = "build_exitcode",
350 stdout = "build_stdout",
351 stderr = "build_stderr")
354 install = self.get("install")
356 self.info("Installing sources ")
358 # replace application specific paths in the command
359 command = self.replace_paths(install)
361 # Upload the command to a bash script and run it
362 # in background ( but wait until the command has
363 # finished to continue )
364 self.node.run_and_wait(command, self.app_home,
365 shfile = "install.sh",
366 pidfile = "install_pidfile",
367 ecodefile = "install_exitcode",
368 stdout = "install_stdout",
369 stderr = "install_stderr")
372 # Wait until node is associated and deployed
374 if not node or node.state < ResourceState.READY:
375 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
377 reschedule_delay = "0.5s"
378 self.ec.schedule(reschedule_delay, self.deploy)
381 command = self.get("command") or ""
382 self.info("Deploying command '%s' " % command)
386 self._state = ResourceState.FAILED
389 super(LinuxApplication, self).deploy()
392 command = self.get("command")
394 self.info("Starting command '%s'" % command)
397 # If no command was given (i.e. Application was used for dependency
398 # installation), then the application is directly marked as FINISHED
399 self._state = ResourceState.FINISHED
402 if self.in_foreground:
403 self._start_in_foreground()
405 self._start_in_background()
407 super(LinuxApplication, self).start()
409 def _start_in_foreground(self):
410 command = self.get("command")
411 stdin = "stdin" if self.get("stdin") else None
412 sudo = self.get("sudo") or False
413 x11 = self.get("forwardX11")
415 # Command will be launched in foreground and attached to the
416 # terminal using the node 'execute' in non blocking mode.
419 env = self.get("env")
420 environ = self.node.format_environment(env, inline = True)
421 command = environ + command
422 command = self.replace_paths(command)
424 # We save the reference to the process in self._proc
425 # to be able to kill the process from the stop method.
426 # We also set blocking = False, since we don't want the
427 # thread to block until the execution finishes.
428 (out, err), self._proc = self.node.execute(command,
434 if self._proc.poll():
435 self._state = ResourceState.FAILED
436 self.error(msg, out, err)
437 raise RuntimeError, msg
439 def _start_in_background(self):
440 command = self.get("command")
441 env = self.get("env")
442 stdin = "stdin" if self.get("stdin") else None
443 stdout = "stdout" if self.get("stdout") else "stdout"
444 stderr = "stderr" if self.get("stderr") else "stderr"
445 sudo = self.get("sudo") or False
447 # Command will be as a daemon in baground and detached from any terminal.
448 # The real command to run was previously uploaded to a bash script
449 # during deployment, now launch the remote script using 'run'
450 # method from the node
451 cmd = "bash ./app.sh"
452 (out, err), proc = self.node.run(cmd, self.app_home,
458 # check if execution errors occurred
459 msg = " Failed to start command '%s' " % command
462 self._state = ResourceState.FAILED
463 self.error(msg, out, err)
464 raise RuntimeError, msg
466 # Wait for pid file to be generated
467 pid, ppid = self.node.wait_pid(self.app_home)
468 if pid: self._pid = int(pid)
469 if ppid: self._ppid = int(ppid)
471 # If the process is not running, check for error information
472 # on the remote machine
473 if not self.pid or not self.ppid:
474 (out, err), proc = self.node.check_errors(self.app_home,
477 # Out is what was written in the stderr file
479 self._state = ResourceState.FAILED
480 msg = " Failed to start command '%s' " % command
481 self.error(msg, out, err)
482 raise RuntimeError, msg
485 """ Stops application execution
487 command = self.get('command') or ''
489 if self.state == ResourceState.STARTED:
492 self.info("Stopping command '%s'" % command)
494 # If the command is running in foreground (it was launched using
495 # the node 'execute' method), then we use the handler to the Popen
496 # process to kill it. Else we send a kill signal using the pid and ppid
497 # retrieved after running the command with the node 'run' method
502 # Only try to kill the process if the pid and ppid
504 if self.pid and self.ppid:
505 (out, err), proc = self.node.kill(self.pid, self.ppid)
508 # check if execution errors occurred
509 msg = " Failed to STOP command '%s' " % self.get("command")
510 self.error(msg, out, err)
511 self._state = ResourceState.FAILED
515 super(LinuxApplication, self).stop()
518 self.info("Releasing resource")
520 tear_down = self.get("tearDown")
522 self.node.execute(tear_down)
526 if self.state == ResourceState.STOPPED:
527 super(LinuxApplication, self).release()
531 """ Returns the state of the application
533 if self._state == ResourceState.STARTED:
534 if self.in_foreground:
535 # Check if the process we used to execute the command
536 # is still running ...
537 retcode = self._proc.poll()
539 # retcode == None -> running
540 # retcode > 0 -> error
541 # retcode == 0 -> finished
544 msg = " Failed to execute command '%s'" % self.get("command")
545 err = self._proc.stderr.read()
546 self.error(msg, out, err)
547 self._state = ResourceState.FAILED
549 self._state = ResourceState.FINISHED
552 # We need to query the status of the command we launched in
553 # background. In oredr to avoid overwhelming the remote host and
554 # the local processor with too many ssh queries, the state is only
555 # requested every 'state_check_delay' seconds.
556 state_check_delay = 0.5
557 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
558 # check if execution errors occurred
559 (out, err), proc = self.node.check_errors(self.app_home)
562 msg = " Failed to execute command '%s'" % self.get("command")
563 self.error(msg, out, err)
564 self._state = ResourceState.FAILED
566 elif self.pid and self.ppid:
567 # No execution errors occurred. Make sure the background
568 # process with the recorded pid is still running.
569 status = self.node.status(self.pid, self.ppid)
571 if status == ProcStatus.FINISHED:
572 self._state = ResourceState.FINISHED
574 self._last_state_check = strfnow()
def replace_paths(self, command):
    """Substitute the special path tags in ``command`` with real paths.

    Tags are replaced in this order:
      ${SOURCES}   -> self.src_dir
      ${BUILD}     -> self.build_dir
      ${APP_HOME}  -> self.app_home
      ${NODE_HOME} -> self.node.node_home
      ${EXP_HOME}  -> self.node.exp_home

    Absolute directories are inserted as-is. Relative ones are prefixed
    with the literal string ``${HOME}``, which is left for the remote shell
    to expand.

    No shell escaping is performed. Paths containing spaces or shell
    metacharacters are inserted verbatim.

    :param command: command or environment string. ``None`` or an empty
        string is returned unchanged.
    :returns: the string with every tag replaced.
    """
    if not command:
        return command

    def absolute_dir(d):
        # Anchor relative paths at the remote user's home directory.
        return d if d.startswith("/") else os.path.join("${HOME}", d)

    replacements = (
        ("${SOURCES}", self.src_dir),
        ("${BUILD}", self.build_dir),
        ("${APP_HOME}", self.app_home),
        ("${NODE_HOME}", self.node.node_home),
        ("${EXP_HOME}", self.node.exp_home),
    )
    for tag, directory in replacements:
        command = command.replace(tag, absolute_dir(directory))
    return command
593 def valid_connection(self, guid):