2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
29 # TODO: Resolve wildcards in commands!!
33 class LinuxApplication(ResourceManager):
34 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare the configuration attributes of the application resource.

    All Attribute objects are created first and are then registered on
    ``cls``, one by one and in declaration order, through
    ``cls._register_attribute``.
    """
    exec_read_only = Flags.ExecReadOnly
    read_only = Flags.ReadOnly

    build_help = (
        "Build commands to execute after deploying the sources. "
        "Sources will be in the ${SOURCES} folder. "
        "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
        "Try to make the commands return with a nonzero exit code on error.\n"
        "Also, do not install any programs here, use the 'install' attribute. This will "
        "help keep the built files constrained to the build folder (which may "
        "not be the home folder), and will result in faster deployment. Also, "
        "make sure to clean up temporary files, to reduce bandwidth usage between "
        "nodes when transferring built packages.")

    install_help = (
        "Commands to transfer built files to their final destinations. "
        "Sources will be in the initial working folder, and a special "
        "tag ${SOURCES} can be used to reference the experiment's "
        "home folder (where the application commands will run).\n"
        "ALL sources and targets needed for execution must be copied there, "
        "if building has been enabled.\n"
        "That is, 'slave' nodes will not automatically get any source files. "
        "'slave' nodes don't get build dependencies either, so if you need "
        "make and other tools to install, be sure to provide them as "
        "actual dependencies instead.")

    # The list order is the registration order.
    attributes = [
        Attribute("command", "Command to execute",
            flags = exec_read_only),
        Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
            flags = exec_read_only),
        Attribute("env", "Environment variables string for command execution",
            flags = exec_read_only),
        Attribute("sudo", "Run with root privileges",
            flags = exec_read_only),
        Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags = exec_read_only),
        Attribute("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            flags = exec_read_only),
        Attribute("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            flags = exec_read_only),
        Attribute("build", build_help, flags = read_only),
        Attribute("install", install_help, flags = read_only),
        Attribute("stdin", "Standard input", flags = exec_read_only),
        Attribute("stdout", "Standard output", flags = exec_read_only),
        Attribute("stderr", "Standard error", flags = exec_read_only),
        Attribute("tearDown", "Bash script to be executed before "
            "releasing the resource",
            flags = read_only),
    ]

    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Register the 'stdout' and 'stderr' traces on the resource class.

    Both Trace objects are built before either one is handed to
    ``cls._register_trace``; 'stdout' is registered first.
    """
    traces = (
        Trace("stdout", "Standard output stream"),
        Trace("stderr", "Standard error stream"),
    )

    for trace in traces:
        cls._register_trace(trace)
def __init__(self, ec, guid):
    """Initialise the application resource.

    :param ec: forwarded unchanged to the base class constructor
    :param guid: forwarded unchanged to the base class constructor; the
        resulting ``self.guid`` names the application home directory
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # Process ids of the remote command. They are only assigned once the
    # command has actually been started, so give them a defined value here
    # to keep earlier reads from failing on a missing attribute.
    self._pid = None
    self._ppid = None

    # name of the application directory ("app-<guid>")
    self._home = "app-%s" % self.guid

    # timestamp of last state check of the application
    self._last_state_check = strfnow()
def log_message(self, msg):
    """Return 'msg' prefixed with this resource's guid and its node's hostname.

    The hostname is read from the node's "hostname" attribute.
    """
    guid = self.guid
    hostname = self.node.get("hostname")
    return " guid %d - host %s - %s " % (guid, hostname, msg)
124 node = self.get_connected(LinuxNode.rtype())
125 if node: return node[0]
130 return os.path.join(self.node.exp_home, self._home)
134 return os.path.join(self.app_home, 'src')
138 return os.path.join(self.app_home, 'build')
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """Retrieve information about a trace file stored in the application home.

    :param name: file name of the trace, relative to the application home
    :param attr: what to retrieve:
        TraceAttr.PATH   - the path of the trace file
        TraceAttr.ALL    - the output of node.check_output for the trace
        TraceAttr.STREAM - one block of the file, read with 'dd'
        TraceAttr.SIZE   - the size reported by 'stat -c%s', as an int
    :param block: 'bs' value handed to dd (STREAM only)
    :param offset: 'skip' value handed to dd, i.e. number of blocks to
        skip (STREAM only)
    :returns: the requested value, or None when the trace can not be
        found or read (the failure is reported through self.error)
    :raises ValueError: if 'attr' is not one of the values listed above
    """
    self.info("Retrieving '%s' trace %s " % (name, attr))

    path = os.path.join(self.app_home, name)

    # Make sure the trace file exists before trying to read it
    command = "(test -f %s && echo 'success') || echo 'error'" % path
    (out, err), proc = self.node.execute(command)

    if (err and proc.poll()) or out.find("error") != -1:
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.PATH:
        return path

    if attr == TraceAttr.ALL:
        (out, err), proc = self.node.check_output(self.app_home, name)

        if err and proc.poll():
            msg = " Couldn't read trace %s " % name
            self.error(msg, out, err)
            return None

        return out

    if attr == TraceAttr.STREAM:
        cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
    elif attr == TraceAttr.SIZE:
        cmd = "stat -c%%s %s " % path
    else:
        # Without this branch 'cmd' would be unbound below and the call
        # would die with an unrelated UnboundLocalError.
        raise ValueError("Unsupported trace attribute %s for trace %s" % (
            attr, name))

    (out, err), proc = self.node.execute(cmd)

    if err and proc.poll():
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.SIZE:
        out = int(out.strip())

    return out
192 # create home dir for application
193 self.node.mkdir(self.app_home)
196 self.upload_sources()
204 # install dependencies
205 self.install_dependencies()
214 command = self.get("command")
215 x11 = self.get("forwardX11")
216 env = self.get("env")
218 if command and not x11:
219 self.info("Uploading command '%s'" % command)
221 # replace application specific paths in the command
222 command = self.replace_paths(command)
223 env = env and self.replace_paths(env)
225 self.node.upload_command(command, self.app_home,
229 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the files listed in the 'sources' attribute to the sources dir.

    Entries that are http(s) URLs are fetched on the node itself with
    wget, followed by an 'ls' of every expected file; all remaining
    entries are handed to node.upload. Does nothing when the attribute
    is unset or empty.
    """
    # TODO: check if sources need to be uploaded and upload them
    sources = self.get("sources")
    if not sources:
        return

    self.info(" Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() without separator ignores repeated, leading and trailing
    # blanks, so no empty entries end up in the lists below
    entries = sources.split()

    # Only real URLs are downloaded; a bare "http" prefix test would also
    # catch local files such as 'httpd.conf'.
    url_prefixes = ("http://", "https://")
    http_sources = [s for s in entries if s.startswith(url_prefixes)]
    local_sources = [s for s in entries if not s.startswith(url_prefixes)]

    # Download http sources remotely
    if http_sources:
        download = [" wget -c --directory-prefix=${SOURCES} "]
        check = []

        for source in http_sources:
            download.append(" %s " % (source))
            check.append(" ls ${SOURCES}/%s " % os.path.basename(source))

        command = " ".join(download)

        # Append the command to check that the sources were downloaded
        command += " ; %s " % " ; ".join(check)

        # replace application specific paths in the command
        command = self.replace_paths(command)

        # Upload the command to a file, and execute asynchronously
        self.node.run_and_wait(command, self.app_home,
                shfile = "http_sources.sh",
                pidfile = "http_sources_pidfile",
                ecodefile = "http_sources_exitcode",
                stdout = "http_sources_stdout",
                stderr = "http_sources_stderr")

    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """Upload the value of the 'code' attribute to '<src_dir>/code' on the node.

    Does nothing when the attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info(" Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code itself; the name 'sources' used here before is not
    # defined in this method and raised a NameError.
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """Upload the value of the 'stdin' attribute to '<app_home>/stdin' on the node.

    Does nothing when the attribute is unset or empty.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    # The file lives directly in the application home, which is expected
    # to exist already - no directory is created here.
    dst = os.path.join(self.app_home, "stdin")
    self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
    """Install the packages listed in the 'depends' attribute on the node.

    The attribute value is passed as is to node.install_packages together
    with the application home. Does nothing when the attribute is unset
    or empty.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.info(" Installing dependencies %s" % depends)
    self.node.install_packages(depends, self.app_home)
304 build = self.get("build")
306 self.info(" Building sources ")
308 # create dir for build
309 self.node.mkdir(self.build_dir)
311 # replace application specific paths in the command
312 command = self.replace_paths(build)
314 # Upload the command to a file, and execute asynchronously
315 self.node.run_and_wait(command, self.app_home,
317 pidfile = "build_pidfile",
318 ecodefile = "build_exitcode",
319 stdout = "build_stdout",
320 stderr = "build_stderr")
323 install = self.get("install")
325 self.info(" Installing sources ")
327 # replace application specific paths in the command
328 command = self.replace_paths(install)
330 # Upload the command to a file, and execute asynchronously
331 self.node.run_and_wait(command, self.app_home,
332 shfile = "install.sh",
333 pidfile = "install_pidfile",
334 ecodefile = "install_exitcode",
335 stdout = "install_stdout",
336 stderr = "install_stderr")
339 # Wait until node is associated and deployed
341 if not node or node.state < ResourceState.READY:
342 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
344 reschedule_delay = "0.5s"
345 self.ec.schedule(reschedule_delay, self.deploy)
348 command = self.get("command") or ""
349 self.info("Deploying command '%s' " % command)
353 self._state = ResourceState.FAILED
356 super(LinuxApplication, self).deploy()
359 command = self.get('command')
360 env = self.get('env')
361 stdin = 'stdin' if self.get('stdin') else None
362 stdout = 'stdout' if self.get('stdout') else 'stdout'
363 stderr = 'stderr' if self.get('stderr') else 'stderr'
364 sudo = self.get('sudo') or False
365 x11 = self.get('forwardX11') or False
369 # If no command was given, then the application
370 # is directly marked as FINISHED
371 self._state = ResourceState.FINISHED
373 super(LinuxApplication, self).start()
375 self.info("Starting command '%s'" % command)
378 # If X11 forwarding was specified, then the application
379 # can not run detached, so instead of invoking asynchronous
380 # 'run' we invoke synchronous 'execute'.
382 msg = "No command is defined but X11 forwarding has been set"
384 self._state = ResourceState.FAILED
385 raise RuntimeError, msg
390 for var in env.split(" "):
391 environ += ' %s ' % var
393 command = "{" + environ + " ; " + command + " ; }"
394 command = self.replace_paths(command)
396 # If the command requires X11 forwarding, we
397 # can't run it asynchronously
398 (out, err), proc = self.node.execute(command,
403 self._state = ResourceState.FINISHED
405 if proc.poll() and err:
408 # Command was previously uploaded, now run the remote
409 # bash file asynchronously
410 cmd = "bash ./app.sh"
411 (out, err), proc = self.node.run(cmd, self.app_home,
417 # check if execution errors occurred
418 msg = " Failed to start command '%s' " % command
420 if proc.poll() and err:
421 self.error(msg, out, err)
422 raise RuntimeError, msg
424 # Check status of process running in background
425 pid, ppid = self.node.wait_pid(self.app_home)
426 if pid: self._pid = int(pid)
427 if ppid: self._ppid = int(ppid)
429 # If the process is not running, check for error information
430 # on the remote machine
431 if not self.pid or not self.ppid:
432 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
433 self.error(msg, out, err)
435 msg2 = " Setting state to Failed"
437 self._state = ResourceState.FAILED
439 raise RuntimeError, msg
442 command = self.get('command') or ''
445 if state == ResourceState.STARTED:
446 self.info("Stopping command '%s'" % command)
448 (out, err), proc = self.node.kill(self.pid, self.ppid)
451 # check if execution errors occurred
452 msg = " Failed to STOP command '%s' " % self.get("command")
453 self.error(msg, out, err)
454 self._state = ResourceState.FAILED
457 super(LinuxApplication, self).stop()
460 self.info("Releasing resource")
462 tear_down = self.get("tearDown")
464 self.node.execute(tear_down)
467 if self.state == ResourceState.STOPPED:
468 super(LinuxApplication, self).release()
472 if self._state == ResourceState.STARTED:
473 # To avoid overwhelming the remote hosts and the local processor
474 # with too many ssh queries, the state is only requested
475 # every 'state_check_delay' seconds.
476 state_check_delay = 0.5
477 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
478 # check if execution errors occurred
479 (out, err), proc = self.node.check_errors(self.app_home)
482 if err.find("No such file or directory") >= 0 :
483 # The resource is marked as started, but the
484 # command was not yet executed
485 return ResourceState.READY
487 msg = " Failed to execute command '%s'" % self.get("command")
488 self.error(msg, out, err)
489 self._state = ResourceState.FAILED
491 elif self.pid and self.ppid:
492 status = self.node.status(self.pid, self.ppid)
494 if status == ProcStatus.FINISHED:
495 self._state = ResourceState.FINISHED
498 self._last_state_check = strfnow()
def replace_paths(self, command):
    """
    Replace all special path tags in 'command' with the actual paths.

    Recognised tags are ${SOURCES}, ${BUILD}, ${APP_HOME}, ${NODE_HOME}
    and ${EXP_HOME}. A directory that is not absolute is prefixed with
    the literal "${HOME}", which is left in the command untouched.
    The inserted paths are NOT shell-escaped.

    :param command: string in which the tags are substituted
    :returns: the command with every tag occurrence replaced
    """
    def absolute_dir(d):
        return d if d.startswith("/") else os.path.join("${HOME}", d)

    # Applied sequentially, in this order
    substitutions = (
        ("${SOURCES}", self.src_dir),
        ("${BUILD}", self.build_dir),
        ("${APP_HOME}", self.app_home),
        ("${NODE_HOME}", self.node.node_home),
        ("${EXP_HOME}", self.node.exp_home),
    )

    for tag, directory in substitutions:
        command = command.replace(tag, absolute_dir(directory))

    return command
517 def valid_connection(self, guid):