2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
29 # TODO: Resolve wildcards in commands!!
# Resource manager for an application executed on a connected LinuxNode.
# It can upload sources, code and stdin, install package dependencies, run
# build/install scripts, and launch 'command' either attached to a terminal
# (when forwardX11 is set) or as a detached background process.
33 class LinuxApplication(ResourceManager):
# Resource type identifier (presumably what rtype() returns, cf. the
# LinuxNode.rtype() call used to find the connected node).
34 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """ Declares the user-configurable attributes of a LinuxApplication.

    Attributes are registered on ``cls`` in the order listed below.
    'build', 'install' and 'tearDown' use Flags.ReadOnly; every other
    attribute uses Flags.ExecReadOnly.

    Path tags mentioned in the help texts (${SOURCES}, ${BUILD},
    ${APP_HOME}, ...) are substituted by replace_paths before the
    'command', 'build' and 'install' scripts are run.
    """
    # (name, help text, flags) for every attribute, in registration order.
    specs = [
        ("command", "Command to execute", Flags.ExecReadOnly),
        ("forwardX11", "Enables X11 forwarding for SSH connections",
            Flags.ExecReadOnly),
        ("env", "Environment variables string for command execution",
            Flags.ExecReadOnly),
        ("sudo", "Run with root privileges", Flags.ExecReadOnly),
        ("depends",
            "Space-separated list of packages required to run the application",
            Flags.ExecReadOnly),
        ("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            Flags.ExecReadOnly),
        ("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            Flags.ExecReadOnly),
        ("build",
            "Build commands to execute after deploying the sources. "
            "Sources will be in the ${SOURCES} folder. "
            "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
            "Try to make the commands return with a nonzero exit code on error.\n"
            "Also, do not install any programs here, use the 'install' attribute. This will "
            "help keep the built files constrained to the build folder (which may "
            "not be the home folder), and will result in faster deployment. Also, "
            "make sure to clean up temporary files, to reduce bandwidth usage between "
            "nodes when transferring built packages.",
            Flags.ReadOnly),
        # replace_paths maps ${SOURCES} to the sources dir and ${APP_HOME}
        # to the application home, so the help text must name ${APP_HOME}
        # when referring to the folder where the commands run.
        ("install",
            "Commands to transfer built files to their final destinations. "
            "Sources will be in the ${SOURCES} folder, and the special "
            "tag ${APP_HOME} can be used to reference the application's "
            "home folder (where the application commands will run).\n"
            "ALL sources and targets needed for execution must be copied there, "
            "if building has been enabled.\n"
            "That is, 'slave' nodes will not automatically get any source files. "
            "'slave' nodes don't get build dependencies either, so if you need "
            "make and other tools to install, be sure to provide them as "
            "actual dependencies instead.",
            Flags.ReadOnly),
        ("stdin", "Standard input", Flags.ExecReadOnly),
        ("stdout", "Standard output", Flags.ExecReadOnly),
        ("stderr", "Standard error", Flags.ExecReadOnly),
        ("tearDown", "Bash script to be executed before "
            "releasing the resource",
            Flags.ReadOnly),
    ]

    for name, help_text, flags in specs:
        cls._register_attribute(Attribute(name, help_text, flags = flags))
def _register_traces(cls):
    """ Declares the traces exposed by every LinuxApplication.

    Two traces are registered on ``cls``, in this order: "stdout" and
    "stderr". trace() reads them from files of the same name under the
    application home.
    """
    trace_specs = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
    )
    for trace_name, trace_help in trace_specs:
        cls._register_trace(Trace(trace_name, trace_help))
# Initialise per-application state: the application directory name
# ("app-<guid>") and the timestamp of the last remote state check.
109 def __init__(self, ec, guid):
110 super(LinuxApplication, self).__init__(ec, guid)
# Directory name of this application; joined under the node's exp_home
# to form the application home path.
113 self._home = "app-%s" % self.guid
115 # keep a reference to the running process handler when
116 # the command is not executed as remote daemon in background
# NOTE(review): no assignment follows the comment above in this view. The
# handler is read as self._proc by the foreground start/state code, so it
# presumably should be initialised here (self._proc = None) — confirm.
# Likewise self._pid / self._ppid are only assigned after a background
# start; confirm they are initialised before first use.
119 # timestamp of last state check of the application
120 self._last_state_check = strfnow()
def log_message(self, msg):
    """ Decorates a log message with this resource's guid and node hostname.

    :param msg: message text to decorate
    :returns: " guid <guid> - host <hostname> - <msg> ". The hostname is
        rendered as None while no LinuxNode is connected.
    """
    # self.node returns None until a LinuxNode is connected; building a log
    # line must not raise AttributeError in that window.
    node = self.node
    hostname = node.get("hostname") if node is not None else None
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
# Presumably the body of the 'node' property (its def line is not visible
# here): returns the first connected LinuxNode, or None (implicitly) when
# no LinuxNode is connected.
128 node = self.get_connected(LinuxNode.rtype())
129 if node: return node[0]
# Presumably the bodies of the app_home, src_dir and build_dir properties
# (their def lines are not visible here):
#  - app_home: "<node exp_home>/app-<guid>"
134 return os.path.join(self.node.exp_home, self._home)
#  - src_dir: "<app_home>/src", where sources and code are uploaded
138 return os.path.join(self.app_home, 'src')
#  - build_dir: "<app_home>/build", created before the build script runs
142 return os.path.join(self.app_home, 'build')
def in_foreground(self):
    """ Tells whether the command must run attached to a terminal.

    A foreground command is launched with the node 'execute' method instead
    of 'run' (which starts a detached background process). This is required
    when X11 forwarding ('forwardX11' attribute) is enabled, because the
    terminal must stay attached to the command.

    :returns: the 'forwardX11' attribute value when it is truthy, else False
    """
    forward_x11 = self.get("forwardX11")
    if forward_x11:
        return forward_x11
    return False
# Retrieve trace 'name', i.e. the file <app_home>/<name> on the node.
# Depending on 'attr': PATH (presumably) yields the path, ALL reads the
# content with node.check_output, STREAM reads one chunk with dd, and SIZE
# yields the size in bytes as an int. Failures go through self.error.
165 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
166 self.info("Retrieving '%s' trace %s " % (name, attr))
168 path = os.path.join(self.app_home, name)
# Probe for the file remotely: the command prints 'success' or 'error'.
170 command = "(test -f %s && echo 'success') || echo 'error'" % path
171 (out, err), proc = self.node.execute(command)
173 if (err and proc.poll()) or out.find("error") != -1:
174 msg = " Couldn't find trace %s " % name
175 self.error(msg, out, err)
# NOTE(review): no return statement is visible after this error report, nor
# in the PATH/ALL branches below. Each branch presumably returns (None on
# failure) — confirm, otherwise execution falls through.
178 if attr == TraceAttr.PATH:
181 if attr == TraceAttr.ALL:
182 (out, err), proc = self.node.check_output(self.app_home, name)
184 if err and proc.poll():
185 msg = " Couldn't read trace %s " % name
186 self.error(msg, out, err)
# STREAM: dd copies one block of 'block' bytes after skipping 'offset'
# blocks, so 'offset' is counted in blocks, not bytes.
191 if attr == TraceAttr.STREAM:
192 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
193 elif attr == TraceAttr.SIZE:
# '%%' renders a literal '%': the remote command is `stat -c%s <path>`.
194 cmd = "stat -c%%s %s " % path
# NOTE(review): 'cmd' is only bound for STREAM and SIZE; any other 'attr'
# value reaching this point raises UnboundLocalError.
196 (out, err), proc = self.node.execute(cmd)
198 if err and proc.poll():
199 msg = " Couldn't find trace %s " % name
200 self.error(msg, out, err)
# SIZE output is the byte count printed by stat; convert it to int.
203 if attr == TraceAttr.SIZE:
204 out = int(out.strip())
# Presumably the body of provision() (its def line and some calls are not
# visible here). Visible steps: create the application home, upload
# sources, install dependencies, upload the command script (background
# mode only), then delegate to the base class.
209 # create home dir for application
210 self.node.mkdir(self.app_home)
213 self.upload_sources()
221 # install dependencies
222 self.install_dependencies()
230 # Upload command to remote bash script
231 # - only if command can be executed in background and detached
232 command = self.get("command")
# Foreground commands (forwardX11) are not uploaded; they are executed
# directly through node.execute when the application starts.
234 if command and not self.in_foreground:
235 self.info("Uploading command '%s'" % command)
237 # replace application specific paths in the command
238 command = self.replace_paths(command)
240 # replace application specific paths in the environment
241 env = self.get("env")
# 'env' may be unset; tags are only substituted when a value is present.
242 env = env and self.replace_paths(env)
244 self.node.upload_command(command, self.app_home,
248 self.info("Provisioning finished")
250 super(LinuxApplication, self).provision()
# Deploy the space-separated 'sources' into src_dir. Entries starting with
# "http" are downloaded on the node with wget (run via run_and_wait, with an
# `ls` check per file); the remaining entries are passed to node.upload.
252 def upload_sources(self):
253 sources = self.get("sources")
255 self.info("Uploading sources ")
257 # create dir for sources
258 self.node.mkdir(self.src_dir)
# NOTE(review): split(' ') produces empty entries for repeated spaces;
# split() without arguments would drop them.
260 sources = sources.split(' ')
262 http_sources = list()
# Iterate over a copy so entries can be removed from 'sources' safely.
263 for source in list(sources):
# NOTE(review): the "https" test is redundant ("https..." already starts
# with "http"), and a local file whose name starts with "http" (e.g.
# "httpd.conf") is wrongly treated as a URL. Matching "http://" and
# "https://" would be stricter.
264 if source.startswith("http") or source.startswith("https"):
265 http_sources.append(source)
266 sources.remove(source)
268 # Download http sources remotely
270 command = [" wget -c --directory-prefix=${SOURCES} "]
# NOTE(review): 'check' is appended to below, but its initialisation is not
# visible in this view — confirm it is created (e.g. check = []).
273 for source in http_sources:
274 command.append(" %s " % (source))
275 check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
277 command = " ".join(command)
# NOTE(review): with ';' the script's exit status is that of the LAST `ls`
# only, so a missing earlier download does not change the exit code.
278 check = " ; ".join(check)
280 # Append the command to check that the sources were downloaded
281 command += " ; %s " % check
283 # replace application specific paths in the command
284 command = self.replace_paths(command)
286 # Upload the command to a bash script and run it
287 # in background ( but wait until the command has
288 # finished to continue )
289 self.node.run_and_wait(command, self.app_home,
290 shfile = "http_sources.sh",
291 pidfile = "http_sources_pidfile",
292 ecodefile = "http_sources_exitcode",
293 stdout = "http_sources_stdout",
294 stderr = "http_sources_stderr")
# The remaining (non-http) entries are passed as a list to node.upload.
297 self.node.upload(sources, self.src_dir)
def upload_code(self):
    """ Uploads the plain-text 'code' attribute as ${SOURCES}/code.

    The sources directory (self.src_dir) is created first. Then the
    attribute's text is uploaded as the file "code" inside it.
    """
    code = self.get("code")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info("Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the attribute's text itself. text=True presumably tells the
    # node that the first argument is content, not a local path — confirm
    # against LinuxNode.upload.
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """ Uploads the 'stdin' attribute text as the file <app_home>/stdin.

    No directory is created here (unlike upload_code); the file is written
    directly into the application home.
    """
    stdin_text = self.get("stdin")
    self.info(" Uploading stdin ")
    self.node.upload(stdin_text, os.path.join(self.app_home, "stdin"),
            text = True)
def install_dependencies(self):
    """ Installs the packages listed in the 'depends' attribute on the node.

    The space-separated package list is handed unchanged to
    node.install_packages, together with the application home.
    """
    packages = self.get("depends")
    message = "Installing dependencies %s" % packages
    self.info(message)
    node = self.node
    node.install_packages(packages, self.app_home)
# Presumably the body of build() (its def line is not visible here): runs
# the 'build' attribute as a script from the application home, after
# creating build_dir, and waits for it to finish.
326 build = self.get("build")
328 self.info("Building sources ")
330 # create dir for build
331 self.node.mkdir(self.build_dir)
333 # replace application specific paths in the command
334 command = self.replace_paths(build)
336 # Upload the command to a bash script and run it
337 # in background ( but wait until the command has
338 # finished to continue )
# NOTE(review): no 'shfile' argument is visible in this call, unlike the
# sources/install calls — confirm the build script name is passed.
339 self.node.run_and_wait(command, self.app_home,
341 pidfile = "build_pidfile",
342 ecodefile = "build_exitcode",
343 stdout = "build_stdout",
344 stderr = "build_stderr")
# Presumably the body of install() (its def line is not visible here): runs
# the 'install' attribute as install.sh from the application home and waits
# for it to finish; output goes to install_stdout / install_stderr.
347 install = self.get("install")
349 self.info("Installing sources ")
351 # replace application specific paths in the command
352 command = self.replace_paths(install)
354 # Upload the command to a bash script and run it
355 # in background ( but wait until the command has
356 # finished to continue )
357 self.node.run_and_wait(command, self.app_home,
358 shfile = "install.sh",
359 pidfile = "install_pidfile",
360 ecodefile = "install_exitcode",
361 stdout = "install_stdout",
362 stderr = "install_stderr")
# Presumably the body of deploy() (def line and several statements are not
# visible here). While the node is missing or not READY, deploy is
# rescheduled every 0.5s; otherwise the command is deployed, and a failure
# marks the application FAILED.
365 # Wait until node is associated and deployed
# NOTE(review): 'node' is assigned on a line not visible in this view.
367 if not node or node.state < ResourceState.READY:
# NOTE(review): when 'node' is None (the first half of the condition above),
# self.node.state raises AttributeError while building this debug message.
368 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
370 reschedule_delay = "0.5s"
371 self.ec.schedule(reschedule_delay, self.deploy)
374 command = self.get("command") or ""
375 self.info("Deploying command '%s' " % command)
379 self._state = ResourceState.FAILED
382 super(LinuxApplication, self).deploy()
# Presumably the body of start() (def line and branch lines are not visible
# here). Without a command the application is marked FINISHED directly;
# otherwise the command runs in foreground (forwardX11) or background.
385 command = self.get("command")
387 self.info("Starting command '%s'" % command)
390 # If no command was given (i.e. Application was used for dependency
391 # installation), then the application is directly marked as FINISHED
392 self._state = ResourceState.FINISHED
394 if self.in_foreground:
395 self._start_in_foreground()
397 self._start_in_background()
399 super(LinuxApplication, self).start()
# Launch the command attached to a terminal via node.execute (non-blocking),
# keeping the process handle in self._proc. Raises RuntimeError and marks
# the application FAILED if the process has already exited with an error.
401 def _start_in_foreground(self):
402 command = self.get("command")
403 env = self.get("env")
# stdin/sudo/x11 are presumably passed to node.execute on argument lines
# that are not visible in this view.
404 stdin = "stdin" if self.get("stdin") else None
405 sudo = self.get("sudo") or False
406 x11 = self.get("forwardX11")
408 # Command will be launched in foreground and attached to the
409 # terminal using the node 'execute' in non blocking mode.
# The environment is rendered inline and prefixed to the command line.
412 environ = self.node.format_environment(env, inline = True)
413 command = environ + command
414 command = self.replace_paths(command)
416 self.info("Starting command IN FOREGROUND '%s'" % command)
418 # We save the reference to the process in self._proc
419 # to be able to kill the process from the stop method.
420 # We also set blocking = False, since we don't want the
421 # thread to block until the execution finishes.
422 (out, err), self._proc = self.node.execute(command,
# NOTE(review): 'msg' is used below but is not assigned in the visible lines
# of this method — confirm it is defined, otherwise this raises NameError.
428 if self._proc.poll():
429 self._state = ResourceState.FAILED
430 self.error(msg, out, err)
431 raise RuntimeError, msg
# Launch the previously uploaded app.sh as a detached process via node.run,
# then wait for its pid/ppid. Raises RuntimeError (state FAILED) when the
# launch reports errors or no pid/ppid can be obtained.
433 def _start_in_background(self):
434 command = self.get("command")
435 env = self.get("env")
436 stdin = "stdin" if self.get("stdin") else None
# NOTE(review): both branches of these two conditionals yield the same
# value, so stdout/stderr are always "stdout"/"stderr".
437 stdout = "stdout" if self.get("stdout") else "stdout"
438 stderr = "stderr" if self.get("stderr") else "stderr"
439 sudo = self.get("sudo") or False
441 # Command will be run as a daemon in background and detached from any terminal.
442 # The real command to run was previously uploaded to a bash script
443 # during deployment, now launch the remote script using 'run'
444 # method from the node
445 cmd = "bash ./app.sh"
446 (out, err), proc = self.node.run(cmd, self.app_home,
452 # check if execution errors occurred
453 msg = " Failed to start command '%s' " % command
456 self._state = ResourceState.FAILED
457 self.error(msg, out, err)
458 raise RuntimeError, msg
460 # Wait for pid file to be generated
461 pid, ppid = self.node.wait_pid(self.app_home)
462 if pid: self._pid = int(pid)
463 if ppid: self._ppid = int(ppid)
465 # If the process is not running, check for error information
466 # on the remote machine
467 if not self.pid or not self.ppid:
# NOTE(review): 'home' and 'ecodefile' are not defined in this method, and
# elsewhere check_errors is called on self.node (with self.app_home). This
# line presumably should be self.node.check_errors(self.app_home, ...) —
# as written it raises NameError/AttributeError.
468 (out, err), proc = self.check_errors(home, ecodefile, stderr)
470 # Out is what was written in the stderr file
472 self._state = ResourceState.FAILED
473 msg = " Failed to start command '%s' " % command
474 self.error(msg, out, err)
475 raise RuntimeError, msg
# Presumably the body of stop() (def line and branch lines are not visible
# here): only a STARTED application is stopped; on a kill error the state
# becomes FAILED.
478 """ Stops application execution
480 command = self.get('command') or ''
482 if self.state == ResourceState.STARTED:
485 self.info("Stopping command '%s'" % command)
487 # If the command is running in foreground (it was launched using
488 # the node 'execute' method), then we use the handler to the Popen
489 # process to kill it. Else we send a kill signal using the pid and ppid
490 # retrieved after running the command with the node 'run' method
495 (out, err), proc = self.node.kill(self.pid, self.ppid)
498 # check if execution errors occurred
499 msg = " Failed to STOP command '%s' " % self.get("command")
500 self.error(msg, out, err)
501 self._state = ResourceState.FAILED
505 super(LinuxApplication, self).stop()
# Presumably the body of release() (def line and guard lines are not
# visible here): runs the 'tearDown' script on the node, then releases the
# base resource only if the application ended up STOPPED.
508 self.info("Releasing resource")
510 tear_down = self.get("tearDown")
# NOTE(review): unlike command/build/install, the tearDown script is not
# passed through replace_paths, so tags such as ${APP_HOME} stay literal.
512 self.node.execute(tear_down)
516 if self.state == ResourceState.STOPPED:
517 super(LinuxApplication, self).release()
# Presumably the body of the 'state' property (def line not visible). While
# STARTED, it refreshes the state from the foreground process handle or,
# for background commands, from remote error/status checks.
521 """ Returns the state of the application
523 if self._state == ResourceState.STARTED:
524 if self.in_foreground:
525 # Check if the process we used to execute the command
526 # is still running ...
527 retcode = self._proc.poll()
529 # retcode == None -> running
530 # retcode > 0 -> error
531 # retcode == 0 -> finished
# NOTE(review): Popen.poll() returns a negative code when the process was
# killed by a signal; that case is not listed above.
534 msg = " Failed to execute command '%s'" % self.get("command")
535 err = self._proc.stderr.read()
# NOTE(review): 'out' is not assigned in the visible lines of this branch.
536 self.error(msg, out, err)
537 self._state = ResourceState.FAILED
539 self._state = ResourceState.FINISHED
542 # We need to query the status of the command we launched in
543 # background. In order to avoid overwhelming the remote host and
544 # the local processor with too many ssh queries, the state is only
545 # requested every 'state_check_delay' seconds.
546 state_check_delay = 0.5
547 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
548 # check if execution errors occurred
549 (out, err), proc = self.node.check_errors(self.app_home)
552 msg = " Failed to execute command '%s'" % self.get("command")
553 self.error(msg, out, err)
554 self._state = ResourceState.FAILED
556 elif self.pid and self.ppid:
557 # No execution errors occurred. Make sure the background
558 # process with the recorded pid is still running.
559 status = self.node.status(self.pid, self.ppid)
561 if status == ProcStatus.FINISHED:
562 self._state = ResourceState.FINISHED
564 self._last_state_check = strfnow()
def replace_paths(self, command):
    """
    Replace all special path tags in ``command`` with actual paths.

    Supported tags and their values:

    - ``${SOURCES}``   -> self.src_dir
    - ``${BUILD}``     -> self.build_dir
    - ``${APP_HOME}``  -> self.app_home
    - ``${NODE_HOME}`` -> self.node.node_home
    - ``${EXP_HOME}``  -> self.node.exp_home

    Relative directories are prefixed with the literal ``${HOME}``, which is
    left unexpanded here (presumably expanded later by the remote shell).
    The paths are NOT shell-escaped: directories containing spaces or shell
    metacharacters will break the resulting command.

    :param command: command (or environment) string that may contain tags
    :returns: the string with every occurrence of each tag substituted
    """
    def absolute_dir(d):
        # Absolute paths are kept; relative ones are anchored at ${HOME}.
        return d if d.startswith("/") else os.path.join("${HOME}", d)

    # Tags are substituted one after the other, in this fixed order.
    replacements = (
        ("${SOURCES}", self.src_dir),
        ("${BUILD}", self.build_dir),
        ("${APP_HOME}", self.app_home),
        ("${NODE_HOME}", self.node.node_home),
        ("${EXP_HOME}", self.node.exp_home),
    )
    for tag, directory in replacements:
        command = command.replace(tag, absolute_dir(directory))
    return command
583 def valid_connection(self, guid):