2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
30 # TODO: Resolve wildcards in commands!!
31 # TODO: compare_hash for all files that are uploaded!
35 class LinuxApplication(ResourceManager):
36 _rtype = "LinuxApplication"
@classmethod
def _register_attributes(cls):
    """ Registers the configuration attributes of a LinuxApplication.

    Declared as a classmethod because the method receives the class
    itself (cls) and registers on it. The attributes are registered
    through cls._register_attribute in this order: command, forwardX11,
    env, sudo, depends, sources, code, build, install, stdin, stdout,
    stderr and tearDown.
    """
    attributes = (
        Attribute("command", "Command to execute",
                flags = Flags.ExecReadOnly),
        Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
                flags = Flags.ExecReadOnly),
        Attribute("env", "Environment variables string for command execution",
                flags = Flags.ExecReadOnly),
        Attribute("sudo", "Run with root privileges",
                flags = Flags.ExecReadOnly),
        Attribute("depends",
                "Space-separated list of packages required to run the application",
                flags = Flags.ExecReadOnly),
        Attribute("sources",
                "Space-separated list of regular files to be deployed in the working "
                "path prior to building. Archives won't be expanded automatically.",
                flags = Flags.ExecReadOnly),
        Attribute("code",
                "Plain text source code to be uploaded to the server. It will be stored "
                "under ${SOURCES}/code",
                flags = Flags.ExecReadOnly),
        Attribute("build",
                "Build commands to execute after deploying the sources. "
                "Sources will be in the ${SOURCES} folder. "
                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
                "Try to make the commands return with a nonzero exit code on error.\n"
                "Also, do not install any programs here, use the 'install' attribute. This will "
                "help keep the built files constrained to the build folder (which may "
                "not be the home folder), and will result in faster deployment. Also, "
                "make sure to clean up temporary files, to reduce bandwidth usage between "
                "nodes when transferring built packages.",
                flags = Flags.ReadOnly),
        Attribute("install",
                "Commands to transfer built files to their final destinations. "
                "Sources will be in the initial working folder, and a special "
                "tag ${SOURCES} can be used to reference the experiment's "
                "home folder (where the application commands will run).\n"
                "ALL sources and targets needed for execution must be copied there, "
                "if building has been enabled.\n"
                "That is, 'slave' nodes will not automatically get any source files. "
                "'slave' nodes don't get build dependencies either, so if you need "
                "make and other tools to install, be sure to provide them as "
                "actual dependencies instead.",
                flags = Flags.ReadOnly),
        Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly),
        Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly),
        Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly),
        Attribute("tearDown", "Bash script to be executed before "
                "releasing the resource",
                flags = Flags.ReadOnly),
    )

    # All attributes are built before the first one is registered, so a
    # failure while building leaves the class without partial registration
    for attribute in attributes:
        cls._register_attribute(attribute)
@classmethod
def _register_traces(cls):
    """ Registers the 'stdout' and 'stderr' traces of a LinuxApplication.

    Declared as a classmethod because the method receives the class
    itself (cls) and registers the traces through cls._register_trace.
    """
    traces = (
        Trace("stdout", "Standard output stream"),
        Trace("stderr", "Standard error stream"),
    )

    for trace in traces:
        cls._register_trace(trace)
def __init__(self, ec, guid):
    """ Creates the application resource and resets its runtime state.

    Both arguments are forwarded untouched to the parent constructor.
    NOTE(review): ec is presumably the experiment controller (it is used
    as self.ec.schedule elsewhere) -- confirm against ResourceManager.
    """
    super(LinuxApplication, self).__init__(ec, guid)

    # pid and ppid of the command when it runs in background. They are
    # only stored once the pid file could be read, so they must exist
    # (as None) before any code tests self.pid / self.ppid
    self._pid = None
    self._ppid = None

    self._home = "app-%s" % self.guid
    self._in_foreground = False

    # keep a reference to the running process handler when
    # the command is not executed as remote daemon in background
    self._proc = None

    # timestamp of last state check of the application
    self._last_state_check = strfnow()
def log_message(self, msg):
    """ Returns msg prefixed with the guid of this application and the
    'hostname' attribute of the node it is connected to.
    """
    hostname = self.node.get("hostname")
    template = " guid %d - host %s - %s "
    return template % (self.guid, hostname, msg)
@property
def node(self):
    """ Returns the first connected LinuxNode resource, or None when no
    LinuxNode is connected to this application.

    NOTE(review): the property header is restored from the way the rest
    of the class reads it (self.node.get(...), self.node.mkdir(...)).
    """
    connected = self.get_connected(LinuxNode.rtype())
    if connected:
        return connected[0]
    return None
@property
def app_home(self):
    """ Returns the home directory of this application: the 'app-<guid>'
    folder name stored in self._home, joined to the node's exp_home.

    NOTE(review): the property header is restored from the way the rest
    of the class reads it (self.app_home, never called).
    """
    return os.path.join(self.node.exp_home, self._home)
@property
def src_dir(self):
    """ Returns the 'src' directory located inside the application home.

    NOTE(review): the property header is restored from the way the rest
    of the class reads it (self.src_dir, never called).
    """
    return os.path.join(self.app_home, 'src')
@property
def build_dir(self):
    """ Returns the 'build' directory located inside the application home.

    NOTE(review): the property header is restored from the way the rest
    of the class reads it (self.build_dir, never called).
    """
    return os.path.join(self.app_home, 'build')
@property
def in_foreground(self):
    """ Returns True if the command needs to be executed in foreground.

    This means that the command will be executed using 'execute' instead
    of 'run' ('run' executes a command in background and detached from
    the terminal).

    When using the X11 forwarding option, the command can not run in
    background and detached from a terminal, since the terminal has to
    stay attached.

    Exposed as a property: callers test it as a plain attribute
    ('not self.in_foreground'), which would always be truthy for a
    bound method.
    """
    return self.get("forwardX11") or self._in_foreground
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """ Retrieves information about the trace file 'name' stored in the
    application home on the node.

    What is returned depends on attr:
      - TraceAttr.PATH: the path of the trace file
      - TraceAttr.ALL: the output of node.check_output for the trace
      - TraceAttr.STREAM: the output of a 'dd' that reads one block of
        'block' bytes after skipping 'offset' blocks
      - TraceAttr.SIZE: the size reported by 'stat', as an int

    Returns None (after logging the error) when the trace file does not
    exist or can not be read. Raises ValueError for any other attr.

    NOTE(review): the 'return' statements were not visible in the
    reviewed text and were restored from the control flow -- confirm.
    """
    self.info("Retrieving '%s' trace %s " % (name, attr))

    path = os.path.join(self.app_home, name)

    # The test command always exits with 0, the result is echoed instead
    command = "(test -f %s && echo 'success') || echo 'error'" % path
    (out, err), proc = self.node.execute(command)

    if (err and proc.poll()) or out.find("error") != -1:
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.PATH:
        return path

    if attr == TraceAttr.ALL:
        (out, err), proc = self.node.check_output(self.app_home, name)

        if err and proc.poll():
            msg = " Couldn't read trace %s " % name
            self.error(msg, out, err)
            return None

        return out

    if attr == TraceAttr.STREAM:
        cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
    elif attr == TraceAttr.SIZE:
        cmd = "stat -c%%s %s " % path
    else:
        # Without this branch 'cmd' would be unbound below
        raise ValueError("Unsupported trace attribute %s" % attr)

    (out, err), proc = self.node.execute(cmd)

    if err and proc.poll():
        msg = " Couldn't find trace %s " % name
        self.error(msg, out, err)
        return None

    if attr == TraceAttr.SIZE:
        out = int(out.strip())

    return out
def provision(self):
    """ Prepares the node to run the application: creates the application
    home, uploads sources, installs dependencies and, for commands that
    run in background, uploads the command as a bash script.

    NOTE(review): the method header and the upload_code / upload_stdin /
    build / install calls were not visible in the reviewed text; they
    are restored from the helper methods this class defines -- confirm.
    """
    # create home dir for application
    self.node.mkdir(self.app_home)

    self.upload_sources()
    self.upload_code()
    self.upload_stdin()

    # install dependencies
    self.install_dependencies()

    self.build()
    self.install()

    # Upload command to remote bash script
    # - only if command can be executed in background and detached
    command = self.get("command")

    if command and not self.in_foreground:
        self.info("Uploading command '%s'" % command)

        # replace application specific paths in the command
        command = self.replace_paths(command)

        # replace application specific paths in the environment
        env = self.get("env")
        if env:
            env = self.replace_paths(env)

        # NOTE(review): only the first two arguments of this call were
        # visible. 'app.sh' matches the script the background start
        # launches ("bash ./app.sh"); confirm no other keyword is needed.
        self.node.upload_command(command, self.app_home,
                shfile = "app.sh",
                env = env)

    self.info("Provisioning finished")

    super(LinuxApplication, self).provision()
def upload_sources(self):
    """ Deploys the files listed in the 'sources' attribute into the
    application 'src' directory.

    Entries starting with 'http' (which also covers 'https') are
    downloaded on the node with wget; the remaining entries are uploaded
    through node.upload. Nothing is done when 'sources' is empty.
    """
    sources = self.get("sources")
    if not sources:
        return

    self.info("Uploading sources ")

    # create dir for sources
    self.node.mkdir(self.src_dir)

    # split() without argument drops the empty entries that repeated
    # or trailing blanks would otherwise produce
    http_sources = []
    local_sources = []
    for source in sources.split():
        if source.startswith("http"):
            http_sources.append(source)
        else:
            local_sources.append(source)

    # Download http sources remotely
    if http_sources:
        command = [" wget -c --directory-prefix=${SOURCES} "]
        check = []

        for source in http_sources:
            command.append(" %s " % (source))
            check.append(" ls ${SOURCES}/%s " % os.path.basename(source))

        command = " ".join(command)
        check = " ; ".join(check)

        # Append the command to check that the sources were downloaded
        command += " ; %s " % check

        # replace application specific paths in the command
        command = self.replace_paths(command)

        # Upload the command to a bash script and run it
        # in background ( but wait until the command has
        # finished to continue )
        self.node.run_and_wait(command, self.app_home,
                shfile = "http_sources.sh",
                pidfile = "http_sources_pidfile",
                ecodefile = "http_sources_exitcode",
                stdout = "http_sources_stdout",
                stderr = "http_sources_stderr")

    if local_sources:
        self.node.upload(local_sources, self.src_dir)
def upload_code(self):
    """ Uploads the text of the 'code' attribute to the file 'code'
    inside the application 'src' directory.

    Nothing is done when the 'code' attribute is empty.
    """
    code = self.get("code")
    if not code:
        return

    # create dir for sources
    self.node.mkdir(self.src_dir)

    self.info("Uploading code ")

    dst = os.path.join(self.src_dir, "code")
    # Upload the code itself ('sources' is not defined in this method)
    self.node.upload(code, dst, text = True)
def upload_stdin(self):
    """ Uploads the value of the 'stdin' attribute to the file 'stdin'
    inside the application home.

    Nothing is done when 'stdin' is empty, and the upload is skipped
    when compare_hash reports that the remote file already matches.
    """
    stdin = self.get("stdin")
    if not stdin:
        return

    self.info(" Uploading stdin ")

    dst = os.path.join(self.app_home, "stdin")

    # If what we are uploading is a file, check whether
    # the same file already exists (using md5sum)
    if self.compare_hash(stdin, dst):
        return

    self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
    """ Installs on the node the packages listed in the 'depends'
    attribute. Nothing is done when 'depends' is empty.
    """
    depends = self.get("depends")
    if not depends:
        return

    self.info("Installing dependencies %s" % depends)
    self.node.install_packages(depends, self.app_home)
def build(self):
    """ Runs the commands of the 'build' attribute on the node and waits
    for them to finish. Nothing is done when 'build' is empty.

    NOTE(review): the method header and the guard on an empty attribute
    were not visible in the reviewed text and are restored here.
    """
    build = self.get("build")
    if not build:
        return

    self.info("Building sources ")

    # create dir for build
    self.node.mkdir(self.build_dir)

    # replace application specific paths in the command
    command = self.replace_paths(build)

    # Upload the command to a bash script and run it
    # in background ( but wait until the command has
    # finished to continue )
    # NOTE(review): the shfile argument was not visible; 'build.sh'
    # follows the naming of the other files of this step -- confirm.
    self.node.run_and_wait(command, self.app_home,
            shfile = "build.sh",
            pidfile = "build_pidfile",
            ecodefile = "build_exitcode",
            stdout = "build_stdout",
            stderr = "build_stderr")
def install(self):
    """ Runs the commands of the 'install' attribute on the node and
    waits for them to finish. Nothing is done when 'install' is empty.

    NOTE(review): the method header and the guard on an empty attribute
    were not visible in the reviewed text and are restored here.
    """
    install = self.get("install")
    if not install:
        return

    self.info("Installing sources ")

    # replace application specific paths in the command
    command = self.replace_paths(install)

    # Upload the command to a bash script and run it
    # in background ( but wait until the command has
    # finished to continue )
    self.node.run_and_wait(command, self.app_home,
            shfile = "install.sh",
            pidfile = "install_pidfile",
            ecodefile = "install_exitcode",
            stdout = "install_stdout",
            stderr = "install_stderr")
def deploy(self):
    """ Provisions the application once its node is ready.

    While no node is connected, or the node state is below
    ResourceState.READY, the deployment is rescheduled on the experiment
    controller after "0.5s". If provisioning raises, the application is
    marked as FAILED and the exception is propagated.

    NOTE(review): the method header, the provision() call and the
    try/except around it were not visible in the reviewed text; they are
    restored from the surrounding statements -- confirm.
    """
    # Wait until node is associated and deployed
    node = self.node
    if not node or node.state < ResourceState.READY:
        # There may be no node at all, so do not dereference it blindly
        node_state = node.state if node else None
        self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % node_state)

        reschedule_delay = "0.5s"
        self.ec.schedule(reschedule_delay, self.deploy)
        return

    command = self.get("command") or ""
    self.info("Deploying command '%s' " % command)

    try:
        self.provision()
    except:
        # Deliberately broad: whatever interrupted the provisioning, the
        # state must reflect the failure; the exception is re-raised
        self._state = ResourceState.FAILED
        raise

    super(LinuxApplication, self).deploy()
def start(self):
    """ Starts the command of the application.

    The command is launched in foreground or in background depending on
    in_foreground. When no command was given, the application is marked
    as FINISHED right away.

    NOTE(review): the method header and the if/else lines were not
    visible in the reviewed text; the branch layout (parent start()
    only invoked when a command is launched) is restored -- confirm.
    """
    command = self.get("command")

    self.info("Starting command '%s'" % command)

    if not command:
        # If no command was given (i.e. Application was used for dependency
        # installation), then the application is directly marked as FINISHED
        self._state = ResourceState.FINISHED
        return

    if self.in_foreground:
        self._start_in_foreground()
    else:
        self._start_in_background()

    super(LinuxApplication, self).start()
def _start_in_foreground(self):
    """ Launches the command attached to the terminal, through the node
    'execute' method in non blocking mode.

    The process handler is stored in self._proc so it can later be
    polled and killed. Raises RuntimeError (after marking the
    application as FAILED) when the process already reports a non-zero
    exit code.
    """
    command = self.get("command")
    stdin = "stdin" if self.get("stdin") else None
    sudo = self.get("sudo") or False
    x11 = self.get("forwardX11")

    # Built up front so the failure branch below has a message to report
    msg = " Failed to start command '%s' " % command

    # Command will be launched in foreground and attached to the
    # terminal using the node 'execute' in non blocking mode.

    # The environment is prepended inline to the command
    env = self.get("env")
    environ = self.node.format_environment(env, inline = True)
    command = environ + command
    command = self.replace_paths(command)

    # We save the reference to the process in self._proc
    # to be able to kill the process from the stop method.
    # We also set blocking = False, since we don't want the
    # thread to block until the execution finishes.
    # NOTE(review): the keyword arguments of this call were not visible
    # in the reviewed text; they are restored from the locals prepared
    # above and the comment about blocking -- confirm the keyword names.
    (out, err), self._proc = self.node.execute(command,
            sudo = sudo,
            stdin = stdin,
            forward_x11 = x11,
            blocking = False)

    if self._proc.poll():
        self._state = ResourceState.FAILED
        self.error(msg, out, err)
        raise RuntimeError(msg)
def _start_in_background(self):
    """ Launches the previously uploaded 'app.sh' script through the node
    'run' method and records the pid and ppid of the process.

    Raises RuntimeError (after marking the application as FAILED) when
    the launch reports a non-zero exit code, or when no pid/ppid could
    be read and check_errors returns error output.
    """
    command = self.get("command")
    stdin = "stdin" if self.get("stdin") else None
    # The file names are the same whether or not the 'stdout' and
    # 'stderr' attributes are set
    stdout = "stdout"
    stderr = "stderr"
    sudo = self.get("sudo") or False

    # Command will be run as a daemon in background and detached from any
    # terminal. The real command to run was previously uploaded to a bash
    # script during deployment, now launch the remote script using 'run'
    # method from the node
    cmd = "bash ./app.sh"
    # NOTE(review): the keyword arguments of this call were not visible
    # in the reviewed text; they are restored from the locals prepared
    # above -- confirm the keyword names.
    (out, err), proc = self.node.run(cmd, self.app_home,
            stdin = stdin,
            stdout = stdout,
            stderr = stderr,
            sudo = sudo)

    # check if execution errors occurred
    msg = " Failed to start command '%s' " % command

    if proc.poll():
        self._state = ResourceState.FAILED
        self.error(msg, out, err)
        raise RuntimeError(msg)

    # Wait for pid file to be generated
    pid, ppid = self.node.wait_pid(self.app_home)
    if pid:
        self._pid = int(pid)
    if ppid:
        self._ppid = int(ppid)

    # If the process is not running, check for error information
    # on the remote machine
    if not self.pid or not self.ppid:
        (out, err), proc = self.node.check_errors(self.app_home,
                stderr = stderr)

        # Out is what was written in the stderr file
        # NOTE(review): the exact condition was not visible in the
        # reviewed text; any reported output is treated as a failure.
        if out or err:
            self._state = ResourceState.FAILED
            msg = " Failed to start command '%s' " % command
            self.error(msg, out, err)
            raise RuntimeError(msg)
def stop(self):
    """ Stops application execution

    Only acts when the application is in the STARTED state. A command
    running in foreground is killed through its process handler; a
    command running in background is killed on the node using the
    recorded pid and ppid. When the kill reports output, the application
    is marked as FAILED and the parent stop() is not invoked.

    NOTE(review): the method header and several branch lines were not
    visible in the reviewed text; the flow is restored from the comments
    and the surrounding statements -- confirm.
    """
    command = self.get('command') or ''

    if self.state != ResourceState.STARTED:
        return

    self.info("Stopping command '%s'" % command)

    # If the command is running in foreground (it was launched using
    # the node 'execute' method), then we use the handler to the Popen
    # process to kill it. Else we send a kill signal using the pid and ppid
    # retrieved after running the command with the node 'run' method
    if self._proc:
        self._proc.kill()
    elif self.pid and self.ppid:
        # Only try to kill the process if the pid and ppid
        # were retrieved
        (out, err), proc = self.node.kill(self.pid, self.ppid)

        if out or err:
            # check if execution errors occurred
            msg = " Failed to STOP command '%s' " % self.get("command")
            self.error(msg, out, err)
            self._state = ResourceState.FAILED
            return

    super(LinuxApplication, self).stop()
def release(self):
    """ Releases the resource: runs the 'tearDown' script on the node
    when one was given, stops the application, and invokes the parent
    release() once the application is in the STOPPED state.

    NOTE(review): the method header, the guard on 'tearDown' and the
    stop() call were not visible in the reviewed text; they are restored
    from the surrounding statements -- confirm.
    """
    self.info("Releasing resource")

    tear_down = self.get("tearDown")
    if tear_down:
        self.node.execute(tear_down)

    self.stop()

    if self.state == ResourceState.STOPPED:
        super(LinuxApplication, self).release()
@property
def state(self):
    """ Returns the state of the application

    While the application is STARTED the state is refreshed first: a
    foreground command is checked through its process handler, and a
    background command is checked on the node, at most once every
    'state_check_delay' seconds.

    NOTE(review): the property header, several branch lines and the
    final return were not visible in the reviewed text; they are
    restored from the comments and the surrounding statements -- confirm.
    """
    if self._state == ResourceState.STARTED:
        if self.in_foreground:
            # Check if the process we used to execute the command
            # is still running ...
            retcode = self._proc.poll()

            # retcode == None -> running
            # retcode > 0 -> error
            # retcode == 0 -> finished
            if retcode:
                out = ""
                msg = " Failed to execute command '%s'" % self.get("command")
                err = self._proc.stderr.read()
                self.error(msg, out, err)
                self._state = ResourceState.FAILED
            elif retcode == 0:
                self._state = ResourceState.FINISHED

        else:
            # We need to query the status of the command we launched in
            # background. In order to avoid overwhelming the remote host and
            # the local processor with too many ssh queries, the state is only
            # requested every 'state_check_delay' seconds.
            state_check_delay = 0.5
            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                # check if execution errors occurred
                (out, err), proc = self.node.check_errors(self.app_home)

                if out or err:
                    msg = " Failed to execute command '%s'" % self.get("command")
                    self.error(msg, out, err)
                    self._state = ResourceState.FAILED

                elif self.pid and self.ppid:
                    # No execution errors occurred. Make sure the background
                    # process with the recorded pid is still running.
                    status = self.node.status(self.pid, self.ppid)

                    if status == ProcStatus.FINISHED:
                        self._state = ResourceState.FINISHED

                self._last_state_check = strfnow()

    return self._state
def replace_paths(self, command):
    """ Replaces the special path tags in command with the actual paths.

    The tags are ${SOURCES}, ${BUILD}, ${APP_HOME}, ${NODE_HOME} and
    ${EXP_HOME}. Paths that do not start with "/" are made relative to
    "${HOME}", which is left for the remote shell to expand. The paths
    are inserted as they are; no shell escaping is applied.
    """
    def absolute_dir(d):
        return d if d.startswith("/") else os.path.join("${HOME}", d)

    return ( command
        .replace("${SOURCES}", absolute_dir(self.src_dir))
        .replace("${BUILD}", absolute_dir(self.build_dir))
        .replace("${APP_HOME}", absolute_dir(self.app_home))
        .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
        .replace("${EXP_HOME}", absolute_dir(self.node.exp_home))
        )
def compare_hash(self, local, remote):
    """ Tells whether 'local' has the same md5 checksum as the file
    'remote' on the node.

    'local' is either the path of an existing local file, or the content
    itself (text). Returns True when both checksums are equal, and False
    when they differ or when the remote md5sum command did not succeed
    (e.g. the remote file does not exist yet).
    """
    # hashlib replaces the temporary file and the local 'md5sum' program
    import hashlib

    # getting md5sum from remote file
    (out, err), proc = self.node.execute("md5sum %s " % remote)

    if proc.poll() != 0:
        return False

    # getting md5sum from local file, or from the text itself
    digest = hashlib.md5()
    if os.path.isfile(local):
        label = local
        with open(local, "rb") as handle:
            for chunk in iter(lambda: handle.read(65536), b""):
                digest.update(chunk)
    else:
        label = "<text>"
        data = local if isinstance(local, bytes) else local.encode("utf-8")
        digest.update(data)
    lchk = digest.hexdigest()

    # md5sum prints "<checksum>  <file name>"
    fields = out.strip().split()
    rchk = fields[0] if fields else ""
    if not isinstance(rchk, str):
        rchk = rchk.decode("ascii", "replace")

    msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
            label, lchk, remote, rchk)
    self.debug(msg)

    # files are the same, no need to upload
    return lchk == rchk
627 def valid_connection(self, guid):