2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util import sshfuncs
25 from nepi.util.timefuncs import strfnow, strfdiff
# Delay passed to self.ec.schedule() when the deploy step has to be retried
# because the connected node is missing or has not reached READY yet.
29 reschedule_delay = "0.5s"
32 # TODO: Resolve wildcards in commands!!
33 # TODO: If command is not set give a warning but do not generate an error!
36 class LinuxApplication(ResourceManager):
37 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare the attributes of a LinuxApplication.

    All Attribute objects are built first and are then handed to
    ``cls._register_attribute`` one at a time, in declaration order.
    """
    exec_read_only = Flags.ExecReadOnly
    read_only = Flags.ReadOnly

    # (name, help text, flags) for every attribute, in registration order
    specs = [
        ("command", "Command to execute", exec_read_only),
        ("forwardX11", " Enables X11 forwarding for SSH connections",
            exec_read_only),
        ("env", "Environment variables string for command execution",
            exec_read_only),
        ("sudo", "Run with root privileges", exec_read_only),
        ("depends",
            "Space-separated list of packages required to run the application",
            exec_read_only),
        ("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            exec_read_only),
        ("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            exec_read_only),
        ("build",
            "Build commands to execute after deploying the sources. "
            "Sources will be in the ${SOURCES} folder. "
            "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
            "Try to make the commands return with a nonzero exit code on error.\n"
            "Also, do not install any programs here, use the 'install' attribute. This will "
            "help keep the built files constrained to the build folder (which may "
            "not be the home folder), and will result in faster deployment. Also, "
            "make sure to clean up temporary files, to reduce bandwidth usage between "
            "nodes when transferring built packages.",
            read_only),
        ("install",
            "Commands to transfer built files to their final destinations. "
            "Sources will be in the initial working folder, and a special "
            "tag ${SOURCES} can be used to reference the experiment's "
            "home folder (where the application commands will run).\n"
            "ALL sources and targets needed for execution must be copied there, "
            "if building has been enabled.\n"
            "That is, 'slave' nodes will not automatically get any source files. "
            "'slave' nodes don't get build dependencies either, so if you need "
            "make and other tools to install, be sure to provide them as "
            "actual dependencies instead.",
            read_only),
        ("stdin", "Standard input", exec_read_only),
        ("stdout", "Standard output", exec_read_only),
        ("stderr", "Standard error", exec_read_only),
        ("tearDown", "Bash script to be executed before "
            "releasing the resource",
            read_only),
    ]

    attributes = [Attribute(name, description, flags=flags)
                  for name, description, flags in specs]

    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Declare the traces of a LinuxApplication.

    The Trace objects are built first and then passed to
    ``cls._register_trace`` in declaration order.
    """
    traces = [
        Trace("stdout", "Standard output stream"),
        Trace("stderr", "Standard error stream"),
        Trace("buildlog", "Output of the build process"),
    ]

    for trace in traces:
        cls._register_trace(trace)
def __init__(self, ec, guid):
    """Create the application resource.

    :param ec: passed through unchanged to the parent constructor
    :param guid: passed through unchanged to the parent constructor; also
        used to derive the application directory name
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # The start step only assigns these when the node reports a value and
    # reads them back immediately afterwards, so make sure they always exist.
    self._pid = None
    self._ppid = None

    # name of the per-application directory ("app-<guid>")
    self._home = "app-%s" % self.guid

    # timestamp of last state check of the application
    self._last_state_check = strfnow()
def log_message(self, msg):
    """Return *msg* prefixed with this resource's guid and node hostname."""
    guid = self.guid
    hostname = self.node.get("hostname")
    template = " guid %d - host %s - %s "
    return template % (guid, hostname, msg)
# NOTE(review): the header of this definition is not visible in this extract.
# Other code reads `self.node` without calling it, so this is presumably the
# body of a `node` property -- confirm.
# Looks up what is connected to this application for the LinuxNode resource type.
129 node = self.get_connected(LinuxNode.rtype())
# Only the first entry is returned; with an empty/falsy result this line
# returns nothing and execution falls through to whatever follows.
130 if node: return node[0]
# NOTE(review): definition header not visible in this extract; other code reads
# this value as `self.app_home`, so it is presumably a property -- confirm.
# Application directory: self._home joined onto the node's exp_home.
135 return os.path.join(self.node.exp_home, self._home)
# NOTE(review): definition header not visible in this extract; other code reads
# this value as `self.src_dir`, so it is presumably a property -- confirm.
# Directory for sources: the 'src' entry under the application home.
139 return os.path.join(self.app_home, 'src')
# NOTE(review): definition header not visible in this extract; other code reads
# this value as `self.build_dir`, so it is presumably a property -- confirm.
# Directory for builds: the 'build' entry under the application home.
143 return os.path.join(self.app_home, 'build')
# Retrieve information about the trace file `name` stored in the application
# home on the node. `attr` selects what is fetched; it is compared against
# TraceAttr.PATH, TraceAttr.ALL, TraceAttr.STREAM and TraceAttr.SIZE.
# `block` and `offset` are only used for TraceAttr.STREAM, where they become
# the bs= and skip= arguments of dd.
# NOTE(review): several short lines (presumably the `return` statements of
# each branch) are not visible in this extract -- confirm against the full file.
153 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
154 self.info("Retrieving '%s' trace %s " % (name, attr))
# Trace files live directly under the application home.
156 path = os.path.join(self.app_home, name)
# Probe for the file first: the remote command prints 'success' or 'error'.
158 command = "(test -f %s && echo 'success') || echo 'error'" % path
159 (out, err), proc = self.node.execute(command)
# The trace counts as missing when the probe failed (stderr output together
# with a truthy proc.poll()) or when it printed 'error'.
161 if (err and proc.poll()) or out.find("error") != -1:
162 msg = " Couldn't find trace %s " % name
163 self.error(msg, out, err)
# PATH branch: its body is not visible here.
166 if attr == TraceAttr.PATH:
# ALL branch: read the file through node.check_output.
169 if attr == TraceAttr.ALL:
170 (out, err), proc = self.node.check_output(self.app_home, name)
172 if err and proc.poll():
173 msg = " Couldn't read trace %s " % name
174 self.error(msg, out, err)
# STREAM reads one block of `block` bytes, skipping `offset` blocks;
# SIZE asks stat for the file size.
# NOTE(review): the visible code assigns `cmd` only for STREAM and SIZE; for any
# other value reaching this point `cmd` would be unbound below -- confirm.
179 if attr == TraceAttr.STREAM:
180 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
181 elif attr == TraceAttr.SIZE:
# '%%s' is a literal %s for stat's format string, not a Python placeholder.
182 cmd = "stat -c%%s %s " % path
184 (out, err), proc = self.node.execute(cmd)
186 if err and proc.poll():
187 msg = " Couldn't find trace %s " % name
188 self.error(msg, out, err)
# The size is converted from stat's textual output to an int.
191 if attr == TraceAttr.SIZE:
192 out = int(out.strip())
# NOTE(review): the header of this definition is not visible in this extract
# (the body ends by calling the parent's provision()), and some short lines
# between the visible ones appear to be missing -- confirm against the full file.
197 # create home dir for application
198 self.node.mkdir(self.app_home)
# Deploy the files listed in the "sources" attribute.
201 self.upload_sources()
209 # install dependencies
210 self.install_dependencies()
# When X11 forwarding is not requested and a command is set, the command is
# uploaded ahead of time as app.sh in the application home.
218 command = self.get("command")
219 x11 = self.get("forwardX11")
220 if not x11 and command:
221 self.info("Uploading command '%s'" % command)
# Every space-separated entry of "env" becomes one `export` line that is
# prepended to the command.
# NOTE(review): the initialisation of `environ` is not visible here, and
# self.get("env") is split without a visible check for an unset value -- confirm.
226 for var in self.get("env").split(" "):
227 environ += 'export %s\n' % var
229 command = environ + command
231 # If the command runs asynchronous, pre upload the command
232 # to the app.sh file in the remote host
233 dst = os.path.join(self.app_home, "app.sh")
# Path tags such as ${SOURCES} are substituted before the upload.
234 command = self.replace_paths(command)
235 self.node.upload(command, dst, text = True)
237 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the files listed in the ``sources`` attribute to ``src_dir``.

    ``sources`` is a whitespace-separated list. Entries starting with
    ``http://`` or ``https://`` are downloaded on the node with wget, through
    a script run by ``upload_and_run``. All other entries are passed as a
    list to ``self.node.upload``.

    Nothing is done when the attribute is unset or empty.
    """
    # TODO: check if sources need to be uploaded and upload them
    sources = self.get("sources")
    if not sources:
        return

    self.info(" Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() without an argument ignores repeated, leading and trailing
    # blanks, so no empty entries are produced
    entries = sources.split()

    url_prefixes = ("http://", "https://")
    http_sources = [s for s in entries if s.startswith(url_prefixes)]
    local_sources = [s for s in entries if not s.startswith(url_prefixes)]

    # Download http sources
    if http_sources:
        cmd = " wget -c --directory-prefix=${SOURCES} "
        cmd += "".join(" %s " % source for source in http_sources)

        # Listing every downloaded file afterwards makes the script fail
        # visibly when one of them is missing
        verif = "".join(" ls ${SOURCES}/%s ;" % os.path.basename(source)
                        for source in http_sources)

        # Wget output goes to stderr :S
        cmd += " 2> /dev/null ; "
        cmd += " %s " % verif

        # Upload the command to a file and run it through upload_and_run
        self.upload_and_run(cmd,
                "http_sources.sh", "http_sources_pid",
                "http_sources_out", "http_sources_err")

    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """Upload the ``code`` attribute to the file ``code`` inside ``src_dir``.

    Nothing is done when the attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info(" Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # The value of the "code" attribute is what has to be stored remotely
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the ``stdin`` attribute to the file ``stdin`` in ``app_home``.

    Nothing is done when the attribute is unset or empty.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    # The file goes directly into the application home, under a fixed name
    dst = os.path.join(self.app_home, "stdin")
    self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
    """Install the packages named in the ``depends`` attribute on the node.

    The attribute value is handed unchanged to
    ``self.node.install_packages`` together with the application home.
    Nothing is done when the attribute is unset or empty.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.info(" Installing dependencies %s" % depends)
    self.node.install_packages(depends, home = self.app_home)
# NOTE(review): the header of this definition is not visible in this extract,
# and a short line (presumably a guard on `build`) seems to be missing after
# the first statement -- confirm against the full file.
# Runs the commands of the "build" attribute as the script build.sh.
305 build = self.get("build")
307 self.info(" Building sources ")
309 # create dir for build
310 self.node.mkdir(self.build_dir)
# NOTE(review): upload_and_run calls node.run_and_wait, so whether this is
# really asynchronous should be confirmed.
312 # Upload the command to a file, and execute asynchronously
313 self.upload_and_run(build,
314 "build.sh", "build_pid",
315 "build_out", "build_err")
# NOTE(review): the header of this definition is not visible in this extract,
# and a short line (presumably a guard on `install`) seems to be missing after
# the first statement -- confirm against the full file.
# Runs the commands of the "install" attribute as the script install.sh.
318 install = self.get("install")
320 self.info(" Installing sources ")
# NOTE(review): upload_and_run calls node.run_and_wait, so whether this is
# really asynchronous should be confirmed.
322 # Upload the command to a file, and execute asynchronously
323 self.upload_and_run(install,
324 "install.sh", "install_pid",
325 "install_out", "install_err")
# NOTE(review): the header of this definition and several short lines (the
# assignment of `node`, and presumably try/except/else lines) are not visible
# in this extract -- confirm against the full file.
328 # Wait until node is associated and deployed
# Without a node, or with a node whose state is below READY, the deploy is
# scheduled again after `reschedule_delay`.
# NOTE(review): the debug line reads self.node.state even though `node` may be
# falsy in this branch -- confirm that this cannot fail.
330 if not node or node.state < ResourceState.READY:
331 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
332 self.ec.schedule(reschedule_delay, self.deploy)
# An unset command is logged as an empty string.
335 command = self.get("command") or ""
336 self.info("Deploying command '%s' " % command)
# NOTE(review): the condition under which the state becomes FAILED is not
# visible here.
340 self._state = ResourceState.FAILED
343 super(LinuxApplication, self).deploy()
# NOTE(review): the header of this definition and many short lines (branch
# headers such as if/else, the initialisation of `environ` and `failed`, and
# the remaining arguments of the node.execute / node.run calls) are not
# visible in this extract -- confirm every note below against the full file.
346 command = self.get('command')
347 env = self.get('env')
# stdin is optional: None unless the "stdin" attribute is set.
348 stdin = 'stdin' if self.get('stdin') else None
# NOTE(review): both branches of the next two conditionals yield the same
# literal, so the attribute values do not influence the result.
349 stdout = 'stdout' if self.get('stdout') else 'stdout'
350 stderr = 'stderr' if self.get('stderr') else 'stderr'
351 sudo = self.get('sudo') or False
352 x11 = self.get('forwardX11') or False
355 super(LinuxApplication, self).start()
# Without a command there is nothing to run and the state becomes FINISHED.
358 self.info("No command to start ")
359 self._state = ResourceState.FINISHED
362 self.info("Starting command '%s'" % command)
# The space-separated entries of "env" are put in front of the command,
# inside one parenthesised shell group.
368 for var in env.split(" "):
369 environ += ' %s ' % var
371 command = "(" + environ + " ; " + command + ")"
372 command = self.replace_paths(command)
374 # If the command requires X11 forwarding, we
375 # can't run it asynchronously
376 (out, err), proc = self.node.execute(command,
381 self._state = ResourceState.FINISHED
383 if proc.poll() and err:
386 # Command was previously uploaded, now run the remote
387 # bash file asynchronously
388 cmd = "bash ./app.sh"
389 (out, err), proc = self.node.run(cmd, self.app_home,
395 if proc.poll() and err:
# The pid and ppid reported by the node are stored as ints, and only when a
# value was reported.
399 pid, ppid = self.node.wait_pid(home = self.app_home)
400 if pid: self._pid = int(pid)
401 if ppid: self._ppid = int(ppid)
403 if not self.pid or not self.ppid:
# Content in the remote 'stderr' file, or a failure reading it, also counts
# as a failed start.
406 (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
408 if failed or out or chkerr:
409 # check if execution errors occurred
410 msg = " Failed to start command '%s' " % command
417 self.error(msg, out, err)
419 msg2 = " Setting state to Failed"
421 self._state = ResourceState.FAILED
# NOTE(review): `raise X, msg` is Python 2-only syntax.
423 raise RuntimeError, msg
# NOTE(review): the header of this definition and some short lines (the
# assignment of `state` and the condition guarding the error handling) are not
# visible in this extract -- confirm against the full file.
# An unset command is logged as an empty string.
426 command = self.get('command') or ''
# Only an application in STARTED state is killed.
429 if state == ResourceState.STARTED:
430 self.info("Stopping command '%s'" % command)
# The kill is delegated to the node, using the stored pid and ppid.
432 (out, err), proc = self.node.kill(self.pid, self.ppid)
435 # check if execution errors occurred
436 msg = " Failed to STOP command '%s' " % self.get("command")
437 self.error(msg, out, err)
438 self._state = ResourceState.FAILED
441 super(LinuxApplication, self).stop()
# NOTE(review): the header of this definition and some short lines (presumably
# a guard on `tear_down` and a stop step) are not visible in this extract --
# confirm against the full file.
444 self.info("Releasing resource")
# The "tearDown" attribute holds a script that is executed on the node.
446 tear_down = self.get("tearDown")
448 self.node.execute(tear_down)
# The parent's release() is only called once the state is STOPPED.
451 if self.state == ResourceState.STOPPED:
452 super(LinuxApplication, self).release()
# NOTE(review): the header of this definition, some branch headers and the
# final return are not visible in this extract -- confirm against the full file.
# The remote state is only queried while the resource is STARTED.
456 if self._state == ResourceState.STARTED:
457 # To avoid overwhelming the remote hosts and the local processor
458 # with too many ssh queries, the state is only requested
459 # every 'state_check_delay' .
# NOTE(review): `state_check_delay` is not defined in the visible part of the file.
460 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
461 # check if execution errors occurred
462 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
# A missing remote 'stderr' file is not treated as a failure.
465 if err.find("No such file or directory") >= 0 :
466 # The resource is marked as started, but the
467 # command was not yet executed
468 return ResourceState.READY
470 msg = " Failed to execute command '%s'" % self.get("command")
471 self.error(msg, out, err)
472 self._state = ResourceState.FAILED
# Otherwise the node is asked for the status of the process.
474 elif self.pid and self.ppid:
475 status = self.node.status(self.pid, self.ppid)
477 if status == sshfuncs.FINISHED:
478 self._state = ResourceState.FINISHED
# Record the time of this check for the next comparison above.
481 self._last_state_check = strfnow()
# Store `cmd` as the script `fname` inside the application home and run it
# there with bash through node.run_and_wait.
# NOTE(review): the arguments of the run_and_wait call between its first and
# last line are not visible in this extract; `pidfile`, `outfile` and `errfile`
# are presumably forwarded there -- confirm against the full file.
485 def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
486 dst = os.path.join(self.app_home, fname)
# Path tags such as ${SOURCES} are substituted before the script is uploaded.
487 cmd = self.replace_paths(cmd)
488 self.node.upload(cmd, dst, text = True)
# From here on `cmd` is the launcher line, not the script text.
490 cmd = "bash ./%s" % fname
491 (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
495 raise_on_error = True)
def replace_paths(self, command):
    """Replace all special path tags in *command* with actual paths.

    Recognised tags and the directories they stand for:

    - ``${SOURCES}``: ``self.src_dir``
    - ``${BUILD}``: ``self.build_dir``
    - ``${APP_HOME}``: ``self.app_home``
    - ``${NODE_HOME}``: ``self.node.node_home``
    - ``${EXP_HOME}``: ``self.node.exp_home``

    Directories that do not start with ``/`` are prefixed with the literal
    ``${HOME}``. The paths are inserted verbatim; no shell quoting is
    applied.

    :param command: command text that may contain path tags
    :return: the command with every tag substituted; an empty or ``None``
        command is returned unchanged
    """
    if not command:
        return command

    def absolute_dir(d):
        return d if d.startswith("/") else os.path.join("${HOME}", d)

    substitutions = (
        ("${SOURCES}", self.src_dir),
        ("${BUILD}", self.build_dir),
        ("${APP_HOME}", self.app_home),
        ("${NODE_HOME}", self.node.node_home),
        ("${EXP_HOME}", self.node.exp_home),
    )

    for tag, directory in substitutions:
        command = command.replace(tag, absolute_dir(directory))

    return command
# NOTE(review): lines between the header and the first visible statement are
# missing from this extract, and the body may continue past the last visible
# line -- confirm the actual behaviour against the full file.
512 def valid_connection(self, guid):
515 # XXX: What if it is connected to more than one node?
# NOTE(review): `tags` is not imported in the visible part of the file.
516 resources = self.find_resources(exact_tags = [tags.NODE])
# self._node is only set when exactly one resource was found, otherwise None.
517 self._node = resources[0] if len(resources) == 1 else None