Replacing _backend for _platform class attribute in ResourceManager
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46         A LinuxApplication RM represents a process that can be executed in
47         a remote Linux host using SSH.
48
49         The LinuxApplication RM takes care of uploadin sources and any files
50         needed to run the experiment, to the remote host. 
51         It also allows to provide source compilation (build) and installation 
52         instructions, and takes care of automating the sources build and 
53         installation tasks for the user.
54
55         It is important to note that files uploaded to the remote host have
56         two possible scopes: single-experiment or multi-experiment.
57         Single experiment files are those that will not be re-used by other 
58         experiments. Multi-experiment files are those that will.
59         Sources and shared files are always made available to all experiments.
60
61         Directory structure:
62
63         The directory structure used by LinuxApplication RM at the Linux
64         host is the following:
65
66         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "linux::Application"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _platform = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.Design)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.Design)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.Design)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.Design)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.Design)
103         sources = Attribute("sources", 
104                 "semi-colon separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.Design)
109         files = Attribute("files", 
110                 "semi-colon separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.Design)
115         libs = Attribute("libs", 
116                 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.Design)
121         bins = Attribute("bins", 
122                 "semi-colon separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.Design)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.Design)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.Design)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.Design)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.Design)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.Design)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream", enabled = True)
167         stderr = Trace("stderr", "Standard error stream", enabled = True)
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._node = None
177         self._home = "app-%s" % self.guid
178
179         # whether the command should run in foreground attached
180         # to a terminal
181         self._in_foreground = False
182
183         # whether to use sudo to kill the application process
184         self._sudo_kill = False
185
186         # keep a reference to the running process handler when 
187         # the command is not executed as remote daemon in background
188         self._proc = None
189
190         # timestamp of last state check of the application
191         self._last_state_check = tnow()
192         
193     def log_message(self, msg):
194         return " guid %d - host %s - %s " % (self.guid, 
195                 self.node.get("hostname"), msg)
196
197     @property
198     def node(self):
199         if not self._node:
200             node = self.get_connected(LinuxNode.get_rtype())
201             if not node: 
202                 msg = "Application %s guid %d NOT connected to Node" % (
203                         self._rtype, self.guid)
204                 raise RuntimeError, msg
205
206             self._node = node[0]
207
208         return self._node
209
210     @property
211     def app_home(self):
212         return os.path.join(self.node.exp_home, self._home)
213
214     @property
215     def run_home(self):
216         return os.path.join(self.app_home, self.ec.run_id)
217
218     @property
219     def pid(self):
220         return self._pid
221
222     @property
223     def ppid(self):
224         return self._ppid
225
226     @property
227     def in_foreground(self):
228         """ Returns True if the command needs to be executed in foreground.
229         This means that command will be executed using 'execute' instead of
230         'run' ('run' executes a command in background and detached from the 
231         terminal)
232         
233         When using X11 forwarding option, the command can not run in background
234         and detached from a terminal, since we need to keep the terminal attached 
235         to interact with it.
236         """
237         return self.get("forwardX11") or self._in_foreground
238
239     def trace_filepath(self, filename):
240         return os.path.join(self.run_home, filename)
241
242     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
243         self.info("Retrieving '%s' trace %s " % (name, attr))
244
245         path = self.trace_filepath(name)
246         
247         command = "(test -f %s && echo 'success') || echo 'error'" % path
248         (out, err), proc = self.node.execute(command)
249
250         if (err and proc.poll()) or out.find("error") != -1:
251             msg = " Couldn't find trace %s " % name
252             self.error(msg, out, err)
253             return None
254     
255         if attr == TraceAttr.PATH:
256             return path
257
258         if attr == TraceAttr.ALL:
259             (out, err), proc = self.node.check_output(self.run_home, name)
260             
261             if proc.poll():
262                 msg = " Couldn't read trace %s " % name
263                 self.error(msg, out, err)
264                 return None
265
266             return out
267
268         if attr == TraceAttr.STREAM:
269             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
270         elif attr == TraceAttr.SIZE:
271             cmd = "stat -c%%s %s " % path
272
273         (out, err), proc = self.node.execute(cmd)
274
275         if proc.poll():
276             msg = " Couldn't find trace %s " % name
277             self.error(msg, out, err)
278             return None
279         
280         if attr == TraceAttr.SIZE:
281             out = int(out.strip())
282
283         return out
284
285     def do_provision(self):
286         # take a snapshot of the system if user is root
287         # to ensure that cleanProcess will not kill
288         # pre-existent processes
289         if self.node.get("username") == 'root':
290             import pickle
291             procs = dict()
292             ps_aux = "ps aux |awk '{print $2,$11}'"
293             (out, err), proc = self.node.execute(ps_aux)
294             if len(out) != 0:
295                 for line in out.strip().split("\n"):
296                     parts = line.strip().split(" ")
297                     procs[parts[0]] = parts[1]
298                 pickle.dump(procs, open("/tmp/save.proc", "wb"))
299             
300         # create run dir for application
301         self.node.mkdir(self.run_home)
302    
303         # List of all the provision methods to invoke
304         steps = [
305             # upload sources
306             self.upload_sources,
307             # upload files
308             self.upload_files,
309             # upload binaries
310             self.upload_binaries,
311             # upload libraries
312             self.upload_libraries,
313             # upload code
314             self.upload_code,
315             # upload stdin
316             self.upload_stdin,
317             # install dependencies
318             self.install_dependencies,
319             # build
320             self.build,
321             # Install
322             self.install]
323
324         command = []
325
326         # Since provisioning takes a long time, before
327         # each step we check that the EC is still 
328         for step in steps:
329             if self.ec.abort:
330                 self.debug("Interrupting provisioning. EC says 'ABORT")
331                 return
332             
333             ret = step()
334             if ret:
335                 command.append(ret)
336
337         # upload deploy script
338         deploy_command = ";".join(command)
339         self.execute_deploy_command(deploy_command)
340
341         # upload start script
342         self.upload_start_command()
343        
344         self.info("Provisioning finished")
345
346         super(LinuxApplication, self).do_provision()
347
348     def upload_start_command(self, overwrite = False):
349         # Upload command to remote bash script
350         # - only if command can be executed in background and detached
351         command = self.get("command")
352
353         if command and not self.in_foreground:
354             self.info("Uploading command '%s'" % command)
355
356             # replace application specific paths in the command
357             command = self.replace_paths(command)
358             
359             # replace application specific paths in the environment
360             env = self.get("env")
361             env = env and self.replace_paths(env)
362
363             shfile = os.path.join(self.app_home, "start.sh")
364
365             self.node.upload_command(command, 
366                     shfile = shfile,
367                     env = env,
368                     overwrite = overwrite)
369
370     def execute_deploy_command(self, command, prefix="deploy"):
371         if command:
372             # replace application specific paths in the command
373             command = self.replace_paths(command)
374             
375             # replace application specific paths in the environment
376             env = self.get("env")
377             env = env and self.replace_paths(env)
378
379             # Upload the command to a bash script and run it
380             # in background ( but wait until the command has
381             # finished to continue )
382             shfile = os.path.join(self.app_home, "%s.sh" % prefix)
383             self.node.run_and_wait(command, self.run_home,
384                     shfile = shfile, 
385                     overwrite = False,
386                     pidfile = "%s_pidfile" % prefix, 
387                     ecodefile = "%s_exitcode" % prefix, 
388                     stdout = "%s_stdout" % prefix, 
389                     stderr = "%s_stderr" % prefix)
390
391     def upload_sources(self, sources = None, src_dir = None):
392         if not sources:
393             sources = self.get("sources")
394    
395         command = ""
396
397         if not src_dir:
398             src_dir = self.node.src_dir
399
400         if sources:
401             self.info("Uploading sources ")
402
403             sources = map(str.strip, sources.split(";"))
404
405             # Separate sources that should be downloaded from 
406             # the web, from sources that should be uploaded from
407             # the local machine
408             command = []
409             for source in list(sources):
410                 if source.startswith("http") or source.startswith("https"):
411                     # remove the hhtp source from the sources list
412                     sources.remove(source)
413
414                     command.append( " ( " 
415                             # Check if the source already exists
416                             " ls %(src_dir)s/%(basename)s "
417                             " || ( "
418                             # If source doesn't exist, download it and check
419                             # that it it downloaded ok
420                             "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
421                             "   ls %(src_dir)s/%(basename)s "
422                             " ) ) " % {
423                                 "basename": os.path.basename(source),
424                                 "source": source,
425                                 "src_dir": src_dir
426                                 })
427
428             command = " && ".join(command)
429
430             # replace application specific paths in the command
431             command = self.replace_paths(command)
432        
433             if sources:
434                 sources = ';'.join(sources)
435                 self.node.upload(sources, src_dir, overwrite = False)
436
437         return command
438
439     def upload_files(self, files = None):
440         if not files:
441             files = self.get("files")
442
443         if files:
444             self.info("Uploading files %s " % files)
445             self.node.upload(files, self.node.share_dir, overwrite = False)
446
447     def upload_libraries(self, libs = None):
448         if not libs:
449             libs = self.get("libs")
450
451         if libs:
452             self.info("Uploading libraries %s " % libaries)
453             self.node.upload(libs, self.node.lib_dir, overwrite = False)
454
455     def upload_binaries(self, bins = None):
456         if not bins:
457             bins = self.get("bins")
458
459         if bins:
460             self.info("Uploading binaries %s " % binaries)
461             self.node.upload(bins, self.node.bin_dir, overwrite = False)
462
463     def upload_code(self, code = None):
464         if not code:
465             code = self.get("code")
466
467         if code:
468             self.info("Uploading code")
469
470             dst = os.path.join(self.app_home, "code")
471             self.node.upload(code, dst, overwrite = False, text = True)
472
473     def upload_stdin(self, stdin = None):
474         if not stdin:
475            stdin = self.get("stdin")
476
477         if stdin:
478             # create dir for sources
479             self.info("Uploading stdin")
480             
481             # upload stdin file to ${SHARE_DIR} directory
482             if os.path.isfile(stdin):
483                 basename = os.path.basename(stdin)
484                 dst = os.path.join(self.node.share_dir, basename)
485             else:
486                 dst = os.path.join(self.app_home, "stdin")
487
488             self.node.upload(stdin, dst, overwrite = False, text = True)
489
490             # create "stdin" symlink on ${APP_HOME} directory
491             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
492                 "app_home": self.app_home, 
493                 "stdin": dst })
494
495             return command
496
497     def install_dependencies(self, depends = None):
498         if not depends:
499             depends = self.get("depends")
500
501         if depends:
502             self.info("Installing dependencies %s" % depends)
503             return self.node.install_packages_command(depends)
504
505     def build(self, build = None):
506         if not build:
507             build = self.get("build")
508
509         if build:
510             self.info("Building sources ")
511             
512             # replace application specific paths in the command
513             return self.replace_paths(build)
514
515     def install(self, install = None):
516         if not install:
517             install = self.get("install")
518
519         if install:
520             self.info("Installing sources ")
521
522             # replace application specific paths in the command
523             return self.replace_paths(install)
524
525     def do_deploy(self):
526         # Wait until node is associated and deployed
527         node = self.node
528         if not node or node.state < ResourceState.READY:
529             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
530             self.ec.schedule(self.reschedule_delay, self.deploy)
531         else:
532             command = self.get("command") or ""
533             self.info("Deploying command '%s' " % command)
534             self.do_discover()
535             self.do_provision()
536
537             super(LinuxApplication, self).do_deploy()
538    
539     def do_start(self):
540         command = self.get("command")
541
542         self.info("Starting command '%s'" % command)
543
544         if not command:
545             # If no command was given (i.e. Application was used for dependency
546             # installation), then the application is directly marked as STOPPED
547             super(LinuxApplication, self).set_stopped()
548         else:
549             if self.in_foreground:
550                 self._run_in_foreground()
551             else:
552                 self._run_in_background()
553
554             super(LinuxApplication, self).do_start()
555
556     def _run_in_foreground(self):
557         command = self.get("command")
558         sudo = self.get("sudo") or False
559         x11 = self.get("forwardX11")
560         env = self.get("env")
561
562         # Command will be launched in foreground and attached to the
563         # terminal using the node 'execute' in non blocking mode.
564
565         # We save the reference to the process in self._proc 
566         # to be able to kill the process from the stop method.
567         # We also set blocking = False, since we don't want the
568         # thread to block until the execution finishes.
569         (out, err), self._proc = self.execute_command(command, 
570                 env = env,
571                 sudo = sudo,
572                 forward_x11 = x11,
573                 blocking = False)
574
575         if self._proc.poll():
576             self.error(msg, out, err)
577             raise RuntimeError, msg
578
579     def _run_in_background(self):
580         command = self.get("command")
581         env = self.get("env")
582         sudo = self.get("sudo") or False
583
584         stdout = "stdout"
585         stderr = "stderr"
586         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
587                 else None
588
589         # Command will be run as a daemon in baground and detached from any
590         # terminal.
591         # The command to run was previously uploaded to a bash script
592         # during deployment, now we launch the remote script using 'run'
593         # method from the node.
594         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
595         (out, err), proc = self.node.run(cmd, self.run_home, 
596             stdin = stdin, 
597             stdout = stdout,
598             stderr = stderr,
599             sudo = sudo)
600
601         # check if execution errors occurred
602         msg = " Failed to start command '%s' " % command
603         
604         if proc.poll():
605             self.error(msg, out, err)
606             raise RuntimeError, msg
607     
608         # Wait for pid file to be generated
609         pid, ppid = self.node.wait_pid(self.run_home)
610         if pid: self._pid = int(pid)
611         if ppid: self._ppid = int(ppid)
612
613         # If the process is not running, check for error information
614         # on the remote machine
615         if not self.pid or not self.ppid:
616             (out, err), proc = self.node.check_errors(self.run_home,
617                     stderr = stderr) 
618
619             # Out is what was written in the stderr file
620             if err:
621                 msg = " Failed to start command '%s' " % command
622                 self.error(msg, out, err)
623                 raise RuntimeError, msg
624     
625     def do_stop(self):
626         """ Stops application execution
627         """
628         command = self.get('command') or ''
629
630         if self.state == ResourceState.STARTED:
631         
632             self.info("Stopping command '%s' " % command)
633         
634             # If the command is running in foreground (it was launched using
635             # the node 'execute' method), then we use the handler to the Popen
636             # process to kill it. Else we send a kill signal using the pid and ppid
637             # retrieved after running the command with the node 'run' method
638             if self._proc:
639                 self._proc.kill()
640             else:
641                 # Only try to kill the process if the pid and ppid
642                 # were retrieved
643                 if self.pid and self.ppid:
644                     (out, err), proc = self.node.kill(self.pid, self.ppid,
645                             sudo = self._sudo_kill)
646
647                     """
648                     # TODO: check if execution errors occurred
649                     if (proc and proc.poll()) or err:
650                         msg = " Failed to STOP command '%s' " % self.get("command")
651                         self.error(msg, out, err)
652                     """
653
654             super(LinuxApplication, self).do_stop()
655
656     def do_release(self):
657         self.info("Releasing resource")
658
659         self.do_stop()
660         
661         tear_down = self.get("tearDown")
662         if tear_down:
663             self.node.execute(tear_down)
664
665         hard_release = self.get("hardRelease")
666         if hard_release:
667             self.node.rmdir(self.app_home)
668
669         super(LinuxApplication, self).do_release()
670         
671     @property
672     def state(self):
673         """ Returns the state of the application
674         """
675         if self._state == ResourceState.STARTED:
676             if self.in_foreground:
677                 # Check if the process we used to execute the command
678                 # is still running ...
679                 retcode = self._proc.poll()
680
681                 # retcode == None -> running
682                 # retcode > 0 -> error
683                 # retcode == 0 -> finished
684                 if retcode:
685                     out = ""
686                     msg = " Failed to execute command '%s'" % self.get("command")
687                     err = self._proc.stderr.read()
688                     self.error(msg, out, err)
689                     self.do_fail()
690
691                 elif retcode == 0:
692                     self.set_stopped()
693             else:
694                 # We need to query the status of the command we launched in 
695                 # background. In order to avoid overwhelming the remote host and
696                 # the local processor with too many ssh queries, the state is only
697                 # requested every 'state_check_delay' seconds.
698                 state_check_delay = 0.5
699                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
700                     if self.pid and self.ppid:
701                         # Make sure the process is still running in background
702                         status = self.node.status(self.pid, self.ppid)
703
704                         if status == ProcStatus.FINISHED:
705                             # If the program finished, check if execution
706                             # errors occurred
707                             (out, err), proc = self.node.check_errors(
708                                     self.run_home)
709
710                             if err:
711                                 msg = "Failed to execute command '%s'" % \
712                                         self.get("command")
713                                 self.error(msg, out, err)
714                                 self.do_fail()
715                             else:
716                                 self.set_stopped()
717
718                     self._last_state_check = tnow()
719
720         return self._state
721
722     def execute_command(self, command, 
723             env = None,
724             sudo = False,
725             tty = False,
726             forward_x11 = False,
727             blocking = False):
728
729         environ = ""
730         if env:
731             environ = self.node.format_environment(env, inline = True)
732         command = environ + command
733         command = self.replace_paths(command)
734
735         return self.node.execute(command,
736                 sudo = sudo,
737                 tty = tty,
738                 forward_x11 = forward_x11,
739                 blocking = blocking)
740
741     def replace_paths(self, command):
742         """
743         Replace all special path tags with shell-escaped actual paths.
744         """
745         return ( command
746             .replace("${USR}", self.node.usr_dir)
747             .replace("${LIB}", self.node.lib_dir)
748             .replace("${BIN}", self.node.bin_dir)
749             .replace("${SRC}", self.node.src_dir)
750             .replace("${SHARE}", self.node.share_dir)
751             .replace("${EXP}", self.node.exp_dir)
752             .replace("${EXP_HOME}", self.node.exp_home)
753             .replace("${APP_HOME}", self.app_home)
754             .replace("${RUN_HOME}", self.run_home)
755             .replace("${NODE_HOME}", self.node.node_home)
756             .replace("${HOME}", self.node.home_dir)
757             )
758
759     def valid_connection(self, guid):
760         # TODO: Validate!
761         return True
762