Fixing uploading string stdin in LinuxApplication
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46         A LinuxApplication RM represents a process that can be executed in
47         a remote Linux host using SSH.
48
49         The LinuxApplication RM takes care of uploadin sources and any files
50         needed to run the experiment, to the remote host. 
51         It also allows to provide source compilation (build) and installation 
52         instructions, and takes care of automating the sources build and 
53         installation tasks for the user.
54
55         It is important to note that files uploaded to the remote host have
56         two possible scopes: single-experiment or multi-experiment.
57         Single experiment files are those that will not be re-used by other 
58         experiments. Multi-experiment files are those that will.
59         Sources and shared files are always made available to all experiments.
60
61         Directory structure:
62
63         The directory structure used by LinuxApplication RM at the Linux
64         host is the following:
65
66         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.Design)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.Design)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.Design)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.Design)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.Design)
103         sources = Attribute("sources", 
104                 "semi-colon separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.Design)
109         files = Attribute("files", 
110                 "Space-separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.Design)
115         libs = Attribute("libs", 
116                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.Design)
121         bins = Attribute("bins", 
122                 "Space-separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.Design)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.Design)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.Design)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.Design)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.Design)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.Design)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream", enabled = True)
167         stderr = Trace("stderr", "Standard error stream", enabled = True)
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190         
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.get_rtype())
198         if node: return node[0]
199         return None
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace_filepath(self, filename):
231         return os.path.join(self.run_home, filename)
232
233     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
234         self.info("Retrieving '%s' trace %s " % (name, attr))
235
236         path = self.trace_filepath(name)
237         
238         command = "(test -f %s && echo 'success') || echo 'error'" % path
239         (out, err), proc = self.node.execute(command)
240
241         if (err and proc.poll()) or out.find("error") != -1:
242             msg = " Couldn't find trace %s " % name
243             self.error(msg, out, err)
244             return None
245     
246         if attr == TraceAttr.PATH:
247             return path
248
249         if attr == TraceAttr.ALL:
250             (out, err), proc = self.node.check_output(self.run_home, name)
251             
252             if proc.poll():
253                 msg = " Couldn't read trace %s " % name
254                 self.error(msg, out, err)
255                 return None
256
257             return out
258
259         if attr == TraceAttr.STREAM:
260             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
261         elif attr == TraceAttr.SIZE:
262             cmd = "stat -c%%s %s " % path
263
264         (out, err), proc = self.node.execute(cmd)
265
266         if proc.poll():
267             msg = " Couldn't find trace %s " % name
268             self.error(msg, out, err)
269             return None
270         
271         if attr == TraceAttr.SIZE:
272             out = int(out.strip())
273
274         return out
275
276     def do_provision(self):
277         # take a snapshot of the system if user is root
278         # to assure cleanProcess kill every nepi process
279         if self.node.get("username") == 'root':
280             import pickle
281             procs = dict()
282             ps_aux = "ps aux |awk '{print $2,$11}'"
283             (out, err), proc = self.node.execute(ps_aux)
284             for line in out.strip().split("\n"):
285                 parts = line.strip().split(" ")
286                 procs[parts[0]] = parts[1]
287             pickle.dump(procs, open("/tmp/save.proc", "wb"))
288             
289         # create run dir for application
290         self.node.mkdir(self.run_home)
291    
292         # List of all the provision methods to invoke
293         steps = [
294             # upload sources
295             self.upload_sources,
296             # upload files
297             self.upload_files,
298             # upload binaries
299             self.upload_binaries,
300             # upload libraries
301             self.upload_libraries,
302             # upload code
303             self.upload_code,
304             # upload stdin
305             self.upload_stdin,
306             # install dependencies
307             self.install_dependencies,
308             # build
309             self.build,
310             # Install
311             self.install]
312
313         command = []
314
315         # Since provisioning takes a long time, before
316         # each step we check that the EC is still 
317         for step in steps:
318             if self.ec.abort:
319                 self.debug("Interrupting provisioning. EC says 'ABORT")
320                 return
321             
322             ret = step()
323             if ret:
324                 command.append(ret)
325
326         # upload deploy script
327         deploy_command = ";".join(command)
328         self.execute_deploy_command(deploy_command)
329
330         # upload start script
331         self.upload_start_command()
332        
333         self.info("Provisioning finished")
334
335         super(LinuxApplication, self).do_provision()
336
337     def upload_start_command(self, overwrite = False):
338         # Upload command to remote bash script
339         # - only if command can be executed in background and detached
340         command = self.get("command")
341
342         if command and not self.in_foreground:
343             self.info("Uploading command '%s'" % command)
344
345             # replace application specific paths in the command
346             command = self.replace_paths(command)
347             
348             # replace application specific paths in the environment
349             env = self.get("env")
350             env = env and self.replace_paths(env)
351
352             shfile = os.path.join(self.app_home, "start.sh")
353
354             self.node.upload_command(command, 
355                     shfile = shfile,
356                     env = env,
357                     overwrite = overwrite)
358
359     def execute_deploy_command(self, command):
360         if command:
361             # Upload the command to a bash script and run it
362             # in background ( but wait until the command has
363             # finished to continue )
364             shfile = os.path.join(self.app_home, "deploy.sh")
365             self.node.run_and_wait(command, self.run_home,
366                     shfile = shfile, 
367                     overwrite = False,
368                     pidfile = "deploy_pidfile", 
369                     ecodefile = "deploy_exitcode", 
370                     stdout = "deploy_stdout", 
371                     stderr = "deploy_stderr")
372
373     def upload_sources(self, src_dir = None):
374         sources = self.get("sources")
375    
376         command = ""
377
378         if not src_dir:
379             src_dir = self.node.src_dir
380
381         if sources:
382             self.info("Uploading sources ")
383
384             sources = map(str.strip, sources.split(";"))
385
386             # Separate sources that should be downloaded from 
387             # the web, from sources that should be uploaded from
388             # the local machine
389             command = []
390             for source in list(sources):
391                 if source.startswith("http") or source.startswith("https"):
392                     # remove the hhtp source from the sources list
393                     sources.remove(source)
394
395                     command.append( " ( " 
396                             # Check if the source already exists
397                             " ls %(src_dir)s/%(basename)s "
398                             " || ( "
399                             # If source doesn't exist, download it and check
400                             # that it it downloaded ok
401                             "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
402                             "   ls %(src_dir)s/%(basename)s "
403                             " ) ) " % {
404                                 "basename": os.path.basename(source),
405                                 "source": source,
406                                 "src_dir": src_dir
407                                 })
408
409             command = " && ".join(command)
410
411             # replace application specific paths in the command
412             command = self.replace_paths(command)
413        
414             if sources:
415                 sources = ';'.join(sources)
416                 self.node.upload(sources, src_dir, overwrite = False)
417
418         return command
419
420     def upload_files(self):
421         files = self.get("files")
422
423         if files:
424             self.info("Uploading files %s " % files)
425             self.node.upload(files, self.node.share_dir, overwrite = False)
426
427     def upload_libraries(self):
428         libs = self.get("libs")
429
430         if libs:
431             self.info("Uploading libraries %s " % libaries)
432             self.node.upload(libs, self.node.lib_dir, overwrite = False)
433
434     def upload_binaries(self):
435         bins = self.get("bins")
436
437         if bins:
438             self.info("Uploading binaries %s " % binaries)
439             self.node.upload(bins, self.node.bin_dir, overwrite = False)
440
441     def upload_code(self):
442         code = self.get("code")
443
444         if code:
445             self.info("Uploading code")
446
447             dst = os.path.join(self.app_home, "code")
448             self.node.upload(code, dst, overwrite = False, text = True)
449
450     def upload_stdin(self):
451         stdin = self.get("stdin")
452         if stdin:
453             # create dir for sources
454             self.info("Uploading stdin")
455             
456             # upload stdin file to ${SHARE_DIR} directory
457             if os.path.isfile(stdin):
458                 basename = os.path.basename(stdin)
459                 dst = os.path.join(self.node.share_dir, basename)
460             else:
461                 dst = os.path.join(self.app_home, "stdin")
462
463             self.node.upload(stdin, dst, overwrite = False, text = True)
464
465             # create "stdin" symlink on ${APP_HOME} directory
466             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
467                 "app_home": self.app_home, 
468                 "stdin": dst })
469
470             return command
471
472     def install_dependencies(self):
473         depends = self.get("depends")
474         if depends:
475             self.info("Installing dependencies %s" % depends)
476             return self.node.install_packages_command(depends)
477
478     def build(self):
479         build = self.get("build")
480
481         if build:
482             self.info("Building sources ")
483             
484             # replace application specific paths in the command
485             return self.replace_paths(build)
486
487     def install(self):
488         install = self.get("install")
489
490         if install:
491             self.info("Installing sources ")
492
493             # replace application specific paths in the command
494             return self.replace_paths(install)
495
496     def do_deploy(self):
497         # Wait until node is associated and deployed
498         node = self.node
499         if not node or node.state < ResourceState.READY:
500             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
501             self.ec.schedule(reschedule_delay, self.deploy)
502         else:
503             command = self.get("command") or ""
504             self.info("Deploying command '%s' " % command)
505             self.do_discover()
506             self.do_provision()
507
508             super(LinuxApplication, self).do_deploy()
509    
510     def do_start(self):
511         command = self.get("command")
512
513         self.info("Starting command '%s'" % command)
514
515         if not command:
516             # If no command was given (i.e. Application was used for dependency
517             # installation), then the application is directly marked as STOPPED
518             super(LinuxApplication, self).set_stopped()
519         else:
520             if self.in_foreground:
521                 self._run_in_foreground()
522             else:
523                 self._run_in_background()
524
525             super(LinuxApplication, self).do_start()
526
527     def _run_in_foreground(self):
528         command = self.get("command")
529         sudo = self.get("sudo") or False
530         x11 = self.get("forwardX11")
531         env = self.get("env")
532
533         # Command will be launched in foreground and attached to the
534         # terminal using the node 'execute' in non blocking mode.
535
536         # We save the reference to the process in self._proc 
537         # to be able to kill the process from the stop method.
538         # We also set blocking = False, since we don't want the
539         # thread to block until the execution finishes.
540         (out, err), self._proc = self.execute_command(command, 
541                 env = env,
542                 sudo = sudo,
543                 forward_x11 = x11,
544                 blocking = False)
545
546         if self._proc.poll():
547             self.error(msg, out, err)
548             raise RuntimeError, msg
549
550     def _run_in_background(self):
551         command = self.get("command")
552         env = self.get("env")
553         sudo = self.get("sudo") or False
554
555         stdout = "stdout"
556         stderr = "stderr"
557         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
558                 else None
559
560         # Command will be run as a daemon in baground and detached from any
561         # terminal.
562         # The command to run was previously uploaded to a bash script
563         # during deployment, now we launch the remote script using 'run'
564         # method from the node.
565         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
566         (out, err), proc = self.node.run(cmd, self.run_home, 
567             stdin = stdin, 
568             stdout = stdout,
569             stderr = stderr,
570             sudo = sudo)
571
572         # check if execution errors occurred
573         msg = " Failed to start command '%s' " % command
574         
575         if proc.poll():
576             self.error(msg, out, err)
577             raise RuntimeError, msg
578     
579         # Wait for pid file to be generated
580         pid, ppid = self.node.wait_pid(self.run_home)
581         if pid: self._pid = int(pid)
582         if ppid: self._ppid = int(ppid)
583
584         # If the process is not running, check for error information
585         # on the remote machine
586         if not self.pid or not self.ppid:
587             (out, err), proc = self.node.check_errors(self.run_home,
588                     stderr = stderr) 
589
590             # Out is what was written in the stderr file
591             if err:
592                 msg = " Failed to start command '%s' " % command
593                 self.error(msg, out, err)
594                 raise RuntimeError, msg
595     
596     def do_stop(self):
597         """ Stops application execution
598         """
599         command = self.get('command') or ''
600
601         if self.state == ResourceState.STARTED:
602         
603             self.info("Stopping command '%s' " % command)
604         
605             # If the command is running in foreground (it was launched using
606             # the node 'execute' method), then we use the handler to the Popen
607             # process to kill it. Else we send a kill signal using the pid and ppid
608             # retrieved after running the command with the node 'run' method
609             if self._proc:
610                 self._proc.kill()
611             else:
612                 # Only try to kill the process if the pid and ppid
613                 # were retrieved
614                 if self.pid and self.ppid:
615                     (out, err), proc = self.node.kill(self.pid, self.ppid,
616                             sudo = self._sudo_kill)
617
618                     # TODO: check if execution errors occurred
619                     if (proc and proc.poll()) or err:
620                         msg = " Failed to STOP command '%s' " % self.get("command")
621                         self.error(msg, out, err)
622         
623             super(LinuxApplication, self).do_stop()
624
625     def do_release(self):
626         self.info("Releasing resource")
627
628         tear_down = self.get("tearDown")
629         if tear_down:
630             self.node.execute(tear_down)
631
632         self.do_stop()
633
634         super(LinuxApplication, self).do_release()
635         
636     @property
637     def state(self):
638         """ Returns the state of the application
639         """
640         if self._state == ResourceState.STARTED:
641             if self.in_foreground:
642                 # Check if the process we used to execute the command
643                 # is still running ...
644                 retcode = self._proc.poll()
645
646                 # retcode == None -> running
647                 # retcode > 0 -> error
648                 # retcode == 0 -> finished
649                 if retcode:
650                     out = ""
651                     msg = " Failed to execute command '%s'" % self.get("command")
652                     err = self._proc.stderr.read()
653                     self.error(msg, out, err)
654                     self.do_fail()
655
656                 elif retcode == 0:
657                     self.set_stopped()
658             else:
659                 # We need to query the status of the command we launched in 
660                 # background. In order to avoid overwhelming the remote host and
661                 # the local processor with too many ssh queries, the state is only
662                 # requested every 'state_check_delay' seconds.
663                 state_check_delay = 0.5
664                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
665                     if self.pid and self.ppid:
666                         # Make sure the process is still running in background
667                         status = self.node.status(self.pid, self.ppid)
668
669                         if status == ProcStatus.FINISHED:
670                             # If the program finished, check if execution
671                             # errors occurred
672                             (out, err), proc = self.node.check_errors(
673                                     self.run_home)
674
675                             if err:
676                                 msg = "Failed to execute command '%s'" % \
677                                         self.get("command")
678                                 self.error(msg, out, err)
679                                 self.do_fail()
680                             else:
681                                 self.set_stopped()
682
683                     self._last_state_check = tnow()
684
685         return self._state
686
687     def execute_command(self, command, 
688             env = None,
689             sudo = False,
690             forward_x11 = False,
691             blocking = False):
692
693         environ = ""
694         if env:
695             environ = self.node.format_environment(env, inline = True)
696         command = environ + command
697         command = self.replace_paths(command)
698
699         return self.node.execute(command,
700                 sudo = sudo,
701                 forward_x11 = forward_x11,
702                 blocking = blocking)
703
704     def replace_paths(self, command):
705         """
706         Replace all special path tags with shell-escaped actual paths.
707         """
708         return ( command
709             .replace("${USR}", self.node.usr_dir)
710             .replace("${LIB}", self.node.lib_dir)
711             .replace("${BIN}", self.node.bin_dir)
712             .replace("${SRC}", self.node.src_dir)
713             .replace("${SHARE}", self.node.share_dir)
714             .replace("${EXP}", self.node.exp_dir)
715             .replace("${EXP_HOME}", self.node.exp_home)
716             .replace("${APP_HOME}", self.app_home)
717             .replace("${RUN_HOME}", self.run_home)
718             .replace("${NODE_HOME}", self.node.node_home)
719             .replace("${HOME}", self.node.home_dir)
720             )
721
722     def valid_connection(self, guid):
723         # TODO: Validate!
724         return True
725