Improved LinuxApplication behavior
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import strfnow, strfdiff
26
27 import os
28
29 # TODO: Resolve wildcards in commands!!
30
31
32 @clsinit
33 class LinuxApplication(ResourceManager):
34     _rtype = "LinuxApplication"
35
36     @classmethod
37     def _register_attributes(cls):
38         command = Attribute("command", "Command to execute", 
39                 flags = Flags.ExecReadOnly)
40         forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
41                 flags = Flags.ExecReadOnly)
42         env = Attribute("env", "Environment variables string for command execution",
43                 flags = Flags.ExecReadOnly)
44         sudo = Attribute("sudo", "Run with root privileges", 
45                 flags = Flags.ExecReadOnly)
46         depends = Attribute("depends", 
47                 "Space-separated list of packages required to run the application",
48                 flags = Flags.ExecReadOnly)
49         sources = Attribute("sources", 
50                 "Space-separated list of regular files to be deployed in the working "
51                 "path prior to building. Archives won't be expanded automatically.",
52                 flags = Flags.ExecReadOnly)
53         code = Attribute("code", 
54                 "Plain text source code to be uploaded to the server. It will be stored "
55                 "under ${SOURCES}/code",
56                 flags = Flags.ExecReadOnly)
57         build = Attribute("build", 
58                 "Build commands to execute after deploying the sources. "
59                 "Sources will be in the ${SOURCES} folder. "
60                 "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
61                 "Try to make the commands return with a nonzero exit code on error.\n"
62                 "Also, do not install any programs here, use the 'install' attribute. This will "
63                 "help keep the built files constrained to the build folder (which may "
64                 "not be the home folder), and will result in faster deployment. Also, "
65                 "make sure to clean up temporary files, to reduce bandwidth usage between "
66                 "nodes when transferring built packages.",
67                 flags = Flags.ReadOnly)
68         install = Attribute("install", 
69                 "Commands to transfer built files to their final destinations. "
70                 "Sources will be in the initial working folder, and a special "
71                 "tag ${SOURCES} can be used to reference the experiment's "
72                 "home folder (where the application commands will run).\n"
73                 "ALL sources and targets needed for execution must be copied there, "
74                 "if building has been enabled.\n"
75                 "That is, 'slave' nodes will not automatically get any source files. "
76                 "'slave' nodes don't get build dependencies either, so if you need "
77                 "make and other tools to install, be sure to provide them as "
78                 "actual dependencies instead.",
79                 flags = Flags.ReadOnly)
80         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
81         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
82         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
83         tear_down = Attribute("tearDown", "Bash script to be executed before "
84                 "releasing the resource", 
85                 flags = Flags.ReadOnly)
86
87         cls._register_attribute(command)
88         cls._register_attribute(forward_x11)
89         cls._register_attribute(env)
90         cls._register_attribute(sudo)
91         cls._register_attribute(depends)
92         cls._register_attribute(sources)
93         cls._register_attribute(code)
94         cls._register_attribute(build)
95         cls._register_attribute(install)
96         cls._register_attribute(stdin)
97         cls._register_attribute(stdout)
98         cls._register_attribute(stderr)
99         cls._register_attribute(tear_down)
100
101     @classmethod
102     def _register_traces(cls):
103         stdout = Trace("stdout", "Standard output stream")
104         stderr = Trace("stderr", "Standard error stream")
105
106         cls._register_trace(stdout)
107         cls._register_trace(stderr)
108
109     def __init__(self, ec, guid):
110         super(LinuxApplication, self).__init__(ec, guid)
111         self._pid = None
112         self._ppid = None
113         self._home = "app-%s" % self.guid
114
115         # keep a reference to the running process handler when 
116         # the command is not executed as remote daemon in background
117         self._proc = None
118
119         # timestamp of last state check of the application
120         self._last_state_check = strfnow()
121     
122     def log_message(self, msg):
123         return " guid %d - host %s - %s " % (self.guid, 
124                 self.node.get("hostname"), msg)
125
126     @property
127     def node(self):
128         node = self.get_connected(LinuxNode.rtype())
129         if node: return node[0]
130         return None
131
132     @property
133     def app_home(self):
134         return os.path.join(self.node.exp_home, self._home)
135
136     @property
137     def src_dir(self):
138         return os.path.join(self.app_home, 'src')
139
140     @property
141     def build_dir(self):
142         return os.path.join(self.app_home, 'build')
143
144     @property
145     def pid(self):
146         return self._pid
147
148     @property
149     def ppid(self):
150         return self._ppid
151
152     @property
153     def in_foreground(self):
154         """ Returns True is the command needs to be executed in foreground.
155         This means that command will be executed using 'execute' instead of
156         'run'.
157
158         When using X11 forwarding option, the command can not run in background
159         and detached from a terminal in the remote host, since we need to keep 
160         the SSH connection to receive graphical data
161         """
162         return self.get("forwardX11") or False
163
164     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
165         self.info("Retrieving '%s' trace %s " % (name, attr))
166
167         path = os.path.join(self.app_home, name)
168         
169         command = "(test -f %s && echo 'success') || echo 'error'" % path
170         (out, err), proc = self.node.execute(command)
171
172         if (err and proc.poll()) or out.find("error") != -1:
173             msg = " Couldn't find trace %s " % name
174             self.error(msg, out, err)
175             return None
176     
177         if attr == TraceAttr.PATH:
178             return path
179
180         if attr == TraceAttr.ALL:
181             (out, err), proc = self.node.check_output(self.app_home, name)
182             
183             if err and proc.poll():
184                 msg = " Couldn't read trace %s " % name
185                 self.error(msg, out, err)
186                 return None
187
188             return out
189
190         if attr == TraceAttr.STREAM:
191             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
192         elif attr == TraceAttr.SIZE:
193             cmd = "stat -c%%s %s " % path
194
195         (out, err), proc = self.node.execute(cmd)
196
197         if err and proc.poll():
198             msg = " Couldn't find trace %s " % name
199             self.error(msg, out, err)
200             return None
201         
202         if attr == TraceAttr.SIZE:
203             out = int(out.strip())
204
205         return out
206             
207     def provision(self):
208         # create home dir for application
209         self.node.mkdir(self.app_home)
210
211         # upload sources
212         self.upload_sources()
213
214         # upload code
215         self.upload_code()
216
217         # upload stdin
218         self.upload_stdin()
219
220         # install dependencies
221         self.install_dependencies()
222
223         # build
224         self.build()
225
226         # Install
227         self.install()
228
229         # Upload command to remote bash script
230         # - only if command can be executed in background and detached
231         command = self.get("command")
232
233         if command and not self.in_foreground:
234             self.info("Uploading command '%s'" % command)
235
236             # replace application specific paths in the command
237             command = self.replace_paths(command)
238             
239             # replace application specific paths in the environment
240             env = self.get("env")
241             env = env and self.replace_paths(env)
242
243             self.node.upload_command(command, self.app_home, 
244                     shfile = "app.sh",
245                     env = env)
246        
247         self.info("Provisioning finished")
248
249         super(LinuxApplication, self).provision()
250
251     def upload_sources(self):
252         sources = self.get("sources")
253         if sources:
254             self.info("Uploading sources ")
255
256             # create dir for sources
257             self.node.mkdir(self.src_dir)
258
259             sources = sources.split(' ')
260
261             http_sources = list()
262             for source in list(sources):
263                 if source.startswith("http") or source.startswith("https"):
264                     http_sources.append(source)
265                     sources.remove(source)
266
267             # Download http sources remotely
268             if http_sources:
269                 command = [" wget -c --directory-prefix=${SOURCES} "]
270                 check = []
271
272                 for source in http_sources:
273                     command.append(" %s " % (source))
274                     check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
275                 
276                 command = " ".join(command)
277                 check = " ; ".join(check)
278
279                 # Append the command to check that the sources were downloaded
280                 command += " ; %s " % check
281
282                 # replace application specific paths in the command
283                 command = self.replace_paths(command)
284                 
285                 # Upload the command to a bash script and run it
286                 # in background ( but wait until the command has
287                 # finished to continue )
288                 self.node.run_and_wait(command, self.app_home,
289                         shfile = "http_sources.sh",
290                         pidfile = "http_sources_pidfile", 
291                         ecodefile = "http_sources_exitcode", 
292                         stdout = "http_sources_stdout", 
293                         stderr = "http_sources_stderr")
294
295             if sources:
296                 self.node.upload(sources, self.src_dir)
297
298     def upload_code(self):
299         code = self.get("code")
300         if code:
301             # create dir for sources
302             self.node.mkdir(self.src_dir)
303
304             self.info("Uploading code ")
305
306             dst = os.path.join(self.src_dir, "code")
307             self.node.upload(sources, dst, text = True)
308
309     def upload_stdin(self):
310         stdin = self.get("stdin")
311         if stdin:
312             # create dir for sources
313             self.info(" Uploading stdin ")
314
315             dst = os.path.join(self.app_home, "stdin")
316             self.node.upload(stdin, dst, text = True)
317
318     def install_dependencies(self):
319         depends = self.get("depends")
320         if depends:
321             self.info("Installing dependencies %s" % depends)
322             self.node.install_packages(depends, self.app_home)
323
324     def build(self):
325         build = self.get("build")
326         if build:
327             self.info("Building sources ")
328             
329             # create dir for build
330             self.node.mkdir(self.build_dir)
331
332             # replace application specific paths in the command
333             command = self.replace_paths(build)
334
335             # Upload the command to a bash script and run it
336             # in background ( but wait until the command has
337             # finished to continue )
338             self.node.run_and_wait(command, self.app_home,
339                     shfile = "build.sh",
340                     pidfile = "build_pidfile", 
341                     ecodefile = "build_exitcode", 
342                     stdout = "build_stdout", 
343                     stderr = "build_stderr")
344  
345     def install(self):
346         install = self.get("install")
347         if install:
348             self.info("Installing sources ")
349
350             # replace application specific paths in the command
351             command = self.replace_paths(install)
352
353             # Upload the command to a bash script and run it
354             # in background ( but wait until the command has
355             # finished to continue )
356             self.node.run_and_wait(command, self.app_home,
357                     shfile = "install.sh",
358                     pidfile = "install_pidfile", 
359                     ecodefile = "install_exitcode", 
360                     stdout = "install_stdout", 
361                     stderr = "install_stderr")
362
363     def deploy(self):
364         # Wait until node is associated and deployed
365         node = self.node
366         if not node or node.state < ResourceState.READY:
367             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
368             
369             reschedule_delay = "0.5s"
370             self.ec.schedule(reschedule_delay, self.deploy)
371         else:
372             try:
373                 command = self.get("command") or ""
374                 self.info("Deploying command '%s' " % command)
375                 self.discover()
376                 self.provision()
377             except:
378                 self._state = ResourceState.FAILED
379                 raise
380
381             super(LinuxApplication, self).deploy()
382
383     def start(self):
384         command = self.get("command")
385         env = self.get("env")
386         stdin = "stdin" if self.get("stdin") else None
387         stdout = "stdout" if self.get("stdout") else "stdout"
388         stderr = "stderr" if self.get("stderr") else "stderr"
389         sudo = self.get("sudo") or False
390         failed = False
391
392         self.info("Starting command '%s'" % command)
393
394         if self.in_foreground:
395             # If command should be ran in foreground, we invoke
396             # the node 'execute' method
397             if not command:
398                 msg = "No command is defined but X11 forwarding has been set"
399                 self.error(msg)
400                 self._state = ResourceState.FAILED
401                 raise RuntimeError, msg
402
403             # Export environment
404             environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
405                 if env else ""
406
407             command = environ + command
408             command = self.replace_paths(command)
409             
410             x11 = self.get("forwardX11")
411
412             # We save the reference to the process in self._proc 
413             # to be able to kill the process from the stop method
414             (out, err), self._proc = self.node.execute(command,
415                     sudo = sudo,
416                     stdin = stdin,
417                     forward_x11 = x11,
418                     blocking = False)
419
420             if self._proc.poll():
421                 out = ""
422                 err = self._proc.stderr.read()
423                 self._state = ResourceState.FAILED
424                 self.error(msg, out, err)
425                 raise RuntimeError, msg
426             
427             super(LinuxApplication, self).start()
428
429         elif command:
430             # If command is set (i.e. application not used only for dependency
431             # installation), and it does not need to run in foreground, we use 
432             # the 'run' method of the node to launch the application as a daemon 
433
434             # The real command to execute was previously uploaded to a remote bash
435             # script during deployment, now run the remote script using 'run' method 
436             # from the node
437             cmd = "bash ./app.sh"
438             (out, err), proc = self.node.run(cmd, self.app_home, 
439                 stdin = stdin, 
440                 stdout = stdout,
441                 stderr = stderr,
442                 sudo = sudo)
443
444             # check if execution errors occurred
445             msg = " Failed to start command '%s' " % command
446             
447             if proc.poll() and err:
448                 self.error(msg, out, err)
449                 raise RuntimeError, msg
450         
451             # Check status of process running in background
452             pid, ppid = self.node.wait_pid(self.app_home)
453             if pid: self._pid = int(pid)
454             if ppid: self._ppid = int(ppid)
455
456             # If the process is not running, check for error information
457             # on the remote machine
458             if not self.pid or not self.ppid:
459                 (out, err), proc = self.node.check_output(self.app_home, 'stderr')
460                 self.error(msg, out, err)
461
462                 msg2 = " Setting state to Failed"
463                 self.debug(msg2)
464                 self._state = ResourceState.FAILED
465
466                 raise RuntimeError, msg
467             
468             super(LinuxApplication, self).start()
469
470         else:
471             # If no command was given (i.e. Application was used for dependency
472             # installation), then the application is directly marked as FINISHED
473             self._state = ResourceState.FINISHED
474  
475     def stop(self):
476         """ Stops application execution
477         """
478         command = self.get('command') or ''
479         state = self.state
480
481         if state == ResourceState.STARTED:
482             stopped = True
483
484             self.info("Stopping command '%s'" % command)
485         
486             # If the command is running in foreground (it was launched using
487             # the node 'execute' method), then we use the handler to the Popen
488             # process to kill it. Else we send a kill signal using the pid and ppid
489             # retrieved after running the command with the node 'run' method
490
491             if self._proc:
492                 self._proc.kill()
493             else:
494                 (out, err), proc = self.node.kill(self.pid, self.ppid)
495
496                 if out or err:
497                     # check if execution errors occurred
498                     msg = " Failed to STOP command '%s' " % self.get("command")
499                     self.error(msg, out, err)
500                     self._state = ResourceState.FAILED
501                     stopped = False
502
503             if stopped:
504                 super(LinuxApplication, self).stop()
505
506     def release(self):
507         self.info("Releasing resource")
508
509         tear_down = self.get("tearDown")
510         if tear_down:
511             self.node.execute(tear_down)
512
513         self.stop()
514         if self.state == ResourceState.STOPPED:
515             super(LinuxApplication, self).release()
516     
517     @property
518     def state(self):
519         if self._state == ResourceState.STARTED:
520             if self.in_foreground:
521                 retcode = self._proc.poll()
522                 
523                 # retcode == None -> running
524                 # retcode > 0 -> error
525                 # retcode == 0 -> finished
526                 if retcode:
527                     out = ""
528                     err = self._proc.stderr.read()
529                     self._state = ResourceState.FAILED
530                     self.error(msg, out, err)
531                 elif retcode == 0:
532                     self._state = ResourceState.FINISHED
533
534             else:
535                 # To avoid overwhelming the remote hosts and the local processor
536                 # with too many ssh queries, the state is only requested
537                 # every 'state_check_delay' seconds.
538                 state_check_delay = 0.5
539                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
540                     # check if execution errors occurred
541                     (out, err), proc = self.node.check_errors(self.app_home)
542
543                     if out or err:
544                         if err.find("No such file or directory") >= 0 :
545                             # The resource is marked as started, but the
546                             # command was not yet executed
547                             return ResourceState.READY
548
549                         msg = " Failed to execute command '%s'" % self.get("command")
550                         self.error(msg, out, err)
551                         self._state = ResourceState.FAILED
552
553                     elif self.pid and self.ppid:
554                         status = self.node.status(self.pid, self.ppid)
555
556                         if status == ProcStatus.FINISHED:
557                             self._state = ResourceState.FINISHED
558
559                     self._last_state_check = strfnow()
560
561         return self._state
562
563     def replace_paths(self, command):
564         """
565         Replace all special path tags with shell-escaped actual paths.
566         """
567         def absolute_dir(d):
568             return d if d.startswith("/") else os.path.join("${HOME}", d)
569
570         return ( command
571             .replace("${SOURCES}", absolute_dir(self.src_dir))
572             .replace("${BUILD}", absolute_dir(self.build_dir))
573             .replace("${APP_HOME}", absolute_dir(self.app_home))
574             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
575             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
576             )
577         
578     def valid_connection(self, guid):
579         # TODO: Validate!
580         return True
581