2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util import sshfuncs
25 from nepi.util.timefuncs import strfnow, strfdiff
# Delay handed to self.ec.schedule() when a step has to be retried later
# (deploy() re-schedules itself with it while the node is not READY).
# NOTE(review): state() also reads a module-level `state_check_delay`, which
# is not among the visible lines of this listing — confirm it is defined.
29 reschedule_delay = "0.5s"
32 # TODO: Resolve wildcards in commands!!
# Resource manager that runs a shell command on a connected LinuxNode, with
# optional steps to upload sources/code/stdin, install package dependencies,
# build and install before the command is started.
35 class LinuxApplication(ResourceManager):
# Resource type name; presumably what rtype() returns (LinuxNode.rtype() is
# used the same way below) — confirm against ResourceManager.
36 _rtype = "LinuxApplication"
# Register the user-configurable attributes of a LinuxApplication.
# NOTE(review): this takes `cls` and calls cls._register_attribute, so it is
# presumably a @classmethod; the decorator line is not part of this listing.
# NOTE(review): Types is imported but no attribute declares a type (e.g.
# sudo / forwardX11 look boolean) — confirm whether that is intended.
39 def _register_attributes(cls):
# Command line executed by start(); provision() pre-uploads it as
# <app_home>/app.sh unless forwardX11 is set.
40 command = Attribute("command", "Command to execute",
41 flags = Flags.ExecReadOnly)
42 forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
43 flags = Flags.ExecReadOnly)
# provision() and start() split this string on single spaces.
44 env = Attribute("env", "Environment variables string for command execution",
45 flags = Flags.ExecReadOnly)
46 sudo = Attribute("sudo", "Run with root privileges",
47 flags = Flags.ExecReadOnly)
# Passed to node.install_packages() by install_dependencies().
48 depends = Attribute("depends",
49 "Space-separated list of packages required to run the application",
50 flags = Flags.ExecReadOnly)
# Uploaded (or wget-downloaded, for http entries) by upload_sources().
51 sources = Attribute("sources",
52 "Space-separated list of regular files to be deployed in the working "
53 "path prior to building. Archives won't be expanded automatically.",
54 flags = Flags.ExecReadOnly)
55 code = Attribute("code",
56 "Plain text source code to be uploaded to the server. It will be stored "
57 "under ${SOURCES}/code",
58 flags = Flags.ExecReadOnly)
# build / install scripts are run through upload_and_run(), which substitutes
# the ${...} path tags with replace_paths().
# NOTE(review): build, install and tearDown use Flags.ReadOnly while the other
# attributes use Flags.ExecReadOnly — confirm this is intentional.
59 build = Attribute("build",
60 "Build commands to execute after deploying the sources. "
61 "Sources will be in the ${SOURCES} folder. "
62 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
63 "Try to make the commands return with a nonzero exit code on error.\n"
64 "Also, do not install any programs here, use the 'install' attribute. This will "
65 "help keep the built files constrained to the build folder (which may "
66 "not be the home folder), and will result in faster deployment. Also, "
67 "make sure to clean up temporary files, to reduce bandwidth usage between "
68 "nodes when transferring built packages.",
69 flags = Flags.ReadOnly)
# NOTE(review): this help text says ${SOURCES} references the experiment's
# home folder, but replace_paths() maps ${SOURCES} to <app_home>/src — one of
# the two is out of date.
70 install = Attribute("install",
71 "Commands to transfer built files to their final destinations. "
72 "Sources will be in the initial working folder, and a special "
73 "tag ${SOURCES} can be used to reference the experiment's "
74 "home folder (where the application commands will run).\n"
75 "ALL sources and targets needed for execution must be copied there, "
76 "if building has been enabled.\n"
77 "That is, 'slave' nodes will not automatically get any source files. "
78 "'slave' nodes don't get build dependencies either, so if you need "
79 "make and other tools to install, be sure to provide them as "
80 "actual dependencies instead.",
81 flags = Flags.ReadOnly)
# stdin text is uploaded to <app_home>/stdin by upload_stdin().
82 stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
83 stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
84 stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
# Executed on the node with node.execute() during release().
85 tear_down = Attribute("tearDown", "Bash script to be executed before "
86 "releasing the resource",
87 flags = Flags.ReadOnly)
89 cls._register_attribute(command)
90 cls._register_attribute(forward_x11)
91 cls._register_attribute(env)
92 cls._register_attribute(sudo)
93 cls._register_attribute(depends)
94 cls._register_attribute(sources)
95 cls._register_attribute(code)
96 cls._register_attribute(build)
97 cls._register_attribute(install)
98 cls._register_attribute(stdin)
99 cls._register_attribute(stdout)
100 cls._register_attribute(stderr)
101 cls._register_attribute(tear_down)
# Register the traces that trace() can retrieve; trace() reads each one from
# the file <app_home>/<trace name> on the node.
# NOTE(review): presumably a @classmethod (decorator line not in this listing).
# NOTE(review): the build step writes build_out/build_err, and no visible code
# creates <app_home>/buildlog — confirm what produces the "buildlog" trace.
104 def _register_traces(cls):
105 stdout = Trace("stdout", "Standard output stream")
106 stderr = Trace("stderr", "Standard error stream")
107 buildlog = Trace("buildlog", "Output of the build process")
109 cls._register_trace(stdout)
110 cls._register_trace(stderr)
111 cls._register_trace(buildlog)
# Initialise per-instance state: the application directory name and the
# timestamp used by state() to throttle remote status queries.
113 def __init__(self, ec, guid):
114 super(LinuxApplication, self).__init__(ec, guid)
# NOTE(review): original lines 115-116 are missing from this listing. start()
# assigns self._pid / self._ppid and start()/stop()/state() read
# self.pid / self.ppid; these are presumably initialised there — confirm.
# Directory name of this application, relative to the node's exp_home
# (app_home joins the two).
117 self._home = "app-%s" % self.guid
119 # timestamp of last state check of the application
120 self._last_state_check = strfnow()
def log_message(self, msg):
    """Prefix ``msg`` with this application's guid and its node's hostname.

    Returns the string " guid <guid> - host <hostname> - <msg> ".

    When no LinuxNode is connected yet (``self.node`` is None), the hostname
    is reported as ``None`` instead of raising AttributeError, so messages
    can still be logged before the node is associated.
    """
    # self.node may be None before a LinuxNode is connected.
    node = self.node
    hostname = node.get("hostname") if node is not None else None
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
# NOTE(review): the decorators and `def` lines of the properties whose bodies
# follow are missing from this listing. Their names (node, app_home, src_dir,
# build_dir) are inferred from how the rest of the file uses them.
# node: the first connected LinuxNode, or None when none is connected (no
# visible return on that path).
128 node = self.get_connected(LinuxNode.rtype())
129 if node: return node[0]
# app_home: <node.exp_home>/app-<guid>, the application's working directory.
134 return os.path.join(self.node.exp_home, self._home)
# src_dir: <app_home>/src, where sources and inline code are uploaded.
138 return os.path.join(self.app_home, 'src')
# build_dir: <app_home>/build, created by the build step.
142 return os.path.join(self.app_home, 'build')
# Retrieve information about the trace file <app_home>/<name> on the node:
#   TraceAttr.PATH   -> presumably the trace path itself
#   TraceAttr.ALL    -> presumably the whole contents (node.check_output)
#   TraceAttr.STREAM -> one chunk of `block` bytes read with dd
#   TraceAttr.SIZE   -> the size in bytes as an int (stat -c%s)
# NOTE(review): the return statements (original lines 163-164, 166-167,
# 174-177 and 192-193) are missing from this listing, so what is returned on
# success or after an error is not visible here.
152 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
153 self.info("Retrieving '%s' trace %s " % (name, attr))
155 path = os.path.join(self.app_home, name)
# Existence check: the remote shell prints 'success' or 'error'.
157 command = "(test -f %s && echo 'success') || echo 'error'" % path
158 (out, err), proc = self.node.execute(command)
160 if (err and proc.poll()) or out.find("error") != -1:
161 msg = " Couldn't find trace %s " % name
162 self.error(msg, out, err)
165 if attr == TraceAttr.PATH:
168 if attr == TraceAttr.ALL:
169 (out, err), proc = self.node.check_output(self.app_home, name)
171 if err and proc.poll():
172 msg = " Couldn't read trace %s " % name
173 self.error(msg, out, err)
# dd's skip= counts blocks of size bs, so `offset` is a block index (byte
# offset = offset * block), not a byte offset.
178 if attr == TraceAttr.STREAM:
179 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
180 elif attr == TraceAttr.SIZE:
181 cmd = "stat -c%%s %s " % path
# NOTE(review): if attr matches none of the branches above, `cmd` is unbound
# here unless the missing lines return earlier — confirm.
183 (out, err), proc = self.node.execute(cmd)
185 if err and proc.poll():
186 msg = " Couldn't find trace %s " % name
187 self.error(msg, out, err)
190 if attr == TraceAttr.SIZE:
191 out = int(out.strip())
# Provisioning step. Its `def` line is not part of this listing; the call to
# super().provision() at the end identifies it. It creates app_home, uploads
# sources, installs dependencies and, for commands without X11 forwarding,
# pre-uploads the command to <app_home>/app.sh so start() can run it later.
# NOTE(review): original lines 198-207 and 210-216 are missing; they
# presumably upload code/stdin and run the build/install steps — confirm.
196 # create home dir for application
197 self.node.mkdir(self.app_home)
200 self.upload_sources()
208 # install dependencies
209 self.install_dependencies()
# X11 commands are not pre-uploaded; start() runs them with node.execute().
217 command = self.get("command")
218 x11 = self.get("forwardX11")
219 if not x11 and command:
220 self.info("Uploading command '%s'" % command)
# Prefix the script with one `export VAR` line per space-separated entry.
# NOTE(review): `environ` is initialised on a missing line (221-223). Without
# a guard there, an empty env gives "".split(" ") == [""], which emits a bare
# `export` line; in bash that prints every exported variable to the script's
# stdout. env.split() would avoid this.
224 env = self.get("env") or ""
225 for var in env.split(" "):
226 environ += 'export %s\n' % var
228 command = environ + command
230 # If the command runs asynchronous, pre upload the command
231 # to the app.sh file in the remote host
232 dst = os.path.join(self.app_home, "app.sh")
233 command = self.replace_paths(command)
234 self.node.upload(command, dst, text = True)
236 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the ``sources`` attribute into ``src_dir`` on the node.

    ``sources`` is a whitespace-separated list. Entries starting with
    "http" (which also covers "https") are downloaded on the node with wget
    through upload_and_run(). All other entries are uploaded from this
    machine with node.upload(). Nothing happens when ``sources`` is unset
    or empty.
    """
    # TODO: check if sources need to be uploaded and upload them
    sources = self.get("sources")
    if not sources:
        return

    self.info(" Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() rather than split(' '): repeated or trailing blanks must not
    # produce empty "" entries that would then be uploaded.
    entries = sources.split()
    http_sources = [s for s in entries if s.startswith("http")]
    local_sources = [s for s in entries if not s.startswith("http")]

    if http_sources:
        # Download http sources on the node itself
        cmd = " wget -c --directory-prefix=${SOURCES} "
        verif = ""

        for source in http_sources:
            cmd += " %s " % (source)
            verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)

        # Wget output goes to stderr :S
        cmd += " 2> /dev/null ; "

        # Then list every expected file, presumably so that a missing
        # download is reported (upload_and_run uses raise_on_error = True).
        cmd += " %s " % verif

        # Upload the command to a file and run it on the node; ${SOURCES}
        # is substituted by replace_paths() inside upload_and_run().
        self.upload_and_run(cmd,
                "http_sources.sh", "http_sources_pid",
                "http_sources_out", "http_sources_err")

    # Only upload when something is left to copy from this machine.
    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """Upload the inline ``code`` attribute to ``<src_dir>/code`` on the node.

    Does nothing when the ``code`` attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info(" Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code text itself. As with the other text uploads in this
    # class, text = True passes the string as the file contents.
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the ``stdin`` attribute text to ``<app_home>/stdin`` on the node.

    Does nothing when the ``stdin`` attribute is unset or empty.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    # start() refers to this file by the relative name 'stdin'.
    dst = os.path.join(self.app_home, "stdin")
    self.node.upload(stdin, dst, text = True)
# Install the packages listed in the `depends` attribute on the node through
# node.install_packages(), passing home=app_home.
# NOTE(review): original line 299 is missing; it presumably guards against an
# unset `depends` — confirm, otherwise install_packages receives None.
297 def install_dependencies(self):
298 depends = self.get("depends")
300 self.info(" Installing dependencies %s" % depends)
301 self.node.install_packages(depends, home = self.app_home)
# Build step. Its `def` line (and presumably an `if build:` guard) is missing
# from this listing. It creates build_dir and runs the `build` attribute as
# <app_home>/build.sh via upload_and_run(), with the build_pid / build_out /
# build_err file names.
# NOTE(review): upload_and_run() calls node.run_and_wait(), so despite the
# comment below this presumably blocks until the script finishes — confirm.
304 build = self.get("build")
306 self.info(" Building sources ")
308 # create dir for build
309 self.node.mkdir(self.build_dir)
311 # Upload the command to a file, and execute asynchronously
312 self.upload_and_run(build,
313 "build.sh", "build_pid",
314 "build_out", "build_err")
# Install step. Its `def` line (and presumably an `if install:` guard) is
# missing from this listing. It runs the `install` attribute as
# <app_home>/install.sh via upload_and_run(), with the install_pid /
# install_out / install_err file names.
# NOTE(review): upload_and_run() calls node.run_and_wait(), so despite the
# comment below this presumably blocks until the script finishes — confirm.
317 install = self.get("install")
319 self.info(" Installing sources ")
321 # Upload the command to a file, and execute asynchronously
322 self.upload_and_run(install,
323 "install.sh", "install_pid",
324 "install_out", "install_err")
# Deploy step. Its `def` line is missing from this listing; the call to
# super().deploy() identifies it. While the connected node is missing or not
# yet READY, it re-schedules itself after reschedule_delay.
# NOTE(review): `node` is assigned on a missing line (presumably
# `node = self.node`). When no node is connected, `self.node` is None, so the
# debug call below raises AttributeError on `.state` before the deploy can be
# re-scheduled. Use something like `node.state if node else None` instead.
327 # Wait until node is associated and deployed
329 if not node or node.state < ResourceState.READY:
330 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
331 self.ec.schedule(reschedule_delay, self.deploy)
# NOTE(review): lines 332-333 and 336-341 are missing. Line 339 marks the
# resource FAILED, presumably inside an exception handler around the
# provisioning call — confirm.
334 command = self.get("command") or ""
335 self.info("Deploying command '%s' " % command)
339 self._state = ResourceState.FAILED
342 super(LinuxApplication, self).deploy()
# Start step. Its `def` line is missing from this listing; the call to
# super().start() identifies it. With forwardX11 the command runs through
# node.execute(). Otherwise the app.sh uploaded by provision() runs with
# node.run(), and the pid/ppid from node.wait_pid() are stored for
# state()/stop(). Errors mark the resource FAILED and raise RuntimeError.
345 command = self.get('command')
346 env = self.get('env')
347 stdin = 'stdin' if self.get('stdin') else None
# NOTE(review): both branches of these conditionals give the same value, so
# stdout/stderr are always 'stdout'/'stderr'.
348 stdout = 'stdout' if self.get('stdout') else 'stdout'
349 stderr = 'stderr' if self.get('stderr') else 'stderr'
350 sudo = self.get('sudo') or False
351 x11 = self.get('forwardX11') or False
354 super(LinuxApplication, self).start()
357 self.info("No command to start ")
358 self._state = ResourceState.FINISHED
361 self.info("Starting command '%s'" % command)
# NOTE(review): `env` is not defaulted to "" here (unlike provision()), so
# env.split fails on None unless the missing lines 362-366 guard it. The
# entries also become plain `VAR=val` assignments in `( ... ; command )`.
# Without `export`, bash does not pass them to an external command, whereas
# provision() writes `export` lines.
367 for var in env.split(" "):
368 environ += ' %s ' % var
370 command = "(" + environ + " ; " + command + ")"
371 command = self.replace_paths(command)
373 # If the command requires X11 forwarding, we
374 # can't run it asynchronously
375 (out, err), proc = self.node.execute(command,
380 self._state = ResourceState.FINISHED
382 if proc.poll() and err:
385 # Command was previously uploaded, now run the remote
386 # bash file asynchronously
387 cmd = "bash ./app.sh"
388 (out, err), proc = self.node.run(cmd, self.app_home,
394 if proc.poll() and err:
398 pid, ppid = self.node.wait_pid(home = self.app_home)
399 if pid: self._pid = int(pid)
400 if ppid: self._ppid = int(ppid)
402 if not self.pid or not self.ppid:
405 (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
# NOTE(review): `failed` is assigned on a missing line (403-406).
407 if failed or out or chkerr:
408 # check if execution errors occurred
409 msg = " Failed to start command '%s' " % command
416 self.error(msg, out, err)
418 msg2 = " Setting state to Failed"
420 self._state = ResourceState.FAILED
# NOTE(review): `raise X, msg` is Python 2-only syntax.
422 raise RuntimeError, msg
# Stop step. Its `def` line is missing from this listing; the call to
# super().stop() identifies it. While the application is STARTED it kills the
# pid/ppid recorded by start(); presumably when the kill reports an error
# (condition on missing lines 432-433), the resource is marked FAILED.
# NOTE(review): `state` is assigned on a missing line (426-427), presumably
# from self.state — confirm.
425 command = self.get('command') or ''
428 if state == ResourceState.STARTED:
429 self.info("Stopping command '%s'" % command)
431 (out, err), proc = self.node.kill(self.pid, self.ppid)
434 # check if execution errors occurred
435 msg = " Failed to STOP command '%s' " % self.get("command")
436 self.error(msg, out, err)
437 self._state = ResourceState.FAILED
440 super(LinuxApplication, self).stop()
# Release step. Its `def` line is missing from this listing. It runs the
# tearDown script on the node, then calls super().release() only if the
# resource is STOPPED.
# NOTE(review): lines 444, 446 and 448-449 are missing. They presumably guard
# against an unset tearDown (otherwise node.execute(None) is called) and stop
# the application — confirm.
443 self.info("Releasing resource")
445 tear_down = self.get("tearDown")
447 self.node.execute(tear_down)
450 if self.state == ResourceState.STOPPED:
451 super(LinuxApplication, self).release()
# State getter. Its `def` line and final return are missing from this
# listing. While STARTED, and at most once per state_check_delay (as measured
# by strfdiff), it reads <app_home>/stderr to detect errors. Otherwise it asks
# node.status() whether the pid/ppid process has finished.
# NOTE(review): state_check_delay is not defined in the visible lines.
455 if self._state == ResourceState.STARTED:
456 # To avoid overwhelming the remote hosts and the local processor
457 # with too many ssh queries, the state is only requested
458 # every 'state_check_delay' .
459 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
460 # check if execution errors occurred
461 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
464 if err.find("No such file or directory") >= 0 :
465 # The resource is marked as started, but the
466 # command was not yet executed
# NOTE(review): this early return skips the _last_state_check update below.
# Until the stderr file exists, every call issues a new ssh query, which
# bypasses the throttling described above.
467 return ResourceState.READY
# Presumably reached when stderr has content (condition on missing lines
# 462-463 / 468) — confirm.
469 msg = " Failed to execute command '%s'" % self.get("command")
470 self.error(msg, out, err)
471 self._state = ResourceState.FAILED
473 elif self.pid and self.ppid:
474 status = self.node.status(self.pid, self.ppid)
476 if status == sshfuncs.FINISHED:
477 self._state = ResourceState.FINISHED
480 self._last_state_check = strfnow()
# Write `cmd` (after replace_paths() tag substitution) to <app_home>/<fname>
# on the node, then run it there as `bash ./<fname>` through
# node.run_and_wait() with raise_on_error = True.
# NOTE(review): the run_and_wait arguments on lines 491-493 are missing from
# this listing; pidfile/outfile/errfile are presumably passed there.
# NOTE(review): callers describe this as asynchronous execution, but the name
# run_and_wait suggests it blocks until the script finishes — confirm.
484 def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
485 dst = os.path.join(self.app_home, fname)
486 cmd = self.replace_paths(cmd)
487 self.node.upload(cmd, dst, text = True)
489 cmd = "bash ./%s" % fname
490 (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
494 raise_on_error = True)
def replace_paths(self, command):
    """
    Replace the special path tags in ``command`` with actual paths.

    Supported tags: ${SOURCES} (src_dir), ${BUILD} (build_dir),
    ${APP_HOME} (app_home), ${NODE_HOME} (node.node_home) and
    ${EXP_HOME} (node.exp_home). Absolute directories are inserted as-is.
    Relative ones are prefixed with the literal text "${HOME}", presumably
    left for the remote shell to expand.

    The paths are substituted verbatim and are NOT shell-escaped, so
    directories containing spaces or shell metacharacters are not protected.
    """
    def absolute_dir(d):
        return d if d.startswith("/") else os.path.join("${HOME}", d)

    # Same substitution order as before; each value is looked up only when
    # its tag is processed.
    replacements = (
        ("${SOURCES}", lambda: self.src_dir),
        ("${BUILD}", lambda: self.build_dir),
        ("${APP_HOME}", lambda: self.app_home),
        ("${NODE_HOME}", lambda: self.node.node_home),
        ("${EXP_HOME}", lambda: self.node.exp_home),
    )
    for tag, path in replacements:
        command = command.replace(tag, absolute_dir(path()))
    return command
# NOTE(review): original lines 512-513 are missing from this listing, and the
# method may continue past its end. In the visible lines, `tags` is not
# imported, no return value is visible, and self._node is never read (the
# `node` property uses get_connected instead). These statements look like
# leftover code that would raise NameError if reached — confirm whether they
# are unreachable (e.g. after an early return) or should be removed.
511 def valid_connection(self, guid):
514 # XXX: What if it is connected to more than one node?
515 resources = self.find_resources(exact_tags = [tags.NODE])
516 self._node = resources[0] if len(resources) == 1 else None