Fixing type do_finish instead of do_finished in LinuxApplication
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46     A LinuxApplication RM represents a process that can be executed in
47     a remote Linux host using SSH.
48
49     The LinuxApplication RM takes care of uploadin sources and any files
50     needed to run the experiment, to the remote host. 
51     It also allows to provide source compilation (build) and installation 
52     instructions, and takes care of automating the sources build and 
53     installation tasks for the user.
54
55     It is important to note that files uploaded to the remote host have
56     two possible scopes: single-experiment or multi-experiment.
57     Single experiment files are those that will not be re-used by other 
58     experiments. Multi-experiment files are those that will.
59     Sources and shared files are always made available to all experiments.
60
61     Directory structure:
62
63     The directory structure used by LinuxApplication RM at the Linux
64     host is the following:
65
66         ${HOME}/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.ExecReadOnly)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.ExecReadOnly)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.ExecReadOnly)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.ExecReadOnly)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.ExecReadOnly)
103         sources = Attribute("sources", 
104                 "Space-separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.ExecReadOnly)
109         files = Attribute("files", 
110                 "Space-separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.ExecReadOnly)
115         libs = Attribute("libs", 
116                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.ExecReadOnly)
121         bins = Attribute("bins", 
122                 "Space-separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.ExecReadOnly)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.ExecReadOnly)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.ReadOnly)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.ReadOnly)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.ExecReadOnly)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.ReadOnly)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream")
167         stderr = Trace("stderr", "Standard error stream")
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.rtype())
198         if node: return node[0]
199         return None
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
231         self.info("Retrieving '%s' trace %s " % (name, attr))
232
233         path = os.path.join(self.run_home, name)
234         
235         command = "(test -f %s && echo 'success') || echo 'error'" % path
236         (out, err), proc = self.node.execute(command)
237
238         if (err and proc.poll()) or out.find("error") != -1:
239             msg = " Couldn't find trace %s " % name
240             self.error(msg, out, err)
241             return None
242     
243         if attr == TraceAttr.PATH:
244             return path
245
246         if attr == TraceAttr.ALL:
247             (out, err), proc = self.node.check_output(self.run_home, name)
248             
249             if proc.poll():
250                 msg = " Couldn't read trace %s " % name
251                 self.error(msg, out, err)
252                 return None
253
254             return out
255
256         if attr == TraceAttr.STREAM:
257             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
258         elif attr == TraceAttr.SIZE:
259             cmd = "stat -c%%s %s " % path
260
261         (out, err), proc = self.node.execute(cmd)
262
263         if proc.poll():
264             msg = " Couldn't find trace %s " % name
265             self.error(msg, out, err)
266             return None
267         
268         if attr == TraceAttr.SIZE:
269             out = int(out.strip())
270
271         return out
272
273     def do_provision(self):
274         # create run dir for application
275         self.node.mkdir(self.run_home)
276    
277         # List of all the provision methods to invoke
278         steps = [
279             # upload sources
280             self.upload_sources,
281             # upload files
282             self.upload_files,
283             # upload binaries
284             self.upload_binaries,
285             # upload libraries
286             self.upload_libraries,
287             # upload code
288             self.upload_code,
289             # upload stdin
290             self.upload_stdin,
291             # install dependencies
292             self.install_dependencies,
293             # build
294             self.build,
295             # Install
296             self.install]
297
298         command = []
299
300         # Since provisioning takes a long time, before
301         # each step we check that the EC is still 
302         for step in steps:
303             if self.ec.abort:
304                 self.debug("Interrupting provisioning. EC says 'ABORT")
305                 return
306             
307             ret = step()
308             if ret:
309                 command.append(ret)
310
311         # upload deploy script
312         deploy_command = ";".join(command)
313         self.execute_deploy_command(deploy_command)
314
315         # upload start script
316         self.upload_start_command()
317        
318         self.info("Provisioning finished")
319
320         super(LinuxApplication, self).do_provision()
321
322     def upload_start_command(self):
323         # Upload command to remote bash script
324         # - only if command can be executed in background and detached
325         command = self.get("command")
326
327         if command and not self.in_foreground:
328             self.info("Uploading command '%s'" % command)
329
330             # replace application specific paths in the command
331             command = self.replace_paths(command)
332             
333             # replace application specific paths in the environment
334             env = self.get("env")
335             env = env and self.replace_paths(env)
336
337             shfile = os.path.join(self.app_home, "start.sh")
338
339             self.node.upload_command(command, 
340                     shfile = shfile,
341                     env = env,
342                     overwrite = False)
343
344     def execute_deploy_command(self, command):
345         if command:
346             # Upload the command to a bash script and run it
347             # in background ( but wait until the command has
348             # finished to continue )
349             shfile = os.path.join(self.app_home, "deploy.sh")
350             self.node.run_and_wait(command, self.run_home,
351                     shfile = shfile, 
352                     overwrite = False,
353                     pidfile = "deploy_pidfile", 
354                     ecodefile = "deploy_exitcode", 
355                     stdout = "deploy_stdout", 
356                     stderr = "deploy_stderr")
357
358     def upload_sources(self):
359         sources = self.get("sources")
360    
361         command = ""
362
363         if sources:
364             self.info("Uploading sources ")
365
366             sources = sources.split(' ')
367
368             # Separate sources that should be downloaded from 
369             # the web, from sources that should be uploaded from
370             # the local machine
371             command = []
372             for source in list(sources):
373                 if source.startswith("http") or source.startswith("https"):
374                     # remove the hhtp source from the sources list
375                     sources.remove(source)
376
377                     command.append( " ( " 
378                             # Check if the source already exists
379                             " ls ${SRC}/%(basename)s "
380                             " || ( "
381                             # If source doesn't exist, download it and check
382                             # that it it downloaded ok
383                             "   wget -c --directory-prefix=${SRC} %(source)s && "
384                             "   ls ${SRC}/%(basename)s "
385                             " ) ) " % {
386                                 "basename": os.path.basename(source),
387                                 "source": source
388                                 })
389
390             command = " && ".join(command)
391
392             # replace application specific paths in the command
393             command = self.replace_paths(command)
394        
395             if sources:
396                 sources = ' '.join(sources)
397                 self.node.upload(sources, self.node.src_dir, overwrite = False)
398
399         return command
400
401     def upload_files(self):
402         files = self.get("files")
403
404         if files:
405             self.info("Uploading files %s " % files)
406             self.node.upload(files, self.node.share_dir, overwrite = False)
407
408     def upload_libraries(self):
409         libs = self.get("libs")
410
411         if libs:
412             self.info("Uploading libraries %s " % libaries)
413             self.node.upload(libs, self.node.lib_dir, overwrite = False)
414
415     def upload_binaries(self):
416         bins = self.get("bins")
417
418         if bins:
419             self.info("Uploading binaries %s " % binaries)
420             self.node.upload(bins, self.node.bin_dir, overwrite = False)
421
422     def upload_code(self):
423         code = self.get("code")
424
425         if code:
426             self.info("Uploading code")
427
428             dst = os.path.join(self.app_home, "code")
429             self.node.upload(code, dst, overwrite = False, text = True)
430
431     def upload_stdin(self):
432         stdin = self.get("stdin")
433         if stdin:
434             # create dir for sources
435             self.info("Uploading stdin")
436             
437             # upload stdin file to ${SHARE_DIR} directory
438             basename = os.path.basename(stdin)
439             dst = os.path.join(self.node.share_dir, basename)
440             self.node.upload(stdin, dst, overwrite = False, text = True)
441
442             # create "stdin" symlink on ${APP_HOME} directory
443             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
444                 "app_home": self.app_home, 
445                 "stdin": dst })
446
447             return command
448
449     def install_dependencies(self):
450         depends = self.get("depends")
451         if depends:
452             self.info("Installing dependencies %s" % depends)
453             return self.node.install_packages_command(depends)
454
455     def build(self):
456         build = self.get("build")
457
458         if build:
459             self.info("Building sources ")
460             
461             # replace application specific paths in the command
462             return self.replace_paths(build)
463
464     def install(self):
465         install = self.get("install")
466
467         if install:
468             self.info("Installing sources ")
469
470             # replace application specific paths in the command
471             return self.replace_paths(install)
472
473     def do_deploy(self):
474         # Wait until node is associated and deployed
475         node = self.node
476         if not node or node.state < ResourceState.READY:
477             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
478             self.ec.schedule(reschedule_delay, self.deploy)
479         else:
480             command = self.get("command") or ""
481             self.info("Deploying command '%s' " % command)
482             self.do_discover()
483             self.do_provision()
484
485             super(LinuxApplication, self).do_deploy()
486    
487     def do_start(self):
488         command = self.get("command")
489
490         self.info("Starting command '%s'" % command)
491
492         if not command:
493             # If no command was given (i.e. Application was used for dependency
494             # installation), then the application is directly marked as FINISHED
495             super(LinuxApplication, self).do_finish()
496         else:
497             if self.in_foreground:
498                 self._run_in_foreground()
499             else:
500                 self._run_in_background()
501
502             super(LinuxApplication, self).do_start()
503
504     def _run_in_foreground(self):
505         command = self.get("command")
506         sudo = self.get("sudo") or False
507         x11 = self.get("forwardX11")
508         env = self.get("env")
509
510         # For a command being executed in foreground, if there is stdin,
511         # it is expected to be text string not a file or pipe
512         stdin = self.get("stdin") or None
513
514         # Command will be launched in foreground and attached to the
515         # terminal using the node 'execute' in non blocking mode.
516
517         # We save the reference to the process in self._proc 
518         # to be able to kill the process from the stop method.
519         # We also set blocking = False, since we don't want the
520         # thread to block until the execution finishes.
521         (out, err), self._proc = self.execute_command(command, 
522                 env = env,
523                 sudo = sudo,
524                 stdin = stdin,
525                 forward_x11 = x11,
526                 blocking = False)
527
528         if self._proc.poll():
529             self.error(msg, out, err)
530             raise RuntimeError, msg
531
532     def _run_in_background(self):
533         command = self.get("command")
534         env = self.get("env")
535         sudo = self.get("sudo") or False
536
537         stdout = "stdout"
538         stderr = "stderr"
539         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
540                 else None
541
542         # Command will be run as a daemon in baground and detached from any
543         # terminal.
544         # The command to run was previously uploaded to a bash script
545         # during deployment, now we launch the remote script using 'run'
546         # method from the node.
547         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
548         (out, err), proc = self.node.run(cmd, self.run_home, 
549             stdin = stdin, 
550             stdout = stdout,
551             stderr = stderr,
552             sudo = sudo)
553
554         # check if execution errors occurred
555         msg = " Failed to start command '%s' " % command
556         
557         if proc.poll():
558             self.error(msg, out, err)
559             raise RuntimeError, msg
560     
561         # Wait for pid file to be generated
562         pid, ppid = self.node.wait_pid(self.run_home)
563         if pid: self._pid = int(pid)
564         if ppid: self._ppid = int(ppid)
565
566         # If the process is not running, check for error information
567         # on the remote machine
568         if not self.pid or not self.ppid:
569             (out, err), proc = self.node.check_errors(self.run_home,
570                     stderr = stderr) 
571
572             # Out is what was written in the stderr file
573             if err:
574                 msg = " Failed to start command '%s' " % command
575                 self.error(msg, out, err)
576                 raise RuntimeError, msg
577     
578     def do_stop(self):
579         """ Stops application execution
580         """
581         command = self.get('command') or ''
582
583         if self.state == ResourceState.STARTED:
584         
585             self.info("Stopping command '%s' " % command)
586         
587             # If the command is running in foreground (it was launched using
588             # the node 'execute' method), then we use the handler to the Popen
589             # process to kill it. Else we send a kill signal using the pid and ppid
590             # retrieved after running the command with the node 'run' method
591             if self._proc:
592                 self._proc.kill()
593             else:
594                 # Only try to kill the process if the pid and ppid
595                 # were retrieved
596                 if self.pid and self.ppid:
597                     (out, err), proc = self.node.kill(self.pid, self.ppid,
598                             sudo = self._sudo_kill)
599
600                     # TODO: check if execution errors occurred
601                     if proc.poll() or err:
602                         msg = " Failed to STOP command '%s' " % self.get("command")
603                         self.error(msg, out, err)
604         
605             super(LinuxApplication, self).do_stop()
606
607     def do_release(self):
608         self.info("Releasing resource")
609
610         tear_down = self.get("tearDown")
611         if tear_down:
612             self.node.execute(tear_down)
613
614         self.do_stop()
615
616         super(LinuxApplication, self).do_release()
617         
618     @property
619     def state(self):
620         """ Returns the state of the application
621         """
622         if self._state == ResourceState.STARTED:
623             if self.in_foreground:
624                 # Check if the process we used to execute the command
625                 # is still running ...
626                 retcode = self._proc.poll()
627
628                 # retcode == None -> running
629                 # retcode > 0 -> error
630                 # retcode == 0 -> finished
631                 if retcode:
632                     out = ""
633                     msg = " Failed to execute command '%s'" % self.get("command")
634                     err = self._proc.stderr.read()
635                     self.error(msg, out, err)
636                     self.fail()
637
638                 elif retcode == 0:
639                     self.finish()
640             else:
641                 # We need to query the status of the command we launched in 
642                 # background. In order to avoid overwhelming the remote host and
643                 # the local processor with too many ssh queries, the state is only
644                 # requested every 'state_check_delay' seconds.
645                 state_check_delay = 0.5
646                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
647                     if self.pid and self.ppid:
648                         # Make sure the process is still running in background
649                         status = self.node.status(self.pid, self.ppid)
650
651                         if status == ProcStatus.FINISHED:
652                             # If the program finished, check if execution
653                             # errors occurred
654                             (out, err), proc = self.node.check_errors(
655                                     self.run_home)
656
657                             if err:
658                                 msg = "Failed to execute command '%s'" % \
659                                         self.get("command")
660                                 self.error(msg, out, err)
661                                 self.fail()
662                             else:
663                                 self.finish()
664
665                     self._last_state_check = tnow()
666
667         return self._state
668
669     def execute_command(self, command, 
670             env = None,
671             sudo = False,
672             stdin = None,
673             forward_x11 = False,
674             blocking = False):
675
676         environ = ""
677         if env:
678             environ = self.node.format_environment(env, inline = True)
679         command = environ + command
680         command = self.replace_paths(command)
681
682         return self.node.execute(command,
683                 sudo = sudo,
684                 stdin = stdin,
685                 forward_x11 = forward_x11,
686                 blocking = blocking)
687
688     def replace_paths(self, command):
689         """
690         Replace all special path tags with shell-escaped actual paths.
691         """
692         return ( command
693             .replace("${USR}", self.node.usr_dir)
694             .replace("${LIB}", self.node.lib_dir)
695             .replace("${BIN}", self.node.bin_dir)
696             .replace("${SRC}", self.node.src_dir)
697             .replace("${SHARE}", self.node.share_dir)
698             .replace("${EXP}", self.node.exp_dir)
699             .replace("${EXP_HOME}", self.node.exp_home)
700             .replace("${APP_HOME}", self.app_home)
701             .replace("${RUN_HOME}", self.run_home)
702             .replace("${NODE_HOME}", self.node.node_home)
703             .replace("${HOME}", self.node.home_dir)
704             )
705
706     def valid_connection(self, guid):
707         # TODO: Validate!
708         return True
709