Fix #29 LinuxApplication passing a list of files as 'sources' not working
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46         A LinuxApplication RM represents a process that can be executed in
47         a remote Linux host using SSH.
48
49         The LinuxApplication RM takes care of uploadin sources and any files
50         needed to run the experiment, to the remote host. 
51         It also allows to provide source compilation (build) and installation 
52         instructions, and takes care of automating the sources build and 
53         installation tasks for the user.
54
55         It is important to note that files uploaded to the remote host have
56         two possible scopes: single-experiment or multi-experiment.
57         Single experiment files are those that will not be re-used by other 
58         experiments. Multi-experiment files are those that will.
59         Sources and shared files are always made available to all experiments.
60
61         Directory structure:
62
63         The directory structure used by LinuxApplication RM at the Linux
64         host is the following:
65
66         ${HOME}/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.ExecReadOnly)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.ExecReadOnly)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.ExecReadOnly)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.ExecReadOnly)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.ExecReadOnly)
103         sources = Attribute("sources", 
104                 "Colon-separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.ExecReadOnly)
109         files = Attribute("files", 
110                 "Space-separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.ExecReadOnly)
115         libs = Attribute("libs", 
116                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.ExecReadOnly)
121         bins = Attribute("bins", 
122                 "Space-separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.ExecReadOnly)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.ExecReadOnly)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.ReadOnly)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.ReadOnly)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.ExecReadOnly)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.ReadOnly)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream", enabled = True)
167         stderr = Trace("stderr", "Standard error stream", enabled = True)
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190         
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.get_rtype())
198         if node: return node[0]
199         return None
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
231         self.info("Retrieving '%s' trace %s " % (name, attr))
232
233         path = os.path.join(self.run_home, name)
234         
235         command = "(test -f %s && echo 'success') || echo 'error'" % path
236         (out, err), proc = self.node.execute(command)
237
238         if (err and proc.poll()) or out.find("error") != -1:
239             msg = " Couldn't find trace %s " % name
240             self.error(msg, out, err)
241             return None
242     
243         if attr == TraceAttr.PATH:
244             return path
245
246         if attr == TraceAttr.ALL:
247             (out, err), proc = self.node.check_output(self.run_home, name)
248             
249             if proc.poll():
250                 msg = " Couldn't read trace %s " % name
251                 self.error(msg, out, err)
252                 return None
253
254             return out
255
256         if attr == TraceAttr.STREAM:
257             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
258         elif attr == TraceAttr.SIZE:
259             cmd = "stat -c%%s %s " % path
260
261         (out, err), proc = self.node.execute(cmd)
262
263         if proc.poll():
264             msg = " Couldn't find trace %s " % name
265             self.error(msg, out, err)
266             return None
267         
268         if attr == TraceAttr.SIZE:
269             out = int(out.strip())
270
271         return out
272
273     def do_provision(self):
274         # take a snapshot of the system if user is root
275         # to assure cleanProcess kill every nepi process
276         if self.node.get("username") == 'root':
277             import pickle
278             procs = dict()
279             ps_aux = "ps aux |awk '{print $2,$11}'"
280             (out, err), proc = self.node.execute(ps_aux)
281             for line in out.strip().split("\n"):
282                 parts = line.strip().split(" ")
283                 procs[parts[0]] = parts[1]
284             pickle.dump(procs, open("/tmp/save.proc", "wb"))
285             
286         # create run dir for application
287         self.node.mkdir(self.run_home)
288    
289         # List of all the provision methods to invoke
290         steps = [
291             # upload sources
292             self.upload_sources,
293             # upload files
294             self.upload_files,
295             # upload binaries
296             self.upload_binaries,
297             # upload libraries
298             self.upload_libraries,
299             # upload code
300             self.upload_code,
301             # upload stdin
302             self.upload_stdin,
303             # install dependencies
304             self.install_dependencies,
305             # build
306             self.build,
307             # Install
308             self.install]
309
310         command = []
311
312         # Since provisioning takes a long time, before
313         # each step we check that the EC is still 
314         for step in steps:
315             if self.ec.abort:
316                 self.debug("Interrupting provisioning. EC says 'ABORT")
317                 return
318             
319             ret = step()
320             if ret:
321                 command.append(ret)
322
323         # upload deploy script
324         deploy_command = ";".join(command)
325         self.execute_deploy_command(deploy_command)
326
327         # upload start script
328         self.upload_start_command()
329        
330         self.info("Provisioning finished")
331
332         super(LinuxApplication, self).do_provision()
333
334     def upload_start_command(self, overwrite = False):
335         # Upload command to remote bash script
336         # - only if command can be executed in background and detached
337         command = self.get("command")
338
339         if command and not self.in_foreground:
340             self.info("Uploading command '%s'" % command)
341
342             # replace application specific paths in the command
343             command = self.replace_paths(command)
344             
345             # replace application specific paths in the environment
346             env = self.get("env")
347             env = env and self.replace_paths(env)
348
349             shfile = os.path.join(self.app_home, "start.sh")
350
351             self.node.upload_command(command, 
352                     shfile = shfile,
353                     env = env,
354                     overwrite = overwrite)
355
356     def execute_deploy_command(self, command):
357         if command:
358             # Upload the command to a bash script and run it
359             # in background ( but wait until the command has
360             # finished to continue )
361             shfile = os.path.join(self.app_home, "deploy.sh")
362             self.node.run_and_wait(command, self.run_home,
363                     shfile = shfile, 
364                     overwrite = False,
365                     pidfile = "deploy_pidfile", 
366                     ecodefile = "deploy_exitcode", 
367                     stdout = "deploy_stdout", 
368                     stderr = "deploy_stderr")
369
370     def upload_sources(self):
371         sources = self.get("sources")
372    
373         command = ""
374
375         if sources:
376             self.info("Uploading sources ")
377
378             sources = map(str.strip, sources.split(";"))
379
380             # Separate sources that should be downloaded from 
381             # the web, from sources that should be uploaded from
382             # the local machine
383             command = []
384             for source in list(sources):
385                 if source.startswith("http") or source.startswith("https"):
386                     # remove the hhtp source from the sources list
387                     sources.remove(source)
388
389                     command.append( " ( " 
390                             # Check if the source already exists
391                             " ls ${SRC}/%(basename)s "
392                             " || ( "
393                             # If source doesn't exist, download it and check
394                             # that it it downloaded ok
395                             "   wget -c --directory-prefix=${SRC} %(source)s && "
396                             "   ls ${SRC}/%(basename)s "
397                             " ) ) " % {
398                                 "basename": os.path.basename(source),
399                                 "source": source
400                                 })
401
402             command = " && ".join(command)
403
404             # replace application specific paths in the command
405             command = self.replace_paths(command)
406        
407             if sources:
408                 sources = ' '.join(sources)
409                 self.node.upload(sources, self.node.src_dir, overwrite = False)
410
411         return command
412
413     def upload_files(self):
414         files = self.get("files")
415
416         if files:
417             self.info("Uploading files %s " % files)
418             self.node.upload(files, self.node.share_dir, overwrite = False)
419
420     def upload_libraries(self):
421         libs = self.get("libs")
422
423         if libs:
424             self.info("Uploading libraries %s " % libaries)
425             self.node.upload(libs, self.node.lib_dir, overwrite = False)
426
427     def upload_binaries(self):
428         bins = self.get("bins")
429
430         if bins:
431             self.info("Uploading binaries %s " % binaries)
432             self.node.upload(bins, self.node.bin_dir, overwrite = False)
433
434     def upload_code(self):
435         code = self.get("code")
436
437         if code:
438             self.info("Uploading code")
439
440             dst = os.path.join(self.app_home, "code")
441             self.node.upload(code, dst, overwrite = False, text = True)
442
443     def upload_stdin(self):
444         stdin = self.get("stdin")
445         if stdin:
446             # create dir for sources
447             self.info("Uploading stdin")
448             
449             # upload stdin file to ${SHARE_DIR} directory
450             basename = os.path.basename(stdin)
451             dst = os.path.join(self.node.share_dir, basename)
452             self.node.upload(stdin, dst, overwrite = False, text = True)
453
454             # create "stdin" symlink on ${APP_HOME} directory
455             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
456                 "app_home": self.app_home, 
457                 "stdin": dst })
458
459             return command
460
461     def install_dependencies(self):
462         depends = self.get("depends")
463         if depends:
464             self.info("Installing dependencies %s" % depends)
465             return self.node.install_packages_command(depends)
466
467     def build(self):
468         build = self.get("build")
469
470         if build:
471             self.info("Building sources ")
472             
473             # replace application specific paths in the command
474             return self.replace_paths(build)
475
476     def install(self):
477         install = self.get("install")
478
479         if install:
480             self.info("Installing sources ")
481
482             # replace application specific paths in the command
483             return self.replace_paths(install)
484
485     def do_deploy(self):
486         # Wait until node is associated and deployed
487         node = self.node
488         if not node or node.state < ResourceState.READY:
489             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
490             self.ec.schedule(reschedule_delay, self.deploy)
491         else:
492             command = self.get("command") or ""
493             self.info("Deploying command '%s' " % command)
494             self.do_discover()
495             self.do_provision()
496
497             super(LinuxApplication, self).do_deploy()
498    
499     def do_start(self):
500         command = self.get("command")
501
502         self.info("Starting command '%s'" % command)
503
504         if not command:
505             # If no command was given (i.e. Application was used for dependency
506             # installation), then the application is directly marked as STOPPED
507             super(LinuxApplication, self).set_stopped()
508         else:
509             if self.in_foreground:
510                 self._run_in_foreground()
511             else:
512                 self._run_in_background()
513
514             super(LinuxApplication, self).do_start()
515
516     def _run_in_foreground(self):
517         command = self.get("command")
518         sudo = self.get("sudo") or False
519         x11 = self.get("forwardX11")
520         env = self.get("env")
521
522         # For a command being executed in foreground, if there is stdin,
523         # it is expected to be text string not a file or pipe
524         stdin = self.get("stdin") or None
525
526         # Command will be launched in foreground and attached to the
527         # terminal using the node 'execute' in non blocking mode.
528
529         # We save the reference to the process in self._proc 
530         # to be able to kill the process from the stop method.
531         # We also set blocking = False, since we don't want the
532         # thread to block until the execution finishes.
533         (out, err), self._proc = self.execute_command(command, 
534                 env = env,
535                 sudo = sudo,
536                 stdin = stdin,
537                 forward_x11 = x11,
538                 blocking = False)
539
540         if self._proc.poll():
541             self.error(msg, out, err)
542             raise RuntimeError, msg
543
544     def _run_in_background(self):
545         command = self.get("command")
546         env = self.get("env")
547         sudo = self.get("sudo") or False
548
549         stdout = "stdout"
550         stderr = "stderr"
551         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
552                 else None
553
554         # Command will be run as a daemon in baground and detached from any
555         # terminal.
556         # The command to run was previously uploaded to a bash script
557         # during deployment, now we launch the remote script using 'run'
558         # method from the node.
559         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
560         (out, err), proc = self.node.run(cmd, self.run_home, 
561             stdin = stdin, 
562             stdout = stdout,
563             stderr = stderr,
564             sudo = sudo)
565
566         # check if execution errors occurred
567         msg = " Failed to start command '%s' " % command
568         
569         if proc.poll():
570             self.error(msg, out, err)
571             raise RuntimeError, msg
572     
573         # Wait for pid file to be generated
574         pid, ppid = self.node.wait_pid(self.run_home)
575         if pid: self._pid = int(pid)
576         if ppid: self._ppid = int(ppid)
577
578         # If the process is not running, check for error information
579         # on the remote machine
580         if not self.pid or not self.ppid:
581             (out, err), proc = self.node.check_errors(self.run_home,
582                     stderr = stderr) 
583
584             # Out is what was written in the stderr file
585             if err:
586                 msg = " Failed to start command '%s' " % command
587                 self.error(msg, out, err)
588                 raise RuntimeError, msg
589     
590     def do_stop(self):
591         """ Stops application execution
592         """
593         command = self.get('command') or ''
594
595         if self.state == ResourceState.STARTED:
596         
597             self.info("Stopping command '%s' " % command)
598         
599             # If the command is running in foreground (it was launched using
600             # the node 'execute' method), then we use the handler to the Popen
601             # process to kill it. Else we send a kill signal using the pid and ppid
602             # retrieved after running the command with the node 'run' method
603             if self._proc:
604                 self._proc.kill()
605             else:
606                 # Only try to kill the process if the pid and ppid
607                 # were retrieved
608                 if self.pid and self.ppid:
609                     (out, err), proc = self.node.kill(self.pid, self.ppid,
610                             sudo = self._sudo_kill)
611
612                     # TODO: check if execution errors occurred
613                     if proc.poll() or err:
614                         msg = " Failed to STOP command '%s' " % self.get("command")
615                         self.error(msg, out, err)
616         
617             super(LinuxApplication, self).do_stop()
618
619     def do_release(self):
620         self.info("Releasing resource")
621
622         tear_down = self.get("tearDown")
623         if tear_down:
624             self.node.execute(tear_down)
625
626         self.do_stop()
627
628         super(LinuxApplication, self).do_release()
629         
630     @property
631     def state(self):
632         """ Returns the state of the application
633         """
634         if self._state == ResourceState.STARTED:
635             if self.in_foreground:
636                 # Check if the process we used to execute the command
637                 # is still running ...
638                 retcode = self._proc.poll()
639
640                 # retcode == None -> running
641                 # retcode > 0 -> error
642                 # retcode == 0 -> finished
643                 if retcode:
644                     out = ""
645                     msg = " Failed to execute command '%s'" % self.get("command")
646                     err = self._proc.stderr.read()
647                     self.error(msg, out, err)
648                     self.do_fail()
649
650                 elif retcode == 0:
651                     self.set_stopped()
652             else:
653                 # We need to query the status of the command we launched in 
654                 # background. In order to avoid overwhelming the remote host and
655                 # the local processor with too many ssh queries, the state is only
656                 # requested every 'state_check_delay' seconds.
657                 state_check_delay = 0.5
658                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
659                     if self.pid and self.ppid:
660                         # Make sure the process is still running in background
661                         status = self.node.status(self.pid, self.ppid)
662
663                         if status == ProcStatus.FINISHED:
664                             # If the program finished, check if execution
665                             # errors occurred
666                             (out, err), proc = self.node.check_errors(
667                                     self.run_home)
668
669                             if err:
670                                 msg = "Failed to execute command '%s'" % \
671                                         self.get("command")
672                                 self.error(msg, out, err)
673                                 self.do_fail()
674                             else:
675                                 self.set_stopped()
676
677                     self._last_state_check = tnow()
678
679         return self._state
680
681     def execute_command(self, command, 
682             env = None,
683             sudo = False,
684             stdin = None,
685             forward_x11 = False,
686             blocking = False):
687
688         environ = ""
689         if env:
690             environ = self.node.format_environment(env, inline = True)
691         command = environ + command
692         command = self.replace_paths(command)
693
694         return self.node.execute(command,
695                 sudo = sudo,
696                 stdin = stdin,
697                 forward_x11 = forward_x11,
698                 blocking = blocking)
699
700     def replace_paths(self, command):
701         """
702         Replace all special path tags with shell-escaped actual paths.
703         """
704         return ( command
705             .replace("${USR}", self.node.usr_dir)
706             .replace("${LIB}", self.node.lib_dir)
707             .replace("${BIN}", self.node.bin_dir)
708             .replace("${SRC}", self.node.src_dir)
709             .replace("${SHARE}", self.node.share_dir)
710             .replace("${EXP}", self.node.exp_dir)
711             .replace("${EXP_HOME}", self.node.exp_home)
712             .replace("${APP_HOME}", self.app_home)
713             .replace("${RUN_HOME}", self.run_home)
714             .replace("${NODE_HOME}", self.node.node_home)
715             .replace("${HOME}", self.node.home_dir)
716             )
717
718     def valid_connection(self, guid):
719         # TODO: Validate!
720         return True
721