2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
29 # TODO: Resolve wildcards in commands!!
33 class LinuxApplication(ResourceManager):
34 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare the configuration attributes of a LinuxApplication.

    Every attribute is created first and then registered on ``cls``
    through ``cls._register_attribute``, in declaration order.
    """
    exec_read_only = Flags.ExecReadOnly
    read_only = Flags.ReadOnly

    attributes = [
        Attribute("command", "Command to execute",
            flags=exec_read_only),
        Attribute("forwardX11",
            " Enables X11 forwarding for SSH connections",
            flags=exec_read_only),
        Attribute("env",
            "Environment variables string for command execution",
            flags=exec_read_only),
        Attribute("sudo", "Run with root privileges",
            flags=exec_read_only),
        Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags=exec_read_only),
        Attribute("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            flags=exec_read_only),
        Attribute("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            flags=exec_read_only),
        Attribute("build",
            "Build commands to execute after deploying the sources. "
            "Sources will be in the ${SOURCES} folder. "
            "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
            "Try to make the commands return with a nonzero exit code on error.\n"
            "Also, do not install any programs here, use the 'install' attribute. This will "
            "help keep the built files constrained to the build folder (which may "
            "not be the home folder), and will result in faster deployment. Also, "
            "make sure to clean up temporary files, to reduce bandwidth usage between "
            "nodes when transferring built packages.",
            flags=read_only),
        Attribute("install",
            "Commands to transfer built files to their final destinations. "
            "Sources will be in the initial working folder, and a special "
            "tag ${SOURCES} can be used to reference the experiment's "
            "home folder (where the application commands will run).\n"
            "ALL sources and targets needed for execution must be copied there, "
            "if building has been enabled.\n"
            "That is, 'slave' nodes will not automatically get any source files. "
            "'slave' nodes don't get build dependencies either, so if you need "
            "make and other tools to install, be sure to provide them as "
            "actual dependencies instead.",
            flags=read_only),
        Attribute("stdin", "Standard input", flags=exec_read_only),
        Attribute("stdout", "Standard output", flags=exec_read_only),
        Attribute("stderr", "Standard error", flags=exec_read_only),
        Attribute("tearDown",
            "Bash script to be executed before "
            "releasing the resource",
            flags=read_only),
    ]

    # Registration order matches the declaration order above.
    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Declare the traces exposed by a LinuxApplication.

    The three traces (stdout, stderr and buildlog) are created first and
    then registered on ``cls`` through ``cls._register_trace``, in that
    order.
    """
    descriptions = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
        ("buildlog", "Output of the build process"),
    )
    traces = [Trace(name, help_text) for name, help_text in descriptions]

    for trace in traces:
        cls._register_trace(trace)
def __init__(self, ec, guid):
    """Create the application resource.

    :param ec: forwarded unchanged to the parent constructor
    :param guid: forwarded unchanged to the parent constructor
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # Identifiers of the remote process. They are only assigned once the
    # command has been started, so give them a defined value up front;
    # otherwise reading them before start would fail on a missing
    # attribute.
    # NOTE(review): presumably exposed through the pid/ppid accessors
    # used elsewhere in this class - confirm.
    self._pid = None
    self._ppid = None

    # name of the application directory, derived from the guid
    self._home = "app-%s" % self.guid

    # timestamp of last state check of the application
    self._last_state_check = strfnow()
def log_message(self, msg):
    """Return ``msg`` decorated with the guid of this resource and the
    hostname of the node it is attached to.

    :param msg: text to decorate
    :return: the formatted log line
    """
    guid = self.guid
    hostname = self.node.get("hostname")
    template = " guid %d - host %s - %s "
    return template % (guid, hostname, msg)
126 node = self.get_connected(LinuxNode.rtype())
127 if node: return node[0]
132 return os.path.join(self.node.exp_home, self._home)
136 return os.path.join(self.app_home, 'src')
140 return os.path.join(self.app_home, 'build')
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """Retrieve information about a trace file stored in the application
    home directory on the node.

    :param name: file name of the trace, relative to the application home
    :param attr: which aspect of the trace to return:
        TraceAttr.PATH   - the path of the trace file
        TraceAttr.ALL    - the output of ``node.check_output`` for the trace
        TraceAttr.STREAM - the output of a single ``dd`` read of ``block``
                           bytes, skipping ``offset`` blocks
        TraceAttr.SIZE   - the size reported by ``stat``, as an int
    :param block: block size handed to ``dd`` (STREAM only)
    :param offset: number of blocks skipped by ``dd`` (STREAM only)
    :return: the requested value, or None when the trace can not be
        found or read (the failure is logged through ``self.error``)
    :raises ValueError: if ``attr`` is none of the attributes above
    """
    self.info("Retrieving '%s' trace %s " % (name, attr))

    path = os.path.join(self.app_home, name)

    # Make sure the trace file exists before trying anything else
    command = "(test -f %s && echo 'success') || echo 'error'" % path
    (out, err), proc = self.node.execute(command)

    if (err and proc.poll()) or out.find("error") != -1:
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.PATH:
        return path

    if attr == TraceAttr.ALL:
        (out, err), proc = self.node.check_output(self.app_home, name)

        if err and proc.poll():
            msg = " Couldn't read trace %s " % name
            self.error(msg, out, err)
            return None

        return out

    if attr == TraceAttr.STREAM:
        cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
    elif attr == TraceAttr.SIZE:
        cmd = "stat -c%%s %s " % path
    else:
        # Without this branch 'cmd' would be unbound below
        raise ValueError("Unsupported trace attribute %s" % (attr,))

    (out, err), proc = self.node.execute(cmd)

    if err and proc.poll():
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.SIZE:
        out = int(out.strip())

    return out
194 # create home dir for application
195 self.node.mkdir(self.app_home)
198 self.upload_sources()
206 # install dependencies
207 self.install_dependencies()
216 command = self.get("command")
217 x11 = self.get("forwardX11")
218 env = self.get("env")
220 if command and not x11:
221 self.info("Uploading command '%s'" % command)
223 # replace application specific paths in the command
224 command = self.replace_paths(command)
226 self.node.upload_command(command, self.app_home,
230 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the files listed in the "sources" attribute into the
    sources directory of the application.

    Entries starting with "http" (which includes "https") are downloaded
    on the node itself with wget; every other entry is handed to
    ``node.upload``. Nothing is done when the attribute is not set.
    """
    # TODO: check if sources need to be uploaded and upload them
    sources = self.get("sources")
    if not sources:
        return

    self.info(" Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() without a separator ignores repeated blanks, so no empty
    # entries end up in the lists below
    entries = sources.split()
    http_sources = [src for src in entries if src.startswith("http")]
    local_sources = [src for src in entries if not src.startswith("http")]

    if http_sources:
        # Download http sources remotely
        command = " wget -c --directory-prefix=${SOURCES} "
        command += "".join(" %s " % src for src in http_sources)

        # Command to check that the sources were downloaded
        check = "".join(" ls ${SOURCES}/%s ;" % os.path.basename(src)
                for src in http_sources)
        command += " ; %s " % check

        # replace application specific paths in the command
        command = self.replace_paths(command)

        # Upload the command to a file, and execute asynchronously
        self.node.run_and_wait(command, self.app_home,
                shfile = "http_sources.sh",
                pidfile = "http_sources_pidfile",
                ecodefile = "http_sources_exitcode",
                stdout = "http_sources_stdout",
                stderr = "http_sources_stderr")

    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """Upload the text of the "code" attribute to a file named "code"
    inside the sources directory of the application.

    Nothing is done when the attribute is not set.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info(" Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code text itself; 'sources' is not defined in this method
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the text of the "stdin" attribute to a file named "stdin"
    inside the application home directory.

    Nothing is done when the attribute is not set.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    dst = os.path.join(self.app_home, "stdin")
    self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
    """Install the packages listed in the "depends" attribute through
    ``node.install_packages``.

    Nothing is done when the attribute is not set.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.info(" Installing dependencies %s" % depends)
    self.node.install_packages(depends, self.app_home)
302 build = self.get("build")
304 self.info(" Building sources ")
306 # create dir for build
307 self.node.mkdir(self.build_dir)
309 # replace application specific paths in the command
310 command = self.replace_paths(command)
312 # Upload the command to a file, and execute asynchronously
313 self.node.run_and_wait(command, self.app_home,
315 pidfile = "build_pidfile",
316 ecodefile = "build_exitcode",
317 stdout = "build_stdout",
318 stderr = "build_stderr")
321 install = self.get("install")
323 self.info(" Installing sources ")
325 # replace application specific paths in the command
326 command = self.replace_paths(command)
328 # Upload the command to a file, and execute asynchronously
329 self.node.run_and_wait(command, self.app_home,
330 shfile = "install.sh",
331 pidfile = "install_pidfile",
332 ecodefile = "install_exitcode",
333 stdout = "install_stdout",
334 stderr = "install_stderr")
337 # Wait until node is associated and deployed
339 if not node or node.state < ResourceState.READY:
340 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
342 reschedule_delay = "0.5s"
343 self.ec.schedule(reschedule_delay, self.deploy)
346 command = self.get("command") or ""
347 self.info("Deploying command '%s' " % command)
351 self._state = ResourceState.FAILED
354 super(LinuxApplication, self).deploy()
357 command = self.get('command')
358 env = self.get('env')
359 stdin = 'stdin' if self.get('stdin') else None
360 stdout = 'stdout' if self.get('stdout') else 'stdout'
361 stderr = 'stderr' if self.get('stderr') else 'stderr'
362 sudo = self.get('sudo') or False
363 x11 = self.get('forwardX11') or False
367 # If no command was given, then the application
368 # is directly marked as FINISHED
369 self._state = ResourceState.FINISHED
371 super(LinuxApplication, self).start()
373 self.info("Starting command '%s'" % command)
376 # If X11 forwarding was specified, then the application
377 # can not run detached, so instead of invoking asynchronous
378 # 'run' we invoke synchronous 'execute'.
380 msg = "No command is defined but X11 forwarding has been set"
382 self._state = ResourceState.FAILED
383 raise RuntimeError, msg
388 for var in env.split(" "):
389 environ += ' %s ' % var
391 command = "(" + environ + " ; " + command + ")"
392 command = self.replace_paths(command)
394 # If the command requires X11 forwarding, we
395 # can't run it asynchronously
396 (out, err), proc = self.node.execute(command,
401 self._state = ResourceState.FINISHED
403 if proc.poll() and err:
406 # Command was previously uploaded, now run the remote
407 # bash file asynchronously
408 cmd = "bash ./app.sh"
409 (out, err), proc = self.node.run(cmd, self.app_home,
415 # check if execution errors occurred
416 msg = " Failed to start command '%s' " % command
418 if proc.poll() and err:
419 self.error(msg, out, err)
420 raise RuntimeError, msg
422 # Check status of process running in background
423 pid, ppid = self.node.wait_pid(self.app_home)
424 if pid: self._pid = int(pid)
425 if ppid: self._ppid = int(ppid)
427 # If the process is not running, check for error information
428 # on the remote machine
429 if not self.pid or not self.ppid:
430 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
431 self.error(msg, out, err)
433 msg2 = " Setting state to Failed"
435 self._state = ResourceState.FAILED
437 raise RuntimeError, msg
440 command = self.get('command') or ''
443 if state == ResourceState.STARTED:
444 self.info("Stopping command '%s'" % command)
446 (out, err), proc = self.node.kill(self.pid, self.ppid)
449 # check if execution errors occurred
450 msg = " Failed to STOP command '%s' " % self.get("command")
451 self.error(msg, out, err)
452 self._state = ResourceState.FAILED
455 super(LinuxApplication, self).stop()
458 self.info("Releasing resource")
460 tear_down = self.get("tearDown")
462 self.node.execute(tear_down)
465 if self.state == ResourceState.STOPPED:
466 super(LinuxApplication, self).release()
470 if self._state == ResourceState.STARTED:
471 # To avoid overwhelming the remote hosts and the local processor
472 # with too many ssh queries, the state is only requested
473 # every 'state_check_delay' seconds.
474 state_check_delay = 0.5
475 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
476 # check if execution errors occurred
477 (out, err), proc = self.node.check_errors(self.app_home)
480 if err.find("No such file or directory") >= 0 :
481 # The resource is marked as started, but the
482 # command was not yet executed
483 return ResourceState.READY
485 msg = " Failed to execute command '%s'" % self.get("command")
486 self.error(msg, out, err)
487 self._state = ResourceState.FAILED
489 elif self.pid and self.ppid:
490 status = self.node.status(self.pid, self.ppid)
492 if status == ProcStatus.FINISHED:
493 self._state = ResourceState.FINISHED
496 self._last_state_check = strfnow()
def replace_paths(self, command):
    """
    Substitute the special path tags found in ``command`` with the
    corresponding directories.

    Directories that do not start with "/" are prefixed with "${HOME}".
    The substituted paths are inserted as they are; no quoting or
    escaping is applied to them.
    """
    def absolute_dir(d):
        if d.startswith("/"):
            return d
        return os.path.join("${HOME}", d)

    # Tags are substituted one after the other, in this order
    substitutions = (
        ("${SOURCES}", absolute_dir(self.src_dir)),
        ("${BUILD}", absolute_dir(self.build_dir)),
        ("${APP_HOME}", absolute_dir(self.app_home)),
        ("${NODE_HOME}", absolute_dir(self.node.node_home)),
        ("${EXP_HOME}", absolute_dir(self.node.exp_home)),
    )

    result = command
    for tag, path in substitutions:
        result = result.replace(tag, path)

    return result
515 def valid_connection(self, guid):
518 # XXX: What if it is connected to more than one node?
519 resources = self.find_resources(exact_tags = [tags.NODE])
520 self._node = resources[0] if len(resources) == 1 else None