1 from neco.execution.attribute import Attribute, Flags, Types
2 from neco.execution.trace import Trace, TraceAttr
3 from neco.execution.resource import ResourceManager, clsinit, ResourceState
4 from neco.resources.linux.node import LinuxNode
5 from neco.util import sshfuncs
12 # TODO: Resolve wildcards in commands!!
# LinuxApplication: ResourceManager subclass for an application run on a
# LinuxNode. Its methods upload sources and code, install dependencies,
# run the build/install commands, launch the main command, and expose
# stdout/stderr/buildlog traces.
15 class LinuxApplication(ResourceManager):
# Resource type identifier for this manager (cf. LinuxNode.rtype() used
# when looking up the connected node).
16 _rtype = "LinuxApplication"
# Declare the attributes a LinuxApplication accepts and register each of
# them on the class via cls._register_attribute. Presumably a
# @classmethod (the decorator is not visible in this view).
19 def _register_attributes(cls):
20 command = Attribute("command", "Command to execute",
21 flags = Flags.ExecReadOnly)
# NOTE(review): this description starts with a stray leading space.
22 forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
23 flags = Flags.ExecReadOnly)
24 env = Attribute("env", "Environment variables string for command execution",
25 flags = Flags.ExecReadOnly)
26 sudo = Attribute("sudo", "Run with root privileges",
27 flags = Flags.ExecReadOnly)
28 depends = Attribute("depends",
29 "Space-separated list of packages required to run the application",
30 flags = Flags.ExecReadOnly)
31 sources = Attribute("sources",
32 "Space-separated list of regular files to be deployed in the working "
33 "path prior to building. Archives won't be expanded automatically.",
34 flags = Flags.ExecReadOnly)
35 code = Attribute("code",
36 "Plain text source code to be uploaded to the server. It will be stored "
37 "under ${SOURCES}/code",
38 flags = Flags.ExecReadOnly)
# NOTE(review): build, install and tearDown use Flags.ReadOnly, while
# most other attributes use Flags.ExecReadOnly -- confirm this is intended.
39 build = Attribute("build",
40 "Build commands to execute after deploying the sources. "
41 "Sources will be in the ${SOURCES} folder. "
42 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
43 "Try to make the commands return with a nonzero exit code on error.\n"
44 "Also, do not install any programs here, use the 'install' attribute. This will "
45 "help keep the built files constrained to the build folder (which may "
46 "not be the home folder), and will result in faster deployment. Also, "
47 "make sure to clean up temporary files, to reduce bandwidth usage between "
48 "nodes when transferring built packages.",
49 flags = Flags.ReadOnly)
# NOTE(review): this help text says ${SOURCES} references the home folder,
# but replace_paths substitutes ${SOURCES} with src_dir (<home>/src) and
# ${APPHOME} with the home folder -- confirm which is correct.
50 install = Attribute("install",
51 "Commands to transfer built files to their final destinations. "
52 "Sources will be in the initial working folder, and a special "
53 "tag ${SOURCES} can be used to reference the experiment's "
54 "home folder (where the application commands will run).\n"
55 "ALL sources and targets needed for execution must be copied there, "
56 "if building has been enabled.\n"
57 "That is, 'slave' nodes will not automatically get any source files. "
58 "'slave' nodes don't get build dependencies either, so if you need "
59 "make and other tools to install, be sure to provide them as "
60 "actual dependencies instead.",
61 flags = Flags.ReadOnly)
62 stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
63 stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
64 stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
# NOTE(review): the two literals below are joined without a separating
# space ("...old directory andre-upload..."). Some arguments of this call
# are not visible in this view.
65 update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
66 "re-upload before starting experiment. If not keep the same directory",
69 flags = Flags.ExecReadOnly)
71 tear_down = Attribute("tearDown", "Bash script to be executed before "
72 "releasing the resource",
73 flags = Flags.ReadOnly)
# Register every attribute declared above on the class.
75 cls._register_attribute(command)
76 cls._register_attribute(forward_x11)
77 cls._register_attribute(env)
78 cls._register_attribute(sudo)
79 cls._register_attribute(depends)
80 cls._register_attribute(sources)
81 cls._register_attribute(code)
82 cls._register_attribute(build)
83 cls._register_attribute(install)
84 cls._register_attribute(stdin)
85 cls._register_attribute(stdout)
86 cls._register_attribute(stderr)
87 cls._register_attribute(update_home)
88 cls._register_attribute(tear_down)
def _register_traces(cls):
    # Declare the traces a LinuxApplication can report and register them on
    # the class, in this order: the command's stdout stream, its stderr
    # stream, and the output of the build step.
    trace_specs = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
        ("buildlog", "Output of the build process"),
    )
    # Build every Trace first, then register them, in declaration order.
    traces = [Trace(trace_name, description)
              for trace_name, description in trace_specs]
    for trace in traces:
        cls._register_trace(trace)
# Create the application resource; `ec` and `guid` are forwarded to
# ResourceManager.__init__. Some lines of this constructor are not
# visible in this view.
100 def __init__(self, ec, guid):
101 super(LinuxApplication, self).__init__(ec, guid)
# Per-guid application directory name ("app-<guid>"); the full home path
# is built under the node's exp_dir.
104 self._home = "app-%s" % self.guid
# Per-instance logger. '%d' assumes guid is an integer. Other methods log
# via self.logger -- presumably a base-class accessor for _logger (not
# visible here).
106 self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
# NOTE(review): the accessor headers (def lines / decorators) for the
# fragments below are not visible in this view; the names used here are
# inferred from how self.node, self.home, self.src_dir and self.build_dir
# are used elsewhere in the class.
# node: the first connected LinuxNode resource, if any; no other return
# is visible here.
110 node = self.get_connected(LinuxNode.rtype())
111 if node: return node[0]
# home: the application directory, <node.exp_dir>/app-<guid>.
116 return os.path.join(self.node.exp_dir, self._home)
# src_dir: <home>/src; replace_paths substitutes it for ${SOURCES}.
120 return os.path.join(self.home, 'src')
# build_dir: <home>/build; replace_paths substitutes it for ${BUILD}.
124 return os.path.join(self.home, 'build')
# Fetch trace `name`, a file at <self.home>/<name> on the node.
# `attr` selects what is fetched:
#   TraceAttr.PATH   - handled on lines not visible here
#   TraceAttr.ALL    - the whole file, via node.check_output
#   TraceAttr.STREAM - one `block`-byte chunk, via dd
#   TraceAttr.SIZE   - the file size, via stat, converted to int
# NOTE(review): several lines of this method, including its return
# statements, are not visible in this view. `path` is interpolated into
# shell commands unquoted.
134 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
135 path = os.path.join(self.home, name)
# Existence check: the remote command prints 'success' or 'error'.
137 cmd = "(test -f %s && echo 'success') || echo 'error'" % path
138 (out, err), proc = self.node.execute(cmd)
# Failure is either non-empty err together with a truthy proc.poll(),
# or 'error' in the output.
140 if (err and proc.poll()) or out.find("error") != -1:
141 err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
142 name, self.node.get("hostname"), err)
143 self.logger.error(err_msg)
146 if attr == TraceAttr.PATH:
149 if attr == TraceAttr.ALL:
150 (out, err), proc = self.node.check_output(self.home, name)
152 if err and proc.poll():
153 err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
154 name, self.node.get("hostname"), err)
155 self.logger.error(err_msg)
# NOTE: dd's skip= counts blocks of `block` bytes (bs sets the block
# size), so `offset` is a block index, not a byte offset.
160 if attr == TraceAttr.STREAM:
161 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
162 elif attr == TraceAttr.SIZE:
# '%%s' becomes a literal '%s' (stat's size-in-bytes format) after
# %-formatting.
163 cmd = "stat -c%%s %s " % path
# NOTE(review): if `attr` matched none of the branches above, `cmd` here
# would still be the earlier `test -f` command -- confirm that case is
# handled on lines not visible here.
165 (out, err), proc = self.node.execute(cmd)
167 if err and proc.poll():
168 err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
169 name, self.node.get("hostname"), err)
170 self.logger.error(err_msg)
# stat prints the size as text; convert it to an int for SIZE requests.
173 if attr == TraceAttr.SIZE:
174 out = int(out.strip())
# Prepare the node for the application:
#   1. create the application home directory,
#   2. upload the sources,
#   3. install the dependencies,
#   4. delegate to ResourceManager.provision().
# NOTE(review): `filters` is not used in the visible lines and is not
# forwarded to super().provision(). Some lines between these steps are
# not visible in this view.
178 def provision(self, filters = None):
179 # TODO: verify home hash or clean home
181 # create home dir for application
182 self.node.mkdir(self.home)
185 self.upload_sources()
190 # install dependencies
191 self.install_dependencies()
199 super(LinuxApplication, self).provision()
def upload_sources(self):
    """Deploy the files listed in the ``sources`` attribute into ``self.src_dir``.

    ``sources`` is a whitespace-separated list. Entries that are
    ``http://`` or ``https://`` URLs are downloaded directly on the node
    with ``wget``. Every other entry is treated as a local file and
    uploaded to the node with ``self.node.upload``. Does nothing when the
    attribute is unset or empty.
    """
    sources = self.get("sources")
    if not sources:
        return

    self.logger.debug(" Uploading sources %s" % sources)

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() without an argument also discards the empty entries that
    # leading, trailing or repeated spaces would produce with split(' ').
    http_sources = []
    local_sources = []
    for source in sources.split():
        # Match the full scheme, not just the "http" prefix, so a local file
        # such as "httpd.conf" is not mistaken for a URL.
        if source.startswith(("http://", "https://")):
            http_sources.append(source)
        else:
            local_sources.append(source)

    # Download http sources on the node itself. "-O" names the downloaded
    # document; "-o" would instead write wget's log to that path.
    for source in http_sources:
        dst = os.path.join(self.src_dir, source.split("/")[-1])
        command = "wget -O %s %s" % (dst, source)
        self.node.execute(command)

    # Only upload when local files remain after removing the URLs.
    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """Upload the plain-text ``code`` attribute to ``${SOURCES}/code``.

    The text is stored in the file ``code`` inside ``self.src_dir``, the
    directory that ``${SOURCES}`` expands to. Does nothing when the
    attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.logger.debug(" Uploading code '%s'" % code)

    dst = os.path.join(self.src_dir, "code")
    # `code` holds the source text itself, not a list of file names, so it
    # is passed directly to upload() with text=True.
    self.node.upload(code, dst, text = True)
def install_dependencies(self):
    """Install the packages listed in the ``depends`` attribute on the node.

    ``depends`` is a space-separated package list. It is passed unchanged
    to ``self.node.install_packages``, together with the application home.
    Does nothing when the attribute is unset or empty, so None is never
    handed to the node.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.logger.debug(" Installing dependencies %s" % depends)
    self.node.install_packages(depends, home = self.home)
# Build step. The method header is not visible in this view; it is
# presumably `build`, judging by the attribute it reads. It runs the
# `build` attribute (path tags expanded by replace_paths) through
# run_and_wait in self.home, with pid/stdout/stderr files
# build_pid/build_log/build_err and raise_on_error=True.
# NOTE(review): no guard for an unset `build` attribute is visible here;
# replace_paths(None) would raise AttributeError.
# NOTE(review): the registered trace is named 'buildlog', and trace() reads
# <home>/<name>, but this step writes its stdout to 'build_log' -- confirm
# the names are meant to match.
244 build = self.get("build")
246 self.logger.debug(" Building sources '%s'" % build)
248 # create dir for build
249 self.node.mkdir(self.build_dir)
251 cmd = self.replace_paths(build)
253 (out, err), proc = self.run_and_wait(cmd, self.home,
254 pidfile = "build_pid",
255 stdout = "build_log",
256 stderr = "build_err",
257 raise_on_error = True)
# Install step. The method header is not visible in this view; it is
# presumably `install`, judging by the attribute it reads. It runs the
# `install` attribute (path tags expanded by replace_paths) through
# run_and_wait in self.home, with pid/stdout/stderr files
# install_pid/install_log/install_err and raise_on_error=True.
# NOTE(review): no guard for an unset `install` attribute is visible here;
# replace_paths(None) would raise AttributeError.
260 install = self.get("install")
262 self.logger.debug(" Installing sources '%s'" % install)
264 cmd = self.replace_paths(install)
266 (out, err), proc = self.run_and_wait(cmd, self.home,
267 pidfile = "install_pid",
268 stdout = "install_log",
269 stderr = "install_err",
270 raise_on_error = True)
# Deploy step. The header is not visible; the super().deploy() call
# suggests this is `deploy`. While the node is missing or its state is
# below READY, the method reschedules itself via
# self.ec.schedule(DELAY, self.deploy).
# NOTE(review): `node` and `DELAY` are defined on lines not visible here.
# The lines between the reschedule and super().deploy() are also not
# visible, so it cannot be told from here whether super().deploy() is
# reached in the rescheduling case.
273 # Wait until node is associated and deployed
275 if not node or node.state < ResourceState.READY:
276 self.ec.schedule(DELAY, self.deploy)
281 super(LinuxApplication, self).deploy()
# Start step. The header is not visible; the super().start() call
# suggests this is `start`.
# It reads the command (path tags expanded), env, stdin, sudo and
# forwardX11 attributes. It then launches the command with either
# self.node.execute(...) or self.node.run(command, self.home, ...); the
# branch that chooses between them is not visible here.
# Afterwards it reads pid/ppid via node.wait_pid. If either is missing,
# it inspects the remote 'stderr' file and, on any error, logs an
# accumulated message and marks the resource FAILED.
# NOTE(review): `failed` is defined on lines not visible here.
# NOTE(review): this fragment sets `self.state`, while the stop/status
# code below assigns `self._state` -- confirm which is intended.
284 command = self.replace_paths(self.get("command"))
285 env = self.get("env")
# Passes the name 'stdin' (not the attribute value) when stdin is set --
# presumably a file of that name under the application home; confirm.
286 stdin = 'stdin' if self.get("stdin") else None
287 sudo = self.get('sudo') or False
288 x11 = self.get("forwardX11") or False
289 err_msg = "Failed to run command %s on host %s" % (
290 command, self.node.get("hostname"))
293 super(LinuxApplication, self).start()
296 (out, err), proc = self.node.execute(command,
304 if proc.poll() and err:
307 (out, err), proc = self.node.run(command, self.home,
311 if proc.poll() and err:
315 pid, ppid = self.node.wait_pid(home = self.home)
316 if pid: self._pid = int(pid)
317 if ppid: self._ppid = int(ppid)
319 if not self.pid or not self.ppid:
322 (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
324 if failed or out or chkerr:
325 # check if execution errors occurred
327 err_msg = "%s. Proc error: %s" % (err_msg, err)
329 err_msg = "%s. Run error: %s " % (err_msg, out)
332 err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
334 self.logger.error(err_msg)
335 self.state = ResourceState.FAILED
# Stop step. The header is not visible; the super().stop() call suggests
# this is `stop`. When `state` (assigned on lines not visible here) is
# STARTED, it kills the process through node.kill(pid, ppid). On error
# (the condition is not visible here) it logs and sets self._state to
# FAILED. It then delegates to ResourceManager.stop().
339 if state == ResourceState.STARTED:
340 (out, err), proc = self.node.kill(self.pid, self.ppid)
343 # check if execution errors occurred
344 err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
345 self.get("command"), self.node.get("hostname"), err, out)
346 self.logger.error(err_msg)
347 self._state = ResourceState.FAILED
350 super(LinuxApplication, self).stop()
# Release step. The header is not visible; the super().release() call
# suggests this is `release`. It runs the tearDown script on the node,
# then calls ResourceManager.release() only if the resource is STOPPED.
# NOTE(review): any guard around node.execute(tear_down) is not visible;
# with tearDown unset, None would be passed to node.execute.
353 tear_down = self.get("tearDown")
355 self.node.execute(tear_down)
358 if self.state == ResourceState.STOPPED:
359 super(LinuxApplication, self).release()
# State-refresh logic. The header is not visible in this view.
# While STARTED, it reads the remote 'stderr' file; on error it logs and
# marks the resource FAILED. If pid and ppid are known, it polls
# node.status and moves to FINISHED when sshfuncs.FINISHED is reported.
# NOTE(review): the `elif` below pairs with an `if` on lines not visible
# here (possibly the error check), so the exact branching cannot be
# confirmed from this view.
363 if self._state == ResourceState.STARTED:
364 (out, err), proc = self.node.check_output(self.home, 'stderr')
367 # check if execution errors occurred
368 err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
369 self.get("command"), self.node.get("hostname"), err, out)
370 self.logger.error(err_msg)
371 self._state = ResourceState.FAILED
373 elif self.pid and self.ppid:
374 status = self.node.status(self.pid, self.ppid)
376 if status == sshfuncs.FINISHED:
377 self._state = ResourceState.FINISHED
# Decide whether a connection to resource `guid` is valid. The visible
# lines look up node resources and cache the node in self._node only
# when exactly one is found (otherwise None).
# NOTE(review): `guid` is not used in the visible lines, and `tags` is not
# among the visible imports -- confirm it is defined elsewhere. Lines
# right after the def are not visible and may return early.
381 def valid_connection(self, guid):
384 # XXX: What if it is connected to more than one node?
385 resources = self.find_resources(exact_tags = [tags.NODE])
386 self._node = resources[0] if len(resources) == 1 else None
# Application-hash helper (its def line is not visible in this view).
# NOTE(review): this body cannot run as written:
#   - `cls` is not assigned in this method, which otherwise uses `self`;
#   - `build`, `install`, `stdin`, `stdout`, `stderr`, `tear_down` and
#     `args` are not assigned in the visible lines.
# The cls._register_attribute(...) lines look copied from
# _register_attributes. Presumably the intent is to collect every
# attribute value (command, forwardX11, env, sudo, depends, sources,
# build, install, stdin, stdout, stderr, tearDown) into `args` and hash
# their concatenation -- confirm and fix.
# NOTE(review): on Python 3, hashlib.md5 requires bytes, so skey would
# need encoding.
390 """ Generates a hash that uniquely identifies the application.
391 Is used to determine whether the home directory should be cleaned
395 command = self.get("command")
396 forwards_x11 = self.get("forwardX11")
397 env = self.get("env")
398 sudo = self.get("sudo")
399 depends = self.get("depends")
400 sources = self.get("sources")
401 cls._register_attribute(sources)
402 cls._register_attribute(build)
403 cls._register_attribute(install)
404 cls._register_attribute(stdin)
405 cls._register_attribute(stdout)
406 cls._register_attribute(stderr)
407 cls._register_attribute(tear_down)
408 skey = "".join(map(str, args))
409 return hashlib.md5(skey).hexdigest()
def replace_paths(self, command):
    """
    Replace all special path tags in ``command`` with the actual paths.

    Supported tags:
      * ``${SOURCES}``  -> ``self.src_dir``
      * ``${BUILD}``    -> ``self.build_dir``
      * ``${APPHOME}``  -> ``self.home``
      * ``${NODEHOME}`` -> ``self.node.home``

    The paths are inserted verbatim; they are NOT shell-escaped. A command
    template that may receive paths with spaces or shell metacharacters
    must quote the tags itself.

    :param command: command string containing zero or more tags.
    :returns: ``command`` with every tag occurrence replaced.
    """
    return ( command
        .replace("${SOURCES}", self.src_dir)
        .replace("${BUILD}", self.build_dir)
        .replace("${APPHOME}", self.home)
        .replace("${NODEHOME}", self.node.home) )