1 from neco.execution.attribute import Attribute, Flags, Types
2 from neco.execution.trace import Trace, TraceAttr
3 from neco.execution.resource import ResourceManager, clsinit, ResourceState
4 from neco.resources.linux.node import LinuxNode
5 from neco.util import sshfuncs
6 from neco.util.timefuncs import strfnow, strfdiff
# Delay handed to self.ec.schedule() when self.deploy has to be re-queued
# because the connected node is missing or not yet READY.
11 reschedule_delay = "0.5s"
14 # TODO: Resolve wildcards in commands!!
# Resource manager for an application (command, sources, build/install
# steps, stdio files) that runs on the LinuxNode it is connected to.
17 class LinuxApplication(ResourceManager):
18 _rtype = "LinuxApplication"
def _register_attributes(cls):
    """Declare every configurable attribute of a LinuxApplication.

    All Attribute objects are built first, in a fixed order, and only then
    registered on ``cls`` one by one through ``cls._register_attribute``.
    ``build``, ``install`` and ``tearDown`` use ``Flags.ReadOnly``; every
    other attribute uses ``Flags.ExecReadOnly``.
    """
    exec_ro = Flags.ExecReadOnly

    sources_help = (
        "Space-separated list of regular files to be deployed in the working "
        "path prior to building. Archives won't be expanded automatically."
    )
    code_help = (
        "Plain text source code to be uploaded to the server. It will be stored "
        "under ${SOURCES}/code"
    )
    build_help = (
        "Build commands to execute after deploying the sources. "
        "Sources will be in the ${SOURCES} folder. "
        "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
        "Try to make the commands return with a nonzero exit code on error.\n"
        "Also, do not install any programs here, use the 'install' attribute. This will "
        "help keep the built files constrained to the build folder (which may "
        "not be the home folder), and will result in faster deployment. Also, "
        "make sure to clean up temporary files, to reduce bandwidth usage between "
        "nodes when transferring built packages."
    )
    install_help = (
        "Commands to transfer built files to their final destinations. "
        "Sources will be in the initial working folder, and a special "
        "tag ${SOURCES} can be used to reference the experiment's "
        "home folder (where the application commands will run).\n"
        "ALL sources and targets needed for execution must be copied there, "
        "if building has been enabled.\n"
        "That is, 'slave' nodes will not automatically get any source files. "
        "'slave' nodes don't get build dependencies either, so if you need "
        "make and other tools to install, be sure to provide them as "
        "actual dependencies instead."
    )

    # Order matters only for registration order; it mirrors the declaration
    # order used throughout this resource.
    attributes = (
        Attribute("command", "Command to execute", flags=exec_ro),
        Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
                  flags=exec_ro),
        Attribute("env", "Environment variables string for command execution",
                  flags=exec_ro),
        Attribute("sudo", "Run with root privileges", flags=exec_ro),
        Attribute("depends",
                  "Space-separated list of packages required to run the application",
                  flags=exec_ro),
        Attribute("sources", sources_help, flags=exec_ro),
        Attribute("code", code_help, flags=exec_ro),
        Attribute("build", build_help, flags=Flags.ReadOnly),
        Attribute("install", install_help, flags=Flags.ReadOnly),
        Attribute("stdin", "Standard input", flags=exec_ro),
        Attribute("stdout", "Standard output", flags=exec_ro),
        Attribute("stderr", "Standard error", flags=exec_ro),
        Attribute("tearDown",
                  "Bash script to be executed before releasing the resource",
                  flags=Flags.ReadOnly),
    )

    for attribute in attributes:
        cls._register_attribute(attribute)
def _register_traces(cls):
    """Declare the traces a LinuxApplication exposes: stdout, stderr, buildlog.

    Traces are registered on ``cls`` in that order.
    """
    traces = [
        Trace("stdout", "Standard output stream"),
        Trace("stderr", "Standard error stream"),
        Trace("buildlog", "Output of the build process"),
    ]
    for trace in traces:
        cls._register_trace(trace)
# Initialise per-instance state on top of ResourceManager.__init__.
95 def __init__(self, ec, guid):
96 super(LinuxApplication, self).__init__(ec, guid)
# Name of this application's working directory on the node; it is joined
# onto self.node.exp_home to form the remote application path.
99 self._home = "app-%s" % self.guid
101 # timestamp of last state check of the application
102 self._last_state_check = strfnow()
# NOTE(review): `logging` is not imported in the visible lines; presumably
# imported in a line not shown in this excerpt — confirm.
104 self._logger = logging.getLogger("LinuxApplication")
def log_message(self, msg):
    """Prefix ``msg`` with this resource's guid and its node's hostname.

    Returns a string of the form `` guid <guid> - host <hostname> - <msg> ``.
    When no node is connected yet, the hostname is rendered as ``None``
    instead of raising AttributeError, so messages logged before the node
    is attached (e.g. while deploy is being rescheduled) still work.
    """
    node = self.node
    hostname = node.get("hostname") if node else None
    return " guid %d - host %s - %s " % (self.guid, hostname, msg)
112 node = self.get_connected(LinuxNode.rtype())
113 if node: return node[0]
118 return os.path.join(self.node.exp_home, self._home)
122 return os.path.join(self.app_home, 'src')
126 return os.path.join(self.app_home, 'build')
# Retrieve the trace file `name` kept in the application's remote directory
# (os.path.join(self.app_home, name)). `attr` selects what is fetched:
# TraceAttr.PATH, TraceAttr.ALL (contents read via node.check_output),
# TraceAttr.STREAM (one chunk read with dd) or TraceAttr.SIZE (stat output,
# converted to int).
# NOTE(review): this method's return statements are not visible in this
# excerpt; confirm what each branch returns.
136 def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
137 self.info("Retrieving '%s' trace %s " % (name, attr))
139 path = os.path.join(self.app_home, name)
# Existence check: the remote shell prints 'success' if the file exists
# and 'error' otherwise.
141 command = "(test -f %s && echo 'success') || echo 'error'" % path
142 (out, err), proc = self.node.execute(command)
# Missing if the command failed with stderr output, or the echo said 'error'.
144 if (err and proc.poll()) or out.find("error") != -1:
145 msg = " Couldn't find trace %s " % name
146 self.error(msg, out, err)
149 if attr == TraceAttr.PATH:
152 if attr == TraceAttr.ALL:
153 (out, err), proc = self.node.check_output(self.app_home, name)
155 if err and proc.poll():
156 msg = " Couldn't read trace %s " % name
157 self.error(msg, out, err)
# STREAM: read exactly one block of `block` bytes after skipping `offset`
# blocks (dd's skip counts blocks, not bytes).
162 if attr == TraceAttr.STREAM:
163 cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
164 elif attr == TraceAttr.SIZE:
# '%%' is the %-format escape, so the remote command is `stat -c%s <path>`.
165 cmd = "stat -c%%s %s " % path
# NOTE(review): `cmd` is only assigned for STREAM and SIZE; any other attr
# value must return earlier (in lines not shown) or this raises
# UnboundLocalError — confirm.
167 (out, err), proc = self.node.execute(cmd)
169 if err and proc.poll():
170 msg = " Couldn't find trace %s " % name
171 self.error(msg, out, err)
174 if attr == TraceAttr.SIZE:
175 out = int(out.strip())
# Prepare the application on its node. Visible steps:
#  - create the app directory;
#  - upload sources;
#  - install dependencies;
#  - unless X11 forwarding is enabled, upload the command as app.sh.
# It then delegates to the base class provision(). Further steps may live
# in lines not shown in this excerpt.
# NOTE(review): `filters` is not used in the visible lines.
179 def provision(self, filters = None):
180 # create home dir for application
181 self.node.mkdir(self.app_home)
184 self.upload_sources()
192 # install dependencies
193 self.install_dependencies()
201 command = self.get("command")
202 x11 = self.get("forwardX11")
203 if not x11 and command:
204 self.info("Uploading command '%s'" % command)
208 env = self.get("env") or ""
# Each space-separated entry of env becomes an `export <entry>` line that
# is prepended to the command.
# NOTE(review): "".split(" ") == [""], so an unset env yields a bare
# "export " line; in bash, `export` with no names prints the exported
# variables, which likely ends up in the application's output. Consider
# skipping empty entries.
209 for var in env.split(" "):
210 environ += 'export %s\n' % var
212 command = environ + command
214 # If the command runs asynchronous, pre upload the command
215 # to the app.sh file in the remote host
216 dst = os.path.join(self.app_home, "app.sh")
# Expand ${SOURCES}, ${BUILD}, ${APP_HOME}, ... tags into concrete paths.
217 command = self.replace_paths(command)
218 self.node.upload(command, dst, text = True)
220 super(LinuxApplication, self).provision()
# Deploy the space-separated "sources" attribute. Entries starting with
# "http" are downloaded on the node with wget into ${SOURCES}, through a
# generated http_sources.sh script. The remaining entries are uploaded to
# self.src_dir.
222 def upload_sources(self):
223 # TODO: check if sources need to be uploaded and upload them
224 sources = self.get("sources")
226 self.info(" Uploading sources ")
228 # create dir for sources
229 self.node.mkdir(self.src_dir)
# NOTE(review): split(' ') keeps empty strings for repeated spaces, and they
# are passed on to node.upload below; split() would drop them.
231 sources = sources.split(' ')
233 http_sources = list()
# Iterate over a copy because matching entries are removed from `sources`
# inside the loop.
234 for source in list(sources):
# NOTE(review): startswith("https") is redundant (implied by "http"), and a
# local file whose name begins with "http" is treated as a URL.
235 if source.startswith("http") or source.startswith("https"):
236 http_sources.append(source)
237 sources.remove(source)
239 # Download http sources
241 cmd = " wget -c --directory-prefix=${SOURCES} "
244 for source in http_sources:
245 cmd += " %s " % (source)
# After downloading, `ls` each expected file, presumably so that a missing
# download is reported by the script.
246 verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
248 # Wget output goes to stderr :S
249 cmd += " 2> /dev/null ; "
252 cmd += " %s " % verif
# NOTE(review): upload_and_run runs the script via node.run_and_wait, so
# despite the comment below this step appears to block until it finishes.
254 # Upload the command to a file, and execute asynchronously
255 self.upload_and_run(cmd,
256 "http_sources.sh", "http_sources_pid",
257 "http_sources_out", "http_sources_err")
259 self.node.upload(sources, self.src_dir)
def upload_code(self):
    """Upload the plain-text ``code`` attribute to ``<src_dir>/code``.

    Creates ``self.src_dir`` on the node first. Does nothing when the
    attribute is unset or empty.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info(" Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code text itself. text=True is used, as elsewhere in this
    # class, when the first argument is content rather than a local path.
    self.node.upload(code, dst, text=True)
def upload_stdin(self):
    """Upload the ``stdin`` attribute text to ``<app_home>/stdin``.

    Does nothing when the attribute is unset or empty; start() likewise only
    uses a 'stdin' file when the attribute is set.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    # The content is written next to the application's other runtime files.
    dst = os.path.join(self.app_home, "stdin")
    self.node.upload(stdin, dst, text=True)
def install_dependencies(self):
    """Install the packages named in the ``depends`` attribute on the node.

    Packages are installed through ``self.node.install_packages`` with the
    application directory as ``home``. Does nothing when no dependencies
    are configured.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.info(" Installing dependencies %s" % depends)
    self.node.install_packages(depends, home=self.app_home)
288 build = self.get("build")
290 self.info(" Building sources ")
292 # create dir for build
293 self.node.mkdir(self.build_dir)
295 # Upload the command to a file, and execute asynchronously
296 self.upload_and_run(build,
297 "build.sh", "build_pid",
298 "build_out", "build_err")
301 install = self.get("install")
303 self.info(" Installing sources ")
305 # Upload the command to a file, and execute asynchronously
306 self.upload_and_run(install,
307 "install.sh", "install_pid",
308 "install_out", "install_err")
311 # Wait until node is associated and deployed
313 if not node or node.state < ResourceState.READY:
314 self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
315 self.ec.schedule(reschedule_delay, self.deploy)
318 command = self.get("command") or ""
319 self.info(" Deploying command '%s' " % command)
323 self._state = ResourceState.FAILED
326 super(LinuxApplication, self).deploy()
329 command = self.get('command')
330 env = self.get('env')
331 stdin = 'stdin' if self.get('stdin') else None
332 stdout = 'stdout' if self.get('stdout') else 'stdout'
333 stderr = 'stderr' if self.get('stderr') else 'stderr'
334 sudo = self.get('sudo') or False
335 x11 = self.get('forwardX11') or False
338 super(LinuxApplication, self).start()
341 self.info("No command to start ")
342 self._state = ResourceState.FINISHED
345 self.info("Starting command '%s'" % command)
351 for var in env.split(" "):
352 environ += ' %s ' % var
354 command = "(" + environ + " ; " + command + ")"
355 command = self.replace_paths(command)
357 # If the command requires X11 forwarding, we
358 # can't run it asynchronously
359 (out, err), proc = self.node.execute(command,
364 self._state = ResourceState.FINISHED
366 if proc.poll() and err:
369 # Command was previously uploaded, now run the remote
370 # bash file asynchronously
371 cmd = "bash ./app.sh"
372 (out, err), proc = self.node.run(cmd, self.app_home,
378 if proc.poll() and err:
382 pid, ppid = self.node.wait_pid(home = self.app_home)
383 if pid: self._pid = int(pid)
384 if ppid: self._ppid = int(ppid)
386 if not self.pid or not self.ppid:
389 (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
391 if failed or out or chkerr:
392 # check if execution errors occurred
393 msg = " Failed to start command '%s' " % command
400 self.error(msg, out, err)
402 msg2 = " Setting state to Failed"
404 self._state = ResourceState.FAILED
406 raise RuntimeError, msg
409 command = self.get('command') or ''
412 if state == ResourceState.STARTED:
413 self.info("Stopping command '%s'" % command)
415 (out, err), proc = self.node.kill(self.pid, self.ppid)
418 # check if execution errors occurred
419 msg = " Failed to STOP command '%s' " % self.get("command")
420 self.error(msg, out, err)
421 self._state = ResourceState.FAILED
424 super(LinuxApplication, self).stop()
427 self.info("Releasing resource")
429 tear_down = self.get("tearDown")
431 self.node.execute(tear_down)
434 if self.state == ResourceState.STOPPED:
435 super(LinuxApplication, self).release()
439 if self._state == ResourceState.STARTED:
440 # To avoid overwhelming the remote hosts and the local processor
441 # with too many ssh queries, the state is only requested
442 # every 'state_check_delay' .
443 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
444 # check if execution errors occurred
445 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
448 if err.find("No such file or directory") >= 0 :
449 # The resource is marked as started, but the
450 # command was not yet executed
451 return ResourceState.READY
453 msg = " Failed to execute command '%s'" % self.get("command")
454 self.error(msg, out, err)
455 self._state = ResourceState.FAILED
457 elif self.pid and self.ppid:
458 status = self.node.status(self.pid, self.ppid)
460 if status == sshfuncs.FINISHED:
461 self._state = ResourceState.FINISHED
464 self._last_state_check = strfnow()
# Write `cmd` (with path tags expanded by replace_paths) to
# <app_home>/<fname>, then run it from app_home as `bash ./<fname>` through
# node.run_and_wait.
# NOTE(review): pidfile/outfile/errfile are presumably forwarded in the
# run_and_wait arguments not shown in this excerpt — confirm. The call
# passes raise_on_error = True, so failures are expected to raise rather
# than be returned. `out`, `err` and `proc` are unused in the visible lines.
468 def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
469 dst = os.path.join(self.app_home, fname)
470 cmd = self.replace_paths(cmd)
471 self.node.upload(cmd, dst, text = True)
473 cmd = "bash ./%s" % fname
474 (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
478 raise_on_error = True)
# Expand the path tags ${SOURCES}, ${BUILD}, ${APP_HOME}, ${NODE_HOME} and
# ${EXP_HOME} in `command` into directories. Absolute directories are used
# as-is. Relative ones are prefixed with the literal string "${HOME}",
# which the remote shell is left to expand.
# NOTE(review): despite the docstring, no shell escaping or quoting is
# applied in the visible code; paths containing spaces or shell
# metacharacters would be inserted raw.
480 def replace_paths(self, command):
482 Replace all special path tags with shell-escaped actual paths.
485 return d if d.startswith("/") else os.path.join("${HOME}", d)
488 .replace("${SOURCES}", absolute_dir(self.src_dir))
489 .replace("${BUILD}", absolute_dir(self.build_dir))
490 .replace("${APP_HOME}", absolute_dir(self.app_home))
491 .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
492 .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
495 def valid_connection(self, guid):
498 # XXX: What if it is connected to more than one node?
499 resources = self.find_resources(exact_tags = [tags.NODE])
500 self._node = resources[0] if len(resources) == 1 else None