Adding process snapshot before running command
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46         A LinuxApplication RM represents a process that can be executed in
47         a remote Linux host using SSH.
48
49         The LinuxApplication RM takes care of uploadin sources and any files
50         needed to run the experiment, to the remote host. 
51         It also allows to provide source compilation (build) and installation 
52         instructions, and takes care of automating the sources build and 
53         installation tasks for the user.
54
55         It is important to note that files uploaded to the remote host have
56         two possible scopes: single-experiment or multi-experiment.
57         Single experiment files are those that will not be re-used by other 
58         experiments. Multi-experiment files are those that will.
59         Sources and shared files are always made available to all experiments.
60
61         Directory structure:
62
63         The directory structure used by LinuxApplication RM at the Linux
64         host is the following:
65
66         ${HOME}/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.ExecReadOnly)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.ExecReadOnly)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.ExecReadOnly)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.ExecReadOnly)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.ExecReadOnly)
103         sources = Attribute("sources", 
104                 "Space-separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.ExecReadOnly)
109         files = Attribute("files", 
110                 "Space-separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.ExecReadOnly)
115         libs = Attribute("libs", 
116                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.ExecReadOnly)
121         bins = Attribute("bins", 
122                 "Space-separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.ExecReadOnly)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.ExecReadOnly)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.ReadOnly)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.ReadOnly)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.ExecReadOnly)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.ReadOnly)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream", enabled = True)
167         stderr = Trace("stderr", "Standard error stream", enabled = True)
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190         
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.get_rtype())
198         if node: return node[0]
199         return None
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
231         self.info("Retrieving '%s' trace %s " % (name, attr))
232
233         path = os.path.join(self.run_home, name)
234         
235         command = "(test -f %s && echo 'success') || echo 'error'" % path
236         (out, err), proc = self.node.execute(command)
237
238         if (err and proc.poll()) or out.find("error") != -1:
239             msg = " Couldn't find trace %s " % name
240             self.error(msg, out, err)
241             return None
242     
243         if attr == TraceAttr.PATH:
244             return path
245
246         if attr == TraceAttr.ALL:
247             (out, err), proc = self.node.check_output(self.run_home, name)
248             
249             if proc.poll():
250                 msg = " Couldn't read trace %s " % name
251                 self.error(msg, out, err)
252                 return None
253
254             return out
255
256         if attr == TraceAttr.STREAM:
257             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
258         elif attr == TraceAttr.SIZE:
259             cmd = "stat -c%%s %s " % path
260
261         (out, err), proc = self.node.execute(cmd)
262
263         if proc.poll():
264             msg = " Couldn't find trace %s " % name
265             self.error(msg, out, err)
266             return None
267         
268         if attr == TraceAttr.SIZE:
269             out = int(out.strip())
270
271         return out
272
273     def do_provision(self):
274         # take a snapshot of the system if user is root
275         # to assure cleanProcess kill every nepi process
276         if self.node.get("username") == 'root':
277             ps_aux = "ps aux |awk '{print $2}' |sort -u"
278             (out, err), proc = self.node.execute(ps_aux)
279             self.node._pids = out.split()
280         
281         # create run dir for application
282         self.node.mkdir(self.run_home)
283    
284         # List of all the provision methods to invoke
285         steps = [
286             # upload sources
287             self.upload_sources,
288             # upload files
289             self.upload_files,
290             # upload binaries
291             self.upload_binaries,
292             # upload libraries
293             self.upload_libraries,
294             # upload code
295             self.upload_code,
296             # upload stdin
297             self.upload_stdin,
298             # install dependencies
299             self.install_dependencies,
300             # build
301             self.build,
302             # Install
303             self.install]
304
305         command = []
306
307         # Since provisioning takes a long time, before
308         # each step we check that the EC is still 
309         for step in steps:
310             if self.ec.abort:
311                 self.debug("Interrupting provisioning. EC says 'ABORT")
312                 return
313             
314             ret = step()
315             if ret:
316                 command.append(ret)
317
318         # upload deploy script
319         deploy_command = ";".join(command)
320         self.execute_deploy_command(deploy_command)
321
322         # upload start script
323         self.upload_start_command()
324        
325         self.info("Provisioning finished")
326
327         super(LinuxApplication, self).do_provision()
328
329     def upload_start_command(self, overwrite = False):
330         # Upload command to remote bash script
331         # - only if command can be executed in background and detached
332         command = self.get("command")
333
334         if command and not self.in_foreground:
335             self.info("Uploading command '%s'" % command)
336
337             # replace application specific paths in the command
338             command = self.replace_paths(command)
339             
340             # replace application specific paths in the environment
341             env = self.get("env")
342             env = env and self.replace_paths(env)
343
344             shfile = os.path.join(self.app_home, "start.sh")
345
346             self.node.upload_command(command, 
347                     shfile = shfile,
348                     env = env,
349                     overwrite = overwrite)
350
351     def execute_deploy_command(self, command):
352         if command:
353             # Upload the command to a bash script and run it
354             # in background ( but wait until the command has
355             # finished to continue )
356             shfile = os.path.join(self.app_home, "deploy.sh")
357             self.node.run_and_wait(command, self.run_home,
358                     shfile = shfile, 
359                     overwrite = False,
360                     pidfile = "deploy_pidfile", 
361                     ecodefile = "deploy_exitcode", 
362                     stdout = "deploy_stdout", 
363                     stderr = "deploy_stderr")
364
365     def upload_sources(self):
366         sources = self.get("sources")
367    
368         command = ""
369
370         if sources:
371             self.info("Uploading sources ")
372
373             sources = sources.split(' ')
374
375             # Separate sources that should be downloaded from 
376             # the web, from sources that should be uploaded from
377             # the local machine
378             command = []
379             for source in list(sources):
380                 if source.startswith("http") or source.startswith("https"):
381                     # remove the hhtp source from the sources list
382                     sources.remove(source)
383
384                     command.append( " ( " 
385                             # Check if the source already exists
386                             " ls ${SRC}/%(basename)s "
387                             " || ( "
388                             # If source doesn't exist, download it and check
389                             # that it it downloaded ok
390                             "   wget -c --directory-prefix=${SRC} %(source)s && "
391                             "   ls ${SRC}/%(basename)s "
392                             " ) ) " % {
393                                 "basename": os.path.basename(source),
394                                 "source": source
395                                 })
396
397             command = " && ".join(command)
398
399             # replace application specific paths in the command
400             command = self.replace_paths(command)
401        
402             if sources:
403                 sources = ' '.join(sources)
404                 self.node.upload(sources, self.node.src_dir, overwrite = False)
405
406         return command
407
408     def upload_files(self):
409         files = self.get("files")
410
411         if files:
412             self.info("Uploading files %s " % files)
413             self.node.upload(files, self.node.share_dir, overwrite = False)
414
415     def upload_libraries(self):
416         libs = self.get("libs")
417
418         if libs:
419             self.info("Uploading libraries %s " % libaries)
420             self.node.upload(libs, self.node.lib_dir, overwrite = False)
421
422     def upload_binaries(self):
423         bins = self.get("bins")
424
425         if bins:
426             self.info("Uploading binaries %s " % binaries)
427             self.node.upload(bins, self.node.bin_dir, overwrite = False)
428
429     def upload_code(self):
430         code = self.get("code")
431
432         if code:
433             self.info("Uploading code")
434
435             dst = os.path.join(self.app_home, "code")
436             self.node.upload(code, dst, overwrite = False, text = True)
437
438     def upload_stdin(self):
439         stdin = self.get("stdin")
440         if stdin:
441             # create dir for sources
442             self.info("Uploading stdin")
443             
444             # upload stdin file to ${SHARE_DIR} directory
445             basename = os.path.basename(stdin)
446             dst = os.path.join(self.node.share_dir, basename)
447             self.node.upload(stdin, dst, overwrite = False, text = True)
448
449             # create "stdin" symlink on ${APP_HOME} directory
450             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
451                 "app_home": self.app_home, 
452                 "stdin": dst })
453
454             return command
455
456     def install_dependencies(self):
457         depends = self.get("depends")
458         if depends:
459             self.info("Installing dependencies %s" % depends)
460             return self.node.install_packages_command(depends)
461
462     def build(self):
463         build = self.get("build")
464
465         if build:
466             self.info("Building sources ")
467             
468             # replace application specific paths in the command
469             return self.replace_paths(build)
470
471     def install(self):
472         install = self.get("install")
473
474         if install:
475             self.info("Installing sources ")
476
477             # replace application specific paths in the command
478             return self.replace_paths(install)
479
480     def do_deploy(self):
481         # Wait until node is associated and deployed
482         node = self.node
483         if not node or node.state < ResourceState.READY:
484             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
485             self.ec.schedule(reschedule_delay, self.deploy)
486         else:
487             command = self.get("command") or ""
488             self.info("Deploying command '%s' " % command)
489             self.do_discover()
490             self.do_provision()
491
492             super(LinuxApplication, self).do_deploy()
493    
494     def do_start(self):
495         command = self.get("command")
496
497         self.info("Starting command '%s'" % command)
498
499         if not command:
500             # If no command was given (i.e. Application was used for dependency
501             # installation), then the application is directly marked as STOPPED
502             super(LinuxApplication, self).set_stopped()
503         else:
504             if self.in_foreground:
505                 self._run_in_foreground()
506             else:
507                 self._run_in_background()
508
509             super(LinuxApplication, self).do_start()
510
511     def _run_in_foreground(self):
512         command = self.get("command")
513         sudo = self.get("sudo") or False
514         x11 = self.get("forwardX11")
515         env = self.get("env")
516
517         # For a command being executed in foreground, if there is stdin,
518         # it is expected to be text string not a file or pipe
519         stdin = self.get("stdin") or None
520
521         # Command will be launched in foreground and attached to the
522         # terminal using the node 'execute' in non blocking mode.
523
524         # We save the reference to the process in self._proc 
525         # to be able to kill the process from the stop method.
526         # We also set blocking = False, since we don't want the
527         # thread to block until the execution finishes.
528         (out, err), self._proc = self.execute_command(command, 
529                 env = env,
530                 sudo = sudo,
531                 stdin = stdin,
532                 forward_x11 = x11,
533                 blocking = False)
534
535         if self._proc.poll():
536             self.error(msg, out, err)
537             raise RuntimeError, msg
538
539     def _run_in_background(self):
540         command = self.get("command")
541         env = self.get("env")
542         sudo = self.get("sudo") or False
543
544         stdout = "stdout"
545         stderr = "stderr"
546         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
547                 else None
548
549         # Command will be run as a daemon in baground and detached from any
550         # terminal.
551         # The command to run was previously uploaded to a bash script
552         # during deployment, now we launch the remote script using 'run'
553         # method from the node.
554         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
555         (out, err), proc = self.node.run(cmd, self.run_home, 
556             stdin = stdin, 
557             stdout = stdout,
558             stderr = stderr,
559             sudo = sudo)
560
561         # check if execution errors occurred
562         msg = " Failed to start command '%s' " % command
563         
564         if proc.poll():
565             self.error(msg, out, err)
566             raise RuntimeError, msg
567     
568         # Wait for pid file to be generated
569         pid, ppid = self.node.wait_pid(self.run_home)
570         if pid: self._pid = int(pid)
571         if ppid: self._ppid = int(ppid)
572
573         # If the process is not running, check for error information
574         # on the remote machine
575         if not self.pid or not self.ppid:
576             (out, err), proc = self.node.check_errors(self.run_home,
577                     stderr = stderr) 
578
579             # Out is what was written in the stderr file
580             if err:
581                 msg = " Failed to start command '%s' " % command
582                 self.error(msg, out, err)
583                 raise RuntimeError, msg
584     
585     def do_stop(self):
586         """ Stops application execution
587         """
588         command = self.get('command') or ''
589
590         if self.state == ResourceState.STARTED:
591         
592             self.info("Stopping command '%s' " % command)
593         
594             # If the command is running in foreground (it was launched using
595             # the node 'execute' method), then we use the handler to the Popen
596             # process to kill it. Else we send a kill signal using the pid and ppid
597             # retrieved after running the command with the node 'run' method
598             if self._proc:
599                 self._proc.kill()
600             else:
601                 # Only try to kill the process if the pid and ppid
602                 # were retrieved
603                 if self.pid and self.ppid:
604                     (out, err), proc = self.node.kill(self.pid, self.ppid,
605                             sudo = self._sudo_kill)
606
607                     # TODO: check if execution errors occurred
608                     if proc.poll() or err:
609                         msg = " Failed to STOP command '%s' " % self.get("command")
610                         self.error(msg, out, err)
611         
612             super(LinuxApplication, self).do_stop()
613
614     def do_release(self):
615         self.info("Releasing resource")
616
617         tear_down = self.get("tearDown")
618         if tear_down:
619             self.node.execute(tear_down)
620
621         self.do_stop()
622
623         super(LinuxApplication, self).do_release()
624         
625     @property
626     def state(self):
627         """ Returns the state of the application
628         """
629         if self._state == ResourceState.STARTED:
630             if self.in_foreground:
631                 # Check if the process we used to execute the command
632                 # is still running ...
633                 retcode = self._proc.poll()
634
635                 # retcode == None -> running
636                 # retcode > 0 -> error
637                 # retcode == 0 -> finished
638                 if retcode:
639                     out = ""
640                     msg = " Failed to execute command '%s'" % self.get("command")
641                     err = self._proc.stderr.read()
642                     self.error(msg, out, err)
643                     self.do_fail()
644
645                 elif retcode == 0:
646                     self.set_stopped()
647             else:
648                 # We need to query the status of the command we launched in 
649                 # background. In order to avoid overwhelming the remote host and
650                 # the local processor with too many ssh queries, the state is only
651                 # requested every 'state_check_delay' seconds.
652                 state_check_delay = 0.5
653                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
654                     if self.pid and self.ppid:
655                         # Make sure the process is still running in background
656                         status = self.node.status(self.pid, self.ppid)
657
658                         if status == ProcStatus.FINISHED:
659                             # If the program finished, check if execution
660                             # errors occurred
661                             (out, err), proc = self.node.check_errors(
662                                     self.run_home)
663
664                             if err:
665                                 msg = "Failed to execute command '%s'" % \
666                                         self.get("command")
667                                 self.error(msg, out, err)
668                                 self.do_fail()
669                             else:
670                                 self.set_stopped()
671
672                     self._last_state_check = tnow()
673
674         return self._state
675
676     def execute_command(self, command, 
677             env = None,
678             sudo = False,
679             stdin = None,
680             forward_x11 = False,
681             blocking = False):
682
683         environ = ""
684         if env:
685             environ = self.node.format_environment(env, inline = True)
686         command = environ + command
687         command = self.replace_paths(command)
688
689         return self.node.execute(command,
690                 sudo = sudo,
691                 stdin = stdin,
692                 forward_x11 = forward_x11,
693                 blocking = blocking)
694
695     def replace_paths(self, command):
696         """
697         Replace all special path tags with shell-escaped actual paths.
698         """
699         return ( command
700             .replace("${USR}", self.node.usr_dir)
701             .replace("${LIB}", self.node.lib_dir)
702             .replace("${BIN}", self.node.bin_dir)
703             .replace("${SRC}", self.node.src_dir)
704             .replace("${SHARE}", self.node.share_dir)
705             .replace("${EXP}", self.node.exp_dir)
706             .replace("${EXP_HOME}", self.node.exp_home)
707             .replace("${APP_HOME}", self.app_home)
708             .replace("${RUN_HOME}", self.run_home)
709             .replace("${NODE_HOME}", self.node.node_home)
710             .replace("${HOME}", self.node.home_dir)
711             )
712
713     def valid_connection(self, guid):
714         # TODO: Validate!
715         return True
716