2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util import sshfuncs
25 from nepi.util.timefuncs import strfnow, strfdiff
# Delay value handed to self.ec.schedule() when self.deploy is rescheduled.
# Presumably half a second -- TODO confirm the format the scheduler accepts.
30 reschedule_delay = "0.5s"
33 # TODO: Resolve wildcards in commands!!
36 class LinuxApplication(ResourceManager):
37 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Create the LinuxApplication attributes and register them on ``cls``.

    Registration order: command, forwardX11, env, sudo, depends, sources,
    code, build, install, stdin, stdout, stderr, tearDown.
    ``build``, ``install`` and ``tearDown`` are created with
    ``Flags.ReadOnly``; all the others with ``Flags.ExecReadOnly``.
    """
    exec_read_only = Flags.ExecReadOnly
    read_only = Flags.ReadOnly

    # Every attribute is built first, then registered in the same order.
    attributes = [
        Attribute("command", "Command to execute",
            flags = exec_read_only),
        Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
            flags = exec_read_only),
        Attribute("env", "Environment variables string for command execution",
            flags = exec_read_only),
        Attribute("sudo", "Run with root privileges",
            flags = exec_read_only),
        Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags = exec_read_only),
        Attribute("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            flags = exec_read_only),
        Attribute("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            flags = exec_read_only),
        Attribute("build",
            "Build commands to execute after deploying the sources. "
            "Sources will be in the ${SOURCES} folder. "
            "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
            "Try to make the commands return with a nonzero exit code on error.\n"
            "Also, do not install any programs here, use the 'install' attribute. This will "
            "help keep the built files constrained to the build folder (which may "
            "not be the home folder), and will result in faster deployment. Also, "
            "make sure to clean up temporary files, to reduce bandwidth usage between "
            "nodes when transferring built packages.",
            flags = read_only),
        Attribute("install",
            "Commands to transfer built files to their final destinations. "
            "Sources will be in the initial working folder, and a special "
            "tag ${SOURCES} can be used to reference the experiment's "
            "home folder (where the application commands will run).\n"
            "ALL sources and targets needed for execution must be copied there, "
            "if building has been enabled.\n"
            "That is, 'slave' nodes will not automatically get any source files. "
            "'slave' nodes don't get build dependencies either, so if you need "
            "make and other tools to install, be sure to provide them as "
            "actual dependencies instead.",
            flags = read_only),
        Attribute("stdin", "Standard input", flags = exec_read_only),
        Attribute("stdout", "Standard output", flags = exec_read_only),
        Attribute("stderr", "Standard error", flags = exec_read_only),
        Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = read_only),
    ]

    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Create the stdout, stderr and buildlog traces and register them on ``cls``."""
    descriptions = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
        ("buildlog", "Output of the build process"),
    )

    # Build all Trace objects before registering any of them.
    traces = [Trace(trace_name, text) for trace_name, text in descriptions]
    for trace in traces:
        cls._register_trace(trace)
def __init__(self, ec, guid):
    """Initialise the resource manager state of a LinuxApplication.

    ec and guid are forwarded unchanged to the parent constructor.
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # name of the application's own directory, derived from the guid
    app_dir_name = "app-%s" % self.guid
    self._home = app_dir_name

    # timestamp of last state check of the application
    self._last_state_check = strfnow()

    logger = logging.getLogger("LinuxApplication")
    self._logger = logger
def log_message(self, msg):
    """Return msg prefixed with this resource's guid and its node's hostname."""
    guid = self.guid
    hostname = self.node.get("hostname")
    template = " guid %d - host %s - %s "
    return template % (guid, hostname, msg)
# NOTE(review): the enclosing `def` line is not part of this view. `self.node`
# is read as a plain attribute elsewhere in the class, so this is presumably
# the body of a `node` property -- confirm against the full file.
# Look up the connected resources whose type is LinuxNode.rtype().
131 node = self.get_connected(LinuxNode.rtype())
# When the lookup returned something, the first entry is the node.
132 if node: return node[0]
# NOTE(review): `def` line not visible; presumably the body of the `app_home`
# property (the result combines the node's exp_home with self._home).
137 return os.path.join(self.node.exp_home, self._home)
# NOTE(review): `def` line not visible; presumably the body of the `src_dir`
# property: the 'src' sub-directory of self.app_home.
141 return os.path.join(self.app_home, 'src')
# NOTE(review): `def` line not visible; presumably the body of the `build_dir`
# property: the 'build' sub-directory of self.app_home.
145 return os.path.join(self.app_home, 'build')
# Fetch information about the trace file `name` stored under self.app_home.
#   name   -- trace file name, joined onto self.app_home
#   attr   -- TraceAttr selector; PATH, ALL, STREAM and SIZE are tested below
#   block  -- value used as `bs=` for dd when attr is TraceAttr.STREAM
#   offset -- value used as `skip=` for dd when attr is TraceAttr.STREAM
# NOTE(review): some lines of this method are not present in this view
# (branch bodies and, presumably, the `return` statements). Confirm against
# the full file before changing the logic.
155 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
156 self.info("Retrieving '%s' trace %s " % (name, attr))
158 path = os.path.join(self.app_home, name)
# Probe on the node: prints 'success' when the path is a regular file and
# 'error' otherwise.
160 command = "(test -f %s && echo 'success') || echo 'error'" % path
161 (out, err), proc = self.node.execute(command)
# Failure: stderr text together with a truthy proc.poll(), or the word
# 'error' anywhere in stdout.
163 if (err and proc.poll()) or out.find("error") != -1:
164 msg = " Couldn't find trace %s " % name
165 self.error(msg, out, err)
# TraceAttr.PATH branch -- its body is not visible in this view.
168 if attr == TraceAttr.PATH:
# TraceAttr.ALL: read the trace through node.check_output().
171 if attr == TraceAttr.ALL:
172 (out, err), proc = self.node.check_output(self.app_home, name)
174 if err and proc.poll():
175 msg = " Couldn't read trace %s " % name
176 self.error(msg, out, err)
# STREAM reads one block of `block` bytes after skipping `offset` blocks;
# SIZE asks `stat` for the file size.
181 if attr == TraceAttr.STREAM:
182 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
183 elif attr == TraceAttr.SIZE:
184 cmd = "stat -c%%s %s " % path
186 (out, err), proc = self.node.execute(cmd)
188 if err and proc.poll():
189 msg = " Couldn't find trace %s " % name
190 self.error(msg, out, err)
# The size is converted from the command's text output to an int.
193 if attr == TraceAttr.SIZE:
194 out = int(out.strip())
# Prepare the node for this application: create the application home,
# upload the sources, install the dependencies and pre-upload the command.
#   filters -- not used by the statements visible here
# NOTE(review): parts of this method are not present in this view; for
# instance `environ` is appended to below without a visible initialisation.
198 def provision(self, filters = None):
199 # create home dir for application
200 self.node.mkdir(self.app_home)
203 self.upload_sources()
211 # install dependencies
212 self.install_dependencies()
220 command = self.get("command")
221 x11 = self.get("forwardX11")
# The command is only uploaded ahead of time when X11 forwarding is off.
222 if not x11 and command:
223 self.info("Uploading command '%s'" % command)
# Each space-separated entry of 'env' becomes an `export` line placed in
# front of the command.
# NOTE(review): with 'env' unset, "".split(" ") yields [""] and an
# `export ` line without a variable is produced -- confirm this is harmless.
227 env = self.get("env") or ""
228 for var in env.split(" "):
229 environ += 'export %s\n' % var
231 command = environ + command
233 # If the command runs asynchronous, pre upload the command
234 # to the app.sh file in the remote host
235 dst = os.path.join(self.app_home, "app.sh")
# Expand the ${...} path tags before the script is written to the node.
236 command = self.replace_paths(command)
237 self.node.upload(command, dst, text = True)
239 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the entries of the 'sources' attribute into ``self.src_dir``.

    'sources' is a whitespace-separated list. Entries that start with
    ``http://`` or ``https://`` are fetched on the node by a generated
    ``http_sources.sh`` script (wget followed by an ``ls`` of every
    expected file); every other entry is passed to ``self.node.upload``.
    Nothing happens when the attribute is unset or empty.
    """
    # TODO: check if sources need to be uploaded and upload them
    sources = self.get("sources")
    if not sources:
        # attribute not set: nothing to deploy
        return

    self.info(" Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() with no separator ignores repeated blanks, so no empty
    # entries reach wget or the upload
    http_sources = []
    local_sources = []
    for source in sources.split():
        # require the full scheme so a local file such as "httpd.conf"
        # is not mistaken for a URL
        if source.startswith(("http://", "https://")):
            http_sources.append(source)
        else:
            local_sources.append(source)

    # Download http sources
    if http_sources:
        cmd = " wget -c --directory-prefix=${SOURCES} "
        cmd += "".join(" %s " % source for source in http_sources)

        # Wget output goes to stderr :S
        cmd += " 2> /dev/null ; "

        # list every file that should have been downloaded
        verif = "".join(" ls ${SOURCES}/%s ;" % os.path.basename(source)
                for source in http_sources)
        cmd += " %s " % verif

        # Upload the command to a file and run it on the node
        self.upload_and_run(cmd,
                "http_sources.sh", "http_sources_pid",
                "http_sources_out", "http_sources_err")

    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """Upload the text of the 'code' attribute to ``<src_dir>/code`` on the node.

    Nothing happens when the attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        # attribute not set: nothing to upload
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info(" Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code text itself; the previous version passed the
    # undefined name `sources` here.
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the text of the 'stdin' attribute to ``<app_home>/stdin`` on the node.

    Nothing happens when the attribute is unset or empty.
    """
    stdin = self.get("stdin")
    if not stdin:
        # attribute not set: there is no input to upload
        return

    self.info(" Uploading stdin ")

    # the input is stored directly in the application home
    dst = os.path.join(self.app_home, "stdin")
    self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
    """Install the packages named in the 'depends' attribute on the node.

    The attribute value is passed unchanged to ``self.node.install_packages``
    together with ``home = self.app_home``. Nothing happens when the
    attribute is unset or empty.
    """
    depends = self.get("depends")
    if not depends:
        # attribute not set: no packages were requested
        return

    self.info(" Installing dependencies %s" % depends)
    self.node.install_packages(depends, home = self.app_home)
# NOTE(review): the `def` line of this method is not present in this view
# (presumably `build`, judging by the attribute it reads).
# Read the commands configured in the 'build' attribute.
307 build = self.get("build")
309 self.info(" Building sources ")
311 # create dir for build
312 self.node.mkdir(self.build_dir)
314 # Upload the command to a file, and execute asynchronously
# The commands are written to build.sh; build_pid, build_out and build_err
# are passed as the pidfile, outfile and errfile arguments.
315 self.upload_and_run(build,
316 "build.sh", "build_pid",
317 "build_out", "build_err")
# NOTE(review): the `def` line of this method is not present in this view
# (presumably `install`, judging by the attribute it reads).
# Read the commands configured in the 'install' attribute.
320 install = self.get("install")
322 self.info(" Installing sources ")
324 # Upload the command to a file, and execute asynchronously
# The commands are written to install.sh; install_pid, install_out and
# install_err are passed as the pidfile, outfile and errfile arguments.
325 self.upload_and_run(install,
326 "install.sh", "install_pid",
327 "install_out", "install_err")
# NOTE(review): the `def` line of this method is not present in this view
# (presumably `deploy`: it reschedules self.deploy and ends by calling the
# parent's deploy()).
330 # Wait until node is associated and deployed
# NOTE(review): `node` is tested here but its assignment is not visible.
332 if not node or node.state < ResourceState.READY:
333 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
# Ask the experiment controller to call self.deploy again after
# reschedule_delay.
334 self.ec.schedule(reschedule_delay, self.deploy)
337 command = self.get("command") or ""
338 self.info(" Deploying command '%s' " % command)
# NOTE(review): the statements leading to this FAILED assignment are not
# visible in this view.
342 self._state = ResourceState.FAILED
345 super(LinuxApplication, self).deploy()
# NOTE(review): the `def` line of this method is not present in this view
# (presumably `start`: it calls the parent's start()). Several branch
# headers and call continuation lines are missing too, so the control flow
# below cannot be read off these lines alone.
348 command = self.get('command')
349 env = self.get('env')
# 'stdin' is only used when the attribute is set.
350 stdin = 'stdin' if self.get('stdin') else None
# NOTE(review): both arms of the next two conditionals give the same value.
351 stdout = 'stdout' if self.get('stdout') else 'stdout'
352 stderr = 'stderr' if self.get('stderr') else 'stderr'
353 sudo = self.get('sudo') or False
354 x11 = self.get('forwardX11') or False
357 super(LinuxApplication, self).start()
360 self.info("No command to start ")
361 self._state = ResourceState.FINISHED
364 self.info("Starting command '%s'" % command)
# Each space-separated entry of 'env' is added to `environ`, which is then
# placed in front of the command inside a parenthesised shell group.
# NOTE(review): `environ` has no visible initialisation, and env.split()
# needs 'env' to be a string -- confirm how an unset 'env' is handled.
370 for var in env.split(" "):
371 environ += ' %s ' % var
373 command = "(" + environ + " ; " + command + ")"
# Expand the ${...} path tags in the command.
374 command = self.replace_paths(command)
376 # If the command requires X11 forwarding, we
377 # can't run it asynchronously
378 (out, err), proc = self.node.execute(command,
383 self._state = ResourceState.FINISHED
385 if proc.poll() and err:
388 # Command was previously uploaded, now run the remote
389 # bash file asynchronously
390 cmd = "bash ./app.sh"
391 (out, err), proc = self.node.run(cmd, self.app_home,
397 if proc.poll() and err:
# Obtain the pid and ppid through node.wait_pid() and store them as ints.
401 pid, ppid = self.node.wait_pid(home = self.app_home)
402 if pid: self._pid = int(pid)
403 if ppid: self._ppid = int(ppid)
405 if not self.pid or not self.ppid:
# Fetch the 'stderr' file of the application home to look for errors.
408 (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
# NOTE(review): `failed` is tested here but never assigned in this view.
410 if failed or out or chkerr:
411 # check if execution errors occurred
412 msg = " Failed to start command '%s' " % command
419 self.error(msg, out, err)
# NOTE(review): msg2 is assigned but not used by any visible line.
421 msg2 = " Setting state to Failed"
423 self._state = ResourceState.FAILED
# NOTE(review): `raise X, msg` is Python 2 only syntax.
425 raise RuntimeError, msg
# NOTE(review): the `def` line of this method is not present in this view
# (presumably `stop`: it ends by calling the parent's stop()).
428 command = self.get('command') or ''
# NOTE(review): `state` is tested here but its assignment is not visible.
431 if state == ResourceState.STARTED:
432 self.info("Stopping command '%s'" % command)
# Ask the node to kill the process identified by self.pid / self.ppid.
434 (out, err), proc = self.node.kill(self.pid, self.ppid)
# NOTE(review): the condition guarding the failure handling below is not
# visible in this view.
437 # check if execution errors occurred
438 msg = " Failed to STOP command '%s' " % self.get("command")
439 self.error(msg, out, err)
440 self._state = ResourceState.FAILED
443 super(LinuxApplication, self).stop()
# NOTE(review): the `def` line of this method is not present in this view
# (presumably `release`: it ends by calling the parent's release()).
446 self.info("Releasing resource")
# Run the script from the 'tearDown' attribute on the node.
# NOTE(review): no guard for an unset 'tearDown' is visible here -- confirm.
448 tear_down = self.get("tearDown")
450 self.node.execute(tear_down)
# The parent's release() is only called once the state is STOPPED.
453 if self.state == ResourceState.STOPPED:
454 super(LinuxApplication, self).release()
# NOTE(review): the `def` line of this block is not present in this view;
# `self.state` is read as an attribute elsewhere, so this is presumably the
# body of a `state` property. Some branch lines are missing as well.
# Only a STARTED application is re-examined.
458 if self._state == ResourceState.STARTED:
459 # To avoid overwhelming the remote hosts and the local processor
460 # with too many ssh queries, the state is only requested
461 # every 'state_check_delay' .
# NOTE(review): `state_check_delay` is not defined in this view.
462 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
463 # check if execution errors occurred
464 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
# A missing stderr file is reported as READY rather than as a failure.
467 if err.find("No such file or directory") >= 0 :
468 # The resource is marked as started, but the
469 # command was not yet executed
470 return ResourceState.READY
472 msg = " Failed to execute command '%s'" % self.get("command")
473 self.error(msg, out, err)
474 self._state = ResourceState.FAILED
# With both pid and ppid known, ask the node for the process status.
476 elif self.pid and self.ppid:
477 status = self.node.status(self.pid, self.ppid)
479 if status == sshfuncs.FINISHED:
480 self._state = ResourceState.FINISHED
# Record when this check was made.
483 self._last_state_check = strfnow()
# Write `cmd` to the file `fname` in the application home on the node and
# run it there with bash through node.run_and_wait().
#   cmd     -- shell commands; ${...} path tags are expanded by replace_paths()
#   fname   -- script file name, relative to self.app_home
#   pidfile, outfile, errfile -- NOTE(review): the call arguments that would
#       forward these to run_and_wait() are not present in this view
# NOTE(review): callers describe this as asynchronous, but the call used
# here is named run_and_wait -- confirm the intended behaviour.
487 def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
488 dst = os.path.join(self.app_home, fname)
489 cmd = self.replace_paths(cmd)
490 self.node.upload(cmd, dst, text = True)
492 cmd = "bash ./%s" % fname
493 (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
497 raise_on_error = True)
# Substitute the path tags ${SOURCES}, ${BUILD}, ${APP_HOME}, ${NODE_HOME}
# and ${EXP_HOME} in `command` with self.src_dir, self.build_dir,
# self.app_home, self.node.node_home and self.node.exp_home respectively.
# A directory that does not start with "/" is first joined onto the literal
# "${HOME}".
# NOTE(review): the docstring delimiters, the header of the nested helper
# (called `absolute_dir` below) and the opening of the return expression
# are not present in this view.
# NOTE(review): the description says "shell-escaped", but no escaping is
# visible in these lines -- confirm.
499 def replace_paths(self, command):
501 Replace all special path tags with shell-escaped actual paths.
504 return d if d.startswith("/") else os.path.join("${HOME}", d)
507 .replace("${SOURCES}", absolute_dir(self.src_dir))
508 .replace("${BUILD}", absolute_dir(self.build_dir))
509 .replace("${APP_HOME}", absolute_dir(self.app_home))
510 .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
511 .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
# Connection check for the resource identified by `guid`.
# NOTE(review): lines of this method are missing from this view and it may
# continue past the last visible line, so what it returns cannot be told
# from here.
# NOTE(review): `guid` is not used by the visible statements, and `tags` is
# not imported in the visible part of the file.
514 def valid_connection(self, guid):
517 # XXX: What if it is connected to more than one node?
# Keep the matching resource only when exactly one was found; otherwise
# self._node is set to None.
518 resources = self.find_resources(exact_tags = [tags.NODE])
519 self._node = resources[0] if len(resources) == 1 else None