5e06dab719ab0890ca1b553f6a0eb9f2767e0a9a
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46         A LinuxApplication RM represents a process that can be executed in
47         a remote Linux host using SSH.
48
49         The LinuxApplication RM takes care of uploadin sources and any files
50         needed to run the experiment, to the remote host. 
51         It also allows to provide source compilation (build) and installation 
52         instructions, and takes care of automating the sources build and 
53         installation tasks for the user.
54
55         It is important to note that files uploaded to the remote host have
56         two possible scopes: single-experiment or multi-experiment.
57         Single experiment files are those that will not be re-used by other 
58         experiments. Multi-experiment files are those that will.
59         Sources and shared files are always made available to all experiments.
60
61         Directory structure:
62
63         The directory structure used by LinuxApplication RM at the Linux
64         host is the following:
65
66         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.Design)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.Design)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.Design)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.Design)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.Design)
103         sources = Attribute("sources", 
104                 "semi-colon separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.Design)
109         files = Attribute("files", 
110                 "semi-colon separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.Design)
115         libs = Attribute("libs", 
116                 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.Design)
121         bins = Attribute("bins", 
122                 "semi-colon separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.Design)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.Design)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.Design)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.Design)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.Design)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.Design)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream", enabled = True)
167         stderr = Trace("stderr", "Standard error stream", enabled = True)
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190         
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.get_rtype())
198         if node: return node[0]
199         raise RuntimeError, "Application must be connected to Node"
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace_filepath(self, filename):
231         return os.path.join(self.run_home, filename)
232
233     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
234         self.info("Retrieving '%s' trace %s " % (name, attr))
235
236         path = self.trace_filepath(name)
237         
238         command = "(test -f %s && echo 'success') || echo 'error'" % path
239         (out, err), proc = self.node.execute(command)
240
241         if (err and proc.poll()) or out.find("error") != -1:
242             msg = " Couldn't find trace %s " % name
243             self.error(msg, out, err)
244             return None
245     
246         if attr == TraceAttr.PATH:
247             return path
248
249         if attr == TraceAttr.ALL:
250             (out, err), proc = self.node.check_output(self.run_home, name)
251             
252             if proc.poll():
253                 msg = " Couldn't read trace %s " % name
254                 self.error(msg, out, err)
255                 return None
256
257             return out
258
259         if attr == TraceAttr.STREAM:
260             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
261         elif attr == TraceAttr.SIZE:
262             cmd = "stat -c%%s %s " % path
263
264         (out, err), proc = self.node.execute(cmd)
265
266         if proc.poll():
267             msg = " Couldn't find trace %s " % name
268             self.error(msg, out, err)
269             return None
270         
271         if attr == TraceAttr.SIZE:
272             out = int(out.strip())
273
274         return out
275
276     def do_provision(self):
277         # take a snapshot of the system if user is root
278         # to ensure that cleanProcess will not kill
279         # pre-existent processes
280         if self.node.get("username") == 'root':
281             import pickle
282             procs = dict()
283             ps_aux = "ps aux |awk '{print $2,$11}'"
284             (out, err), proc = self.node.execute(ps_aux)
285             if len(out) != 0:
286                 for line in out.strip().split("\n"):
287                     parts = line.strip().split(" ")
288                     procs[parts[0]] = parts[1]
289                 pickle.dump(procs, open("/tmp/save.proc", "wb"))
290             
291         # create run dir for application
292         self.node.mkdir(self.run_home)
293    
294         # List of all the provision methods to invoke
295         steps = [
296             # upload sources
297             self.upload_sources,
298             # upload files
299             self.upload_files,
300             # upload binaries
301             self.upload_binaries,
302             # upload libraries
303             self.upload_libraries,
304             # upload code
305             self.upload_code,
306             # upload stdin
307             self.upload_stdin,
308             # install dependencies
309             self.install_dependencies,
310             # build
311             self.build,
312             # Install
313             self.install]
314
315         command = []
316
317         # Since provisioning takes a long time, before
318         # each step we check that the EC is still 
319         for step in steps:
320             if self.ec.abort:
321                 self.debug("Interrupting provisioning. EC says 'ABORT")
322                 return
323             
324             ret = step()
325             if ret:
326                 command.append(ret)
327
328         # upload deploy script
329         deploy_command = ";".join(command)
330         self.execute_deploy_command(deploy_command)
331
332         # upload start script
333         self.upload_start_command()
334        
335         self.info("Provisioning finished")
336
337         super(LinuxApplication, self).do_provision()
338
339     def upload_start_command(self, overwrite = False):
340         # Upload command to remote bash script
341         # - only if command can be executed in background and detached
342         command = self.get("command")
343
344         if command and not self.in_foreground:
345             self.info("Uploading command '%s'" % command)
346
347             # replace application specific paths in the command
348             command = self.replace_paths(command)
349             
350             # replace application specific paths in the environment
351             env = self.get("env")
352             env = env and self.replace_paths(env)
353
354             shfile = os.path.join(self.app_home, "start.sh")
355
356             self.node.upload_command(command, 
357                     shfile = shfile,
358                     env = env,
359                     overwrite = overwrite)
360
361     def execute_deploy_command(self, command, prefix="deploy"):
362         if command:
363             # replace application specific paths in the command
364             command = self.replace_paths(command)
365             
366             # replace application specific paths in the environment
367             env = self.get("env")
368             env = env and self.replace_paths(env)
369
370             # Upload the command to a bash script and run it
371             # in background ( but wait until the command has
372             # finished to continue )
373             shfile = os.path.join(self.app_home, "%s.sh" % prefix)
374             self.node.run_and_wait(command, self.run_home,
375                     shfile = shfile, 
376                     overwrite = False,
377                     pidfile = "%s_pidfile" % prefix, 
378                     ecodefile = "%s_exitcode" % prefix, 
379                     stdout = "%s_stdout" % prefix, 
380                     stderr = "%s_stderr" % prefix)
381
382     def upload_sources(self, sources = None, src_dir = None):
383         if not sources:
384             sources = self.get("sources")
385    
386         command = ""
387
388         if not src_dir:
389             src_dir = self.node.src_dir
390
391         if sources:
392             self.info("Uploading sources ")
393
394             sources = map(str.strip, sources.split(";"))
395
396             # Separate sources that should be downloaded from 
397             # the web, from sources that should be uploaded from
398             # the local machine
399             command = []
400             for source in list(sources):
401                 if source.startswith("http") or source.startswith("https"):
402                     # remove the hhtp source from the sources list
403                     sources.remove(source)
404
405                     command.append( " ( " 
406                             # Check if the source already exists
407                             " ls %(src_dir)s/%(basename)s "
408                             " || ( "
409                             # If source doesn't exist, download it and check
410                             # that it it downloaded ok
411                             "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
412                             "   ls %(src_dir)s/%(basename)s "
413                             " ) ) " % {
414                                 "basename": os.path.basename(source),
415                                 "source": source,
416                                 "src_dir": src_dir
417                                 })
418
419             command = " && ".join(command)
420
421             # replace application specific paths in the command
422             command = self.replace_paths(command)
423        
424             if sources:
425                 sources = ';'.join(sources)
426                 self.node.upload(sources, src_dir, overwrite = False)
427
428         return command
429
430     def upload_files(self, files = None):
431         if not files:
432             files = self.get("files")
433
434         if files:
435             self.info("Uploading files %s " % files)
436             self.node.upload(files, self.node.share_dir, overwrite = False)
437
438     def upload_libraries(self, libs = None):
439         if not libs:
440             libs = self.get("libs")
441
442         if libs:
443             self.info("Uploading libraries %s " % libaries)
444             self.node.upload(libs, self.node.lib_dir, overwrite = False)
445
446     def upload_binaries(self, bins = None):
447         if not bins:
448             bins = self.get("bins")
449
450         if bins:
451             self.info("Uploading binaries %s " % binaries)
452             self.node.upload(bins, self.node.bin_dir, overwrite = False)
453
454     def upload_code(self, code = None):
455         if not code:
456             code = self.get("code")
457
458         if code:
459             self.info("Uploading code")
460
461             dst = os.path.join(self.app_home, "code")
462             self.node.upload(code, dst, overwrite = False, text = True)
463
464     def upload_stdin(self, stdin = None):
465         if not stdin:
466            stdin = self.get("stdin")
467
468         if stdin:
469             # create dir for sources
470             self.info("Uploading stdin")
471             
472             # upload stdin file to ${SHARE_DIR} directory
473             if os.path.isfile(stdin):
474                 basename = os.path.basename(stdin)
475                 dst = os.path.join(self.node.share_dir, basename)
476             else:
477                 dst = os.path.join(self.app_home, "stdin")
478
479             self.node.upload(stdin, dst, overwrite = False, text = True)
480
481             # create "stdin" symlink on ${APP_HOME} directory
482             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
483                 "app_home": self.app_home, 
484                 "stdin": dst })
485
486             return command
487
488     def install_dependencies(self, depends = None):
489         if not depends:
490             depends = self.get("depends")
491
492         if depends:
493             self.info("Installing dependencies %s" % depends)
494             return self.node.install_packages_command(depends)
495
496     def build(self, build = None):
497         if not build:
498             build = self.get("build")
499
500         if build:
501             self.info("Building sources ")
502             
503             # replace application specific paths in the command
504             return self.replace_paths(build)
505
506     def install(self, install = None):
507         if not install:
508             install = self.get("install")
509
510         if install:
511             self.info("Installing sources ")
512
513             # replace application specific paths in the command
514             return self.replace_paths(install)
515
516     def do_deploy(self):
517         # Wait until node is associated and deployed
518         node = self.node
519         if not node or node.state < ResourceState.READY:
520             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
521             self.ec.schedule(reschedule_delay, self.deploy)
522         else:
523             command = self.get("command") or ""
524             self.info("Deploying command '%s' " % command)
525             self.do_discover()
526             self.do_provision()
527
528             super(LinuxApplication, self).do_deploy()
529    
530     def do_start(self):
531         command = self.get("command")
532
533         self.info("Starting command '%s'" % command)
534
535         if not command:
536             # If no command was given (i.e. Application was used for dependency
537             # installation), then the application is directly marked as STOPPED
538             super(LinuxApplication, self).set_stopped()
539         else:
540             if self.in_foreground:
541                 self._run_in_foreground()
542             else:
543                 self._run_in_background()
544
545             super(LinuxApplication, self).do_start()
546
547     def _run_in_foreground(self):
548         command = self.get("command")
549         sudo = self.get("sudo") or False
550         x11 = self.get("forwardX11")
551         env = self.get("env")
552
553         # Command will be launched in foreground and attached to the
554         # terminal using the node 'execute' in non blocking mode.
555
556         # We save the reference to the process in self._proc 
557         # to be able to kill the process from the stop method.
558         # We also set blocking = False, since we don't want the
559         # thread to block until the execution finishes.
560         (out, err), self._proc = self.execute_command(command, 
561                 env = env,
562                 sudo = sudo,
563                 forward_x11 = x11,
564                 blocking = False)
565
566         if self._proc.poll():
567             self.error(msg, out, err)
568             raise RuntimeError, msg
569
570     def _run_in_background(self):
571         command = self.get("command")
572         env = self.get("env")
573         sudo = self.get("sudo") or False
574
575         stdout = "stdout"
576         stderr = "stderr"
577         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
578                 else None
579
580         # Command will be run as a daemon in baground and detached from any
581         # terminal.
582         # The command to run was previously uploaded to a bash script
583         # during deployment, now we launch the remote script using 'run'
584         # method from the node.
585         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
586         (out, err), proc = self.node.run(cmd, self.run_home, 
587             stdin = stdin, 
588             stdout = stdout,
589             stderr = stderr,
590             sudo = sudo)
591
592         # check if execution errors occurred
593         msg = " Failed to start command '%s' " % command
594         
595         if proc.poll():
596             self.error(msg, out, err)
597             raise RuntimeError, msg
598     
599         # Wait for pid file to be generated
600         pid, ppid = self.node.wait_pid(self.run_home)
601         if pid: self._pid = int(pid)
602         if ppid: self._ppid = int(ppid)
603
604         # If the process is not running, check for error information
605         # on the remote machine
606         if not self.pid or not self.ppid:
607             (out, err), proc = self.node.check_errors(self.run_home,
608                     stderr = stderr) 
609
610             # Out is what was written in the stderr file
611             if err:
612                 msg = " Failed to start command '%s' " % command
613                 self.error(msg, out, err)
614                 raise RuntimeError, msg
615     
616     def do_stop(self):
617         """ Stops application execution
618         """
619         command = self.get('command') or ''
620
621         if self.state == ResourceState.STARTED:
622         
623             self.info("Stopping command '%s' " % command)
624         
625             # If the command is running in foreground (it was launched using
626             # the node 'execute' method), then we use the handler to the Popen
627             # process to kill it. Else we send a kill signal using the pid and ppid
628             # retrieved after running the command with the node 'run' method
629             if self._proc:
630                 self._proc.kill()
631             else:
632                 # Only try to kill the process if the pid and ppid
633                 # were retrieved
634                 if self.pid and self.ppid:
635                     (out, err), proc = self.node.kill(self.pid, self.ppid,
636                             sudo = self._sudo_kill)
637
638                     # TODO: check if execution errors occurred
639                     if (proc and proc.poll()) or err:
640                         msg = " Failed to STOP command '%s' " % self.get("command")
641                         self.error(msg, out, err)
642         
643             super(LinuxApplication, self).do_stop()
644
645     def do_release(self):
646         self.info("Releasing resource")
647
648         self.do_stop()
649         
650         tear_down = self.get("tearDown")
651         if tear_down:
652             self.node.execute(tear_down)
653
654         hard_release = self.get("hardRelease")
655         if hard_release:
656             self.node.rmdir(self.app_home)
657
658         super(LinuxApplication, self).do_release()
659         
660     @property
661     def state(self):
662         """ Returns the state of the application
663         """
664         if self._state == ResourceState.STARTED:
665             if self.in_foreground:
666                 # Check if the process we used to execute the command
667                 # is still running ...
668                 retcode = self._proc.poll()
669
670                 # retcode == None -> running
671                 # retcode > 0 -> error
672                 # retcode == 0 -> finished
673                 if retcode:
674                     out = ""
675                     msg = " Failed to execute command '%s'" % self.get("command")
676                     err = self._proc.stderr.read()
677                     self.error(msg, out, err)
678                     self.do_fail()
679
680                 elif retcode == 0:
681                     self.set_stopped()
682             else:
683                 # We need to query the status of the command we launched in 
684                 # background. In order to avoid overwhelming the remote host and
685                 # the local processor with too many ssh queries, the state is only
686                 # requested every 'state_check_delay' seconds.
687                 state_check_delay = 0.5
688                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
689                     if self.pid and self.ppid:
690                         # Make sure the process is still running in background
691                         status = self.node.status(self.pid, self.ppid)
692
693                         if status == ProcStatus.FINISHED:
694                             # If the program finished, check if execution
695                             # errors occurred
696                             (out, err), proc = self.node.check_errors(
697                                     self.run_home)
698
699                             if err:
700                                 msg = "Failed to execute command '%s'" % \
701                                         self.get("command")
702                                 self.error(msg, out, err)
703                                 self.do_fail()
704                             else:
705                                 self.set_stopped()
706
707                     self._last_state_check = tnow()
708
709         return self._state
710
711     def execute_command(self, command, 
712             env = None,
713             sudo = False,
714             tty = False,
715             forward_x11 = False,
716             blocking = False):
717
718         environ = ""
719         if env:
720             environ = self.node.format_environment(env, inline = True)
721         command = environ + command
722         command = self.replace_paths(command)
723
724         return self.node.execute(command,
725                 sudo = sudo,
726                 tty = tty,
727                 forward_x11 = forward_x11,
728                 blocking = blocking)
729
730     def replace_paths(self, command):
731         """
732         Replace all special path tags with shell-escaped actual paths.
733         """
734         return ( command
735             .replace("${USR}", self.node.usr_dir)
736             .replace("${LIB}", self.node.lib_dir)
737             .replace("${BIN}", self.node.bin_dir)
738             .replace("${SRC}", self.node.src_dir)
739             .replace("${SHARE}", self.node.share_dir)
740             .replace("${EXP}", self.node.exp_dir)
741             .replace("${EXP_HOME}", self.node.exp_home)
742             .replace("${APP_HOME}", self.app_home)
743             .replace("${RUN_HOME}", self.run_home)
744             .replace("${NODE_HOME}", self.node.node_home)
745             .replace("${HOME}", self.node.home_dir)
746             )
747
748     def valid_connection(self, guid):
749         # TODO: Validate!
750         return True
751