1 from neco.execution.attribute import Attribute, Flags, Types
2 from neco.execution.trace import Trace, TraceAttr
3 from neco.execution.resource import ResourceManager, clsinit, ResourceState
4 from neco.resources.linux.node import LinuxNode
5 from neco.util import sshfuncs
# Delay passed to self.ec.schedule() when deploy() has to wait for its node
# to become ready and re-schedules itself.
10 reschedule_delay = "0.5s"
12 # TODO: Resolve wildcards in commands!!
# Resource manager for an application run on a connected LinuxNode: it stages
# sources/code into a per-application home on the node, installs dependencies,
# runs build/install scripts and starts, stops and monitors a command there.
15 class LinuxApplication(ResourceManager):
# Resource type name; presumably what rtype() returns (cf. the
# LinuxNode.rtype() lookup used to find the node) — confirm in ResourceManager.
16 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare and register every configurable LinuxApplication attribute.

    ``build``, ``install`` and ``tearDown`` are declared with
    ``Flags.ReadOnly``; every other attribute uses ``Flags.ExecReadOnly``.
    All Attribute objects are created first, then registered on the class
    one by one in declaration order.
    """
    exec_ro = Flags.ExecReadOnly
    read_only = Flags.ReadOnly

    # The two long help texts are kept apart so the table below stays readable.
    build_help = (
        "Build commands to execute after deploying the sources. "
        "Sources will be in the ${SOURCES} folder. "
        "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
        "Try to make the commands return with a nonzero exit code on error.\n"
        "Also, do not install any programs here, use the 'install' attribute. This will "
        "help keep the built files constrained to the build folder (which may "
        "not be the home folder), and will result in faster deployment. Also, "
        "make sure to clean up temporary files, to reduce bandwidth usage between "
        "nodes when transferring built packages.")
    install_help = (
        "Commands to transfer built files to their final destinations. "
        "Sources will be in the initial working folder, and a special "
        "tag ${SOURCES} can be used to reference the experiment's "
        "home folder (where the application commands will run).\n"
        "ALL sources and targets needed for execution must be copied there, "
        "if building has been enabled.\n"
        "That is, 'slave' nodes will not automatically get any source files. "
        "'slave' nodes don't get build dependencies either, so if you need "
        "make and other tools to install, be sure to provide them as "
        "actual dependencies instead.")

    declared = [
        Attribute("command", "Command to execute", flags = exec_ro),
        Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
            flags = exec_ro),
        Attribute("env", "Environment variables string for command execution",
            flags = exec_ro),
        Attribute("sudo", "Run with root privileges", flags = exec_ro),
        Attribute("depends",
            "Space-separated list of packages required to run the application",
            flags = exec_ro),
        Attribute("sources",
            "Space-separated list of regular files to be deployed in the working "
            "path prior to building. Archives won't be expanded automatically.",
            flags = exec_ro),
        Attribute("code",
            "Plain text source code to be uploaded to the server. It will be stored "
            "under ${SOURCES}/code",
            flags = exec_ro),
        Attribute("build", build_help, flags = read_only),
        Attribute("install", install_help, flags = read_only),
        Attribute("stdin", "Standard input", flags = exec_ro),
        Attribute("stdout", "Standard output", flags = exec_ro),
        Attribute("stderr", "Standard error", flags = exec_ro),
        Attribute("tearDown",
            "Bash script to be executed before releasing the resource",
            flags = read_only),
    ]

    for attribute in declared:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Declare the traces exposed by a LinuxApplication.

    Each trace is described by a (name, description) pair. All Trace
    objects are built first and then registered on the class in order.
    """
    specs = (
        ("stdout", "Standard output stream"),
        ("stderr", "Standard error stream"),
        ("buildlog", "Output of the build process"),
    )
    traces = [Trace(trace_name, description) for trace_name, description in specs]
    for trace in traces:
        cls._register_trace(trace)
# Initialise the resource manager. `ec` is presumably the experiment
# controller (deploy() calls self.ec.schedule) and `guid` this resource's id.
# NOTE(review): some lines of this constructor are not visible; start() and
# state() read self.pid/self.ppid and assign self._pid/self._ppid — confirm
# they are initialised (e.g. to None) in the hidden lines.
93 def __init__(self, ec, guid):
94 super(LinuxApplication, self).__init__(ec, guid)
# Per-application directory name "app-<guid>"; app_home joins it under the
# node's exp_home.
97 self._home = "app-%s" % self.guid
# NOTE(review): `logging` is not among the visible imports; confirm it is
# imported in the lines not shown.
99 self._logger = logging.getLogger("LinuxApplication")
def log_message(self, msg):
    """Prefix *msg* with this application's guid and its node's hostname.

    Returns a string of the form `` guid <guid> - host <hostname> - <msg> ``.
    ``self.node`` is None while no LinuxNode is connected. In that case the
    host is reported as ``None`` instead of raising AttributeError, so that
    formatting a log line never fails (presumably this helper backs the
    info/debug/error logging methods — confirm in ResourceManager).
    """
    node = self.node
    hostname = node.get("hostname") if node is not None else None
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
# NOTE(review): the def/@property lines of these accessors are not visible.
# From how they are used elsewhere (self.node, self.app_home, self.src_dir,
# self.build_dir), these are presumably the bodies of those accessors, in order.
# node: the first connected LinuxNode; when none is connected the body falls
# through and implicitly returns None.
107 node = self.get_connected(LinuxNode.rtype())
108 if node: return node[0]
# app_home: <node.exp_home>/app-<guid>. Raises AttributeError when no node is
# connected, because self.node is then None.
113 return os.path.join(self.node.exp_home, self._home)
# src_dir: "src" under app_home; replace_paths maps ${SOURCES} to it.
117 return os.path.join(self.app_home, 'src')
# build_dir: "build" under app_home; replace_paths maps ${BUILD} to it.
121 return os.path.join(self.app_home, 'build')
# Retrieve trace `name`, i.e. the file app_home/<name> on the node.
# `attr` selects what is fetched: PATH, ALL (presumably the file contents, via
# node.check_output), STREAM (one `block`-byte chunk read with dd, skipping
# `offset` blocks — dd counts skip in units of bs, not bytes) or SIZE (byte
# size from `stat -c%s`, converted to int).
# NOTE(review): the return statements and parts of the branches are not
# visible; verify what is returned for each attr and after an error.
# NOTE(review): `path` is interpolated into shell commands unquoted; a trace
# name containing spaces or shell metacharacters would break the command.
131 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
132 self.info("Retrieving '%s' trace %s " % (name, attr))
134 path = os.path.join(self.app_home, name)
# Probe for the file remotely; the shell prints 'error' when it is missing.
136 command = "(test -f %s && echo 'success') || echo 'error'" % path
137 (out, err), proc = self.node.execute(command)
139 if (err and proc.poll()) or out.find("error") != -1:
140 msg = " Couldn't find trace %s " % name
141 self.error(msg, out, err)
144 if attr == TraceAttr.PATH:
147 if attr == TraceAttr.ALL:
148 (out, err), proc = self.node.check_output(self.app_home, name)
150 if err and proc.poll():
151 msg = " Couldn't read trace %s " % name
152 self.error(msg, out, err)
157 if attr == TraceAttr.STREAM:
158 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
159 elif attr == TraceAttr.SIZE:
# "%%s" yields the literal stat format "%s" (total size in bytes).
160 cmd = "stat -c%%s %s " % path
# NOTE(review): `cmd` is only assigned for STREAM and SIZE; any other attr
# reaching this line would leave it unbound — confirm hidden lines prevent it.
162 (out, err), proc = self.node.execute(cmd)
164 if err and proc.poll():
165 msg = " Couldn't find trace %s " % name
166 self.error(msg, out, err)
169 if attr == TraceAttr.SIZE:
170 out = int(out.strip())
# Prepare the application on its node. The visible steps are: create
# app_home, upload sources, install dependencies, and, when a command is set
# and X11 forwarding is off, pre-upload the command (paths expanded) as
# app_home/app.sh. start() later runs that file with "bash ./app.sh"; X11
# commands are instead executed directly by start(). Finally this calls the
# base-class provision().
# NOTE(review): several lines are not visible here (between the source upload,
# the dependency install and the command upload); upload_code and the
# build/install steps may be invoked there — confirm.
# NOTE(review): `filters` is accepted but not forwarded to the base provision().
174 def provision(self, filters = None):
175 # create home dir for application
176 self.node.mkdir(self.app_home)
179 self.upload_sources()
184 # install dependencies
185 self.install_dependencies()
193 command = self.get("command")
194 x11 = self.get("forwardX11")
195 if not x11 and command:
196 self.info("Uploading command '%s'" % command)
198 # TODO: missing set PATH and PYTHONPATH!!
200 # If the command runs asynchronous, pre upload the command
201 # to the app.sh file in the remote host
202 dst = os.path.join(self.app_home, "app.sh")
# Expand ${SOURCES}, ${BUILD}, ${APP_HOME}, ... before uploading.
203 command = self.replace_paths(command)
204 self.node.upload(command, dst, text = True)
206 super(LinuxApplication, self).provision()
# Stage the space-separated `sources` attribute into src_dir on the node.
# Entries starting with "http" are downloaded on the node with wget into
# ${SOURCES}, which upload_and_run rewrites to src_dir via replace_paths.
# Every other entry is uploaded with node.upload.
# NOTE(review): some lines are not visible, including where `verif` is
# initialised and any guard for an unset `sources` attribute; confirm.
208 def upload_sources(self):
209 # TODO: check if sources need to be uploaded and upload them
210 sources = self.get("sources")
212 self.info(" Uploading sources ")
214 # create dir for sources
215 self.node.mkdir(self.src_dir)
# NOTE(review): split(' ') yields empty entries for repeated spaces; split()
# would drop them.
217 sources = sources.split(' ')
219 http_sources = list()
# Iterate over a copy because matching entries are removed from `sources`.
# NOTE(review): the "https" test is redundant (anything starting with "https"
# starts with "http"), and a local file whose name starts with "http" would
# be treated as a URL.
220 for source in list(sources):
221 if source.startswith("http") or source.startswith("https"):
222 http_sources.append(source)
223 sources.remove(source)
225 # Download http sources
227 cmd = " wget -c --directory-prefix=${SOURCES} "
230 for source in http_sources:
231 cmd += " %s " % (source)
# Queue an `ls` of each expected download; presumably this lets
# upload_and_run (raise_on_error=True) notice a missing file — confirm how
# run_and_wait detects errors.
232 verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
234 # Wget output goes to stderr :S
235 cmd += " 2> /dev/null ; "
238 cmd += " %s " % verif
240 # Upload the command to a file, and execute asynchronously
241 self.upload_and_run(cmd,
242 "http_sources.sh", "http_sources_pid",
243 "http_sources_out", "http_sources_err")
# Upload the remaining (non-http) entries.
245 self.node.upload(sources, self.src_dir)
def upload_code(self):
    """Upload the plain-text ``code`` attribute to ``<src_dir>/code`` on the node.

    Does nothing when the ``code`` attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        # Nothing to upload; also avoids creating an empty sources dir.
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info(" Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code text itself, stored as a text file.
    self.node.upload(code, dst, text = True)
# Install the packages named in the `depends` attribute on the node via
# node.install_packages, passing app_home as its `home`.
# NOTE(review): a line between reading `depends` and logging is not visible;
# confirm whether it skips the install when `depends` is unset.
258 def install_dependencies(self):
259 depends = self.get("depends")
261 self.info(" Installing dependencies %s" % depends)
262 self.node.install_packages(depends, home = self.app_home)
# NOTE(review): the def line of this method is not visible; from its content
# it is the build step.
# Creates build_dir, then runs the `build` attribute as app_home/build.sh via
# upload_and_run, with pid/stdout/stderr files build_pid, build_out and
# build_err.
# NOTE(review): a "buildlog" trace is registered and trace() reads
# app_home/<name>, but no visible code writes a file named "buildlog"; verify.
# NOTE(review): upload_and_run uses node.run_and_wait, so despite the comment
# below this presumably waits for the script to finish.
265 build = self.get("build")
267 self.info(" Building sources ")
269 # create dir for build
270 self.node.mkdir(self.build_dir)
272 # Upload the command to a file, and execute asynchronously
273 self.upload_and_run(build,
274 "build.sh", "build_pid",
275 "build_out", "build_err")
# NOTE(review): the def line of this method is not visible; from its content
# it is the install step.
# Runs the `install` attribute as app_home/install.sh via upload_and_run, with
# pid/stdout/stderr files install_pid, install_out and install_err.
# NOTE(review): upload_and_run uses node.run_and_wait, so despite the comment
# below this presumably waits for the script to finish.
278 install = self.get("install")
280 self.info(" Installing sources ")
282 # Upload the command to a file, and execute asynchronously
283 self.upload_and_run(install,
284 "install.sh", "install_pid",
285 "install_out", "install_err")
# NOTE(review): the def line and several lines of this method are not visible,
# including where `node` is assigned and the error handling around the FAILED
# assignment below.
# While no node is connected, or the node's state is below READY, this
# re-schedules itself after `reschedule_delay`. The visible remainder logs
# the command and calls the base-class deploy().
288 # Wait until node is associated and deployed
290 if not node or node.state < ResourceState.READY:
# NOTE(review): if `node` is self.node and is None, `self.node.state` in this
# message raises AttributeError; log `node.state if node else None` instead.
291 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
292 self.ec.schedule(reschedule_delay, self.deploy)
295 command = self.get("command") or ""
296 self.info(" Deploying command '%s' " % command)
# Presumably reached from an error path that is not visible here.
300 self._state = ResourceState.FAILED
303 super(LinuxApplication, self).deploy()
# NOTE(review): the def line and several lines of this method are not visible,
# including the branch conditions, the execute/run argument lists and where
# `failed` is computed.
# Starts the application command on the node. With X11 forwarding the command
# is run through node.execute, because it cannot run asynchronously.
# Otherwise app.sh (uploaded in provision) is launched with node.run in
# app_home, and pid/ppid are read back with node.wait_pid. On start failure
# the error is logged, the state becomes FAILED and RuntimeError is raised.
# With no command the state becomes FINISHED.
306 command = self.get("command")
307 env = self.get("env")
308 stdin = 'stdin' if self.get("stdin") else None
# NOTE(review): both branches of the next two conditionals give the same value,
# so stdout/stderr are always the file names 'stdout'/'stderr'.
309 stdout = 'stdout' if self.get("stdout") else 'stdout'
310 stderr = 'stderr' if self.get("stderr") else 'stderr'
311 sudo = self.get('sudo') or False
312 x11 = self.get("forwardX11") or False
315 super(LinuxApplication, self).start()
318 self.info("No command to start ")
319 self._state = ResourceState.FINISHED
322 self.info("Starting command '%s'" % command)
325 # If the command requires X11 forwarding, we
326 # can't run it asynchronously
327 (out, err), proc = self.node.execute(command,
335 if proc.poll() and err:
338 # Command was previously uploaded, now run the remote
339 # bash file asynchronously
# NOTE(review): replace_paths calls .replace on its argument; confirm `env` is
# never None here.
341 env = self.replace_paths(env)
343 cmd = "bash ./app.sh"
344 (out, err), proc = self.node.run(cmd, self.app_home,
350 if proc.poll() and err:
354 pid, ppid = self.node.wait_pid(home = self.app_home)
355 if pid: self._pid = int(pid)
356 if ppid: self._ppid = int(ppid)
358 if not self.pid or not self.ppid:
# Read the remote 'stderr' file to find out why the process did not start.
361 (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
363 if failed or out or chkerr:
364 # check if execution errors occurred
365 msg = " Failed to start command '%s' " % command
# NOTE(review): `out` now holds the stderr file contents, but `err` comes from
# the earlier execute/run call; confirm `chkerr` should not be logged instead.
372 self.error(msg, out, err)
374 msg2 = " Setting state to Failed"
376 self._state = ResourceState.FAILED
# NOTE(review): Python 2-only syntax; `raise RuntimeError(msg)` works on 2 and 3.
378 raise RuntimeError, msg
# NOTE(review): the def line and some lines are not visible, including where
# `state` and `command` are assigned and the condition that detects a failed
# kill.
# When the application is STARTED, kills its process with
# node.kill(pid, ppid); on error it logs and marks the resource FAILED.
# It then calls the base-class stop() (any guarding condition is not visible).
382 if state == ResourceState.STARTED:
383 self.info("Stopping command %s" % command)
385 (out, err), proc = self.node.kill(self.pid, self.ppid)
388 # check if execution errors occurred
389 msg = " Failed to STOP command '%s' " % self.get("command")
390 self.error(msg, out, err)
391 self._state = ResourceState.FAILED
394 super(LinuxApplication, self).stop()
# NOTE(review): the def line and the lines around the tear-down call are not
# visible.
# Runs the `tearDown` script on the node with node.execute. The base-class
# release() is called only when the state is STOPPED.
397 self.info("Releasing resource")
399 tear_down = self.get("tearDown")
# NOTE(review): confirm a hidden guard skips this call when tearDown is unset.
401 self.node.execute(tear_down)
404 if self.state == ResourceState.STOPPED:
405 super(LinuxApplication, self).release()
# NOTE(review): the def line, the final return and some branch conditions are
# not visible; this is presumably the state-refresh accessor.
# For a STARTED application it reads the remote 'stderr' file:
# - a "No such file or directory" error means the command has not run yet,
#   so READY is returned;
# - other errors are logged and the resource is marked FAILED.
# When pid/ppid are known, node.status is polled, and sshfuncs.FINISHED moves
# the resource to FINISHED. The exact branch nesting is partly hidden.
409 if self._state == ResourceState.STARTED:
410 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
413 if err.find("No such file or directory") >= 0 :
414 # The resource is marked as started, but the
415 # command was not yet executed
416 return ResourceState.READY
418 # check if execution errors occurred
419 msg = " Failed to execute command '%s'" % self.get("command")
420 self.error(msg, out, err)
421 self._state = ResourceState.FAILED
423 elif self.pid and self.ppid:
424 status = self.node.status(self.pid, self.ppid)
426 if status == sshfuncs.FINISHED:
427 self._state = ResourceState.FINISHED
# Write `cmd`, with ${...} path tags expanded by replace_paths, to
# app_home/<fname> on the node. Then run it there with "bash ./<fname>" via
# node.run_and_wait, with raise_on_error=True.
# NOTE(review): the middle argument lines of run_and_wait are not visible;
# presumably pidfile/outfile/errfile are passed there. The returned
# (out, err), proc are not used in the visible lines.
431 def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
432 dst = os.path.join(self.app_home, fname)
433 cmd = self.replace_paths(cmd)
434 self.node.upload(cmd, dst, text = True)
436 cmd = "bash ./%s" % fname
437 (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
441 raise_on_error = True)
# Expand the tags ${SOURCES}, ${BUILD}, ${APP_HOME} and ${NODE_HOME} in
# `command` to absolute paths. Relative paths get a literal "${HOME}/" prefix,
# left for the remote shell to expand. ${EXP_HOME} is replaced by
# node.exp_home as-is.
# NOTE(review): despite the docstring, the visible code does no shell
# escaping/quoting; paths are inserted verbatim.
# NOTE(review): ${EXP_HOME} is not passed through absolute_dir, unlike the
# other tags, so a relative exp_home stays relative — confirm this is intended.
# NOTE(review): `command` must be a string; None would raise AttributeError.
443 def replace_paths(self, command):
445 Replace all special path tags with shell-escaped actual paths.
448 return d if d.startswith("/") else os.path.join("${HOME}", d)
451 .replace("${SOURCES}", absolute_dir(self.src_dir))
452 .replace("${BUILD}", absolute_dir(self.build_dir))
453 .replace("${APP_HOME}", absolute_dir(self.app_home))
454 .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
455 .replace("${EXP_HOME}", self.node.exp_home) )
# NOTE(review): part of this method, including its return value, is not
# visible.
# Looks up resources tagged tags.NODE and stores the single match in
# self._node (None when zero or several match).
# NOTE(review): `tags` is not among the visible imports, and the node accessor
# uses get_connected() rather than self._node; this looks like stale code.
457 def valid_connection(self, guid):
460 # XXX: What if it is connected to more than one node?
461 resources = self.find_resources(exact_tags = [tags.NODE])
462 self._node = resources[0] if len(resources) == 1 else None
# NOTE(review): the def line of this method is not visible. Per its docstring
# ("univokely" presumably means "uniquely"), it builds a hash identifying the
# application's configuration. The hash is the MD5 hex digest of the
# concatenated str() of `args`, used to decide whether the home directory
# should be cleaned.
# NOTE(review): the `cls._register_attribute(...)` lines are inside an instance
# method where `cls` is not defined in the visible code, so they would raise
# NameError. They look pasted from _register_attributes in place of reading
# build/install/stdin/stdout/stderr/tearDown via self.get(...). `args` is also
# never assigned in the visible lines. Verify and fix.
# NOTE(review): `hashlib` is not among the visible imports; and md5(str) only
# works on Python 2 — Python 3 needs bytes.
466 """ Generates a hash representing univokely the application.
467 Is used to determine whether the home directory should be cleaned
471 command = self.get("command")
472 forwards_x11 = self.get("forwardX11")
473 env = self.get("env")
474 sudo = self.get("sudo")
475 depends = self.get("depends")
476 sources = self.get("sources")
477 cls._register_attribute(sources)
478 cls._register_attribute(build)
479 cls._register_attribute(install)
480 cls._register_attribute(stdin)
481 cls._register_attribute(stdout)
482 cls._register_attribute(stderr)
483 cls._register_attribute(tear_down)
484 skey = "".join(map(str, args))
485 return hashlib.md5(skey).hexdigest()