45efd21d079c869ba63d75b5ca90f45f7e2ea8aa
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46         A LinuxApplication RM represents a process that can be executed in
47         a remote Linux host using SSH.
48
49         The LinuxApplication RM takes care of uploadin sources and any files
50         needed to run the experiment, to the remote host. 
51         It also allows to provide source compilation (build) and installation 
52         instructions, and takes care of automating the sources build and 
53         installation tasks for the user.
54
55         It is important to note that files uploaded to the remote host have
56         two possible scopes: single-experiment or multi-experiment.
57         Single experiment files are those that will not be re-used by other 
58         experiments. Multi-experiment files are those that will.
59         Sources and shared files are always made available to all experiments.
60
61         Directory structure:
62
63         The directory structure used by LinuxApplication RM at the Linux
64         host is the following:
65
66         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.Design)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.Design)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.Design)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.Design)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.Design)
103         sources = Attribute("sources", 
104                 "semi-colon separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.Design)
109         files = Attribute("files", 
110                 "semi-colon separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.Design)
115         libs = Attribute("libs", 
116                 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.Design)
121         bins = Attribute("bins", 
122                 "semi-colon separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.Design)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.Design)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.Design)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.Design)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.Design)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.Design)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream", enabled = True)
167         stderr = Trace("stderr", "Standard error stream", enabled = True)
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190         
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.get_rtype())
198         if node: return node[0]
199         return None
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace_filepath(self, filename):
231         return os.path.join(self.run_home, filename)
232
233     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
234         self.info("Retrieving '%s' trace %s " % (name, attr))
235
236         path = self.trace_filepath(name)
237         
238         command = "(test -f %s && echo 'success') || echo 'error'" % path
239         (out, err), proc = self.node.execute(command)
240
241         if (err and proc.poll()) or out.find("error") != -1:
242             msg = " Couldn't find trace %s " % name
243             self.error(msg, out, err)
244             return None
245     
246         if attr == TraceAttr.PATH:
247             return path
248
249         if attr == TraceAttr.ALL:
250             (out, err), proc = self.node.check_output(self.run_home, name)
251             
252             if proc.poll():
253                 msg = " Couldn't read trace %s " % name
254                 self.error(msg, out, err)
255                 return None
256
257             return out
258
259         if attr == TraceAttr.STREAM:
260             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
261         elif attr == TraceAttr.SIZE:
262             cmd = "stat -c%%s %s " % path
263
264         (out, err), proc = self.node.execute(cmd)
265
266         if proc.poll():
267             msg = " Couldn't find trace %s " % name
268             self.error(msg, out, err)
269             return None
270         
271         if attr == TraceAttr.SIZE:
272             out = int(out.strip())
273
274         return out
275
276     def do_provision(self):
277         # take a snapshot of the system if user is root
278         # to ensure that cleanProcess will not kill
279         # pre-existent processes
280         if self.node.get("username") == 'root':
281             import pickle
282             procs = dict()
283             ps_aux = "ps aux |awk '{print $2,$11}'"
284             (out, err), proc = self.node.execute(ps_aux)
285             for line in out.strip().split("\n"):
286                 parts = line.strip().split(" ")
287                 procs[parts[0]] = parts[1]
288             pickle.dump(procs, open("/tmp/save.proc", "wb"))
289             
290         # create run dir for application
291         self.node.mkdir(self.run_home)
292    
293         # List of all the provision methods to invoke
294         steps = [
295             # upload sources
296             self.upload_sources,
297             # upload files
298             self.upload_files,
299             # upload binaries
300             self.upload_binaries,
301             # upload libraries
302             self.upload_libraries,
303             # upload code
304             self.upload_code,
305             # upload stdin
306             self.upload_stdin,
307             # install dependencies
308             self.install_dependencies,
309             # build
310             self.build,
311             # Install
312             self.install]
313
314         command = []
315
316         # Since provisioning takes a long time, before
317         # each step we check that the EC is still 
318         for step in steps:
319             if self.ec.abort:
320                 self.debug("Interrupting provisioning. EC says 'ABORT")
321                 return
322             
323             ret = step()
324             if ret:
325                 command.append(ret)
326
327         # upload deploy script
328         deploy_command = ";".join(command)
329         self.execute_deploy_command(deploy_command)
330
331         # upload start script
332         self.upload_start_command()
333        
334         self.info("Provisioning finished")
335
336         super(LinuxApplication, self).do_provision()
337
338     def upload_start_command(self, overwrite = False):
339         # Upload command to remote bash script
340         # - only if command can be executed in background and detached
341         command = self.get("command")
342
343         if command and not self.in_foreground:
344             self.info("Uploading command '%s'" % command)
345
346             # replace application specific paths in the command
347             command = self.replace_paths(command)
348             
349             # replace application specific paths in the environment
350             env = self.get("env")
351             env = env and self.replace_paths(env)
352
353             shfile = os.path.join(self.app_home, "start.sh")
354
355             self.node.upload_command(command, 
356                     shfile = shfile,
357                     env = env,
358                     overwrite = overwrite)
359
360     def execute_deploy_command(self, command, prefix="deploy"):
361         if command:
362             # Upload the command to a bash script and run it
363             # in background ( but wait until the command has
364             # finished to continue )
365             shfile = os.path.join(self.app_home, "%s.sh" % prefix)
366             self.node.run_and_wait(command, self.run_home,
367                     shfile = shfile, 
368                     overwrite = False,
369                     pidfile = "%s_pidfile" % prefix, 
370                     ecodefile = "%s_exitcode" % prefix, 
371                     stdout = "%s_stdout" % prefix, 
372                     stderr = "%s_stderr" % prefix)
373
374     def upload_sources(self, sources = None, src_dir = None):
375         if not sources:
376             sources = self.get("sources")
377    
378         command = ""
379
380         if not src_dir:
381             src_dir = self.node.src_dir
382
383         if sources:
384             self.info("Uploading sources ")
385
386             sources = map(str.strip, sources.split(";"))
387
388             # Separate sources that should be downloaded from 
389             # the web, from sources that should be uploaded from
390             # the local machine
391             command = []
392             for source in list(sources):
393                 if source.startswith("http") or source.startswith("https"):
394                     # remove the hhtp source from the sources list
395                     sources.remove(source)
396
397                     command.append( " ( " 
398                             # Check if the source already exists
399                             " ls %(src_dir)s/%(basename)s "
400                             " || ( "
401                             # If source doesn't exist, download it and check
402                             # that it it downloaded ok
403                             "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
404                             "   ls %(src_dir)s/%(basename)s "
405                             " ) ) " % {
406                                 "basename": os.path.basename(source),
407                                 "source": source,
408                                 "src_dir": src_dir
409                                 })
410
411             command = " && ".join(command)
412
413             # replace application specific paths in the command
414             command = self.replace_paths(command)
415        
416             if sources:
417                 sources = ';'.join(sources)
418                 self.node.upload(sources, src_dir, overwrite = False)
419
420         return command
421
422     def upload_files(self, files = None):
423         if not files:
424             files = self.get("files")
425
426         if files:
427             self.info("Uploading files %s " % files)
428             self.node.upload(files, self.node.share_dir, overwrite = False)
429
430     def upload_libraries(self, libs = None):
431         if not libs:
432             libs = self.get("libs")
433
434         if libs:
435             self.info("Uploading libraries %s " % libaries)
436             self.node.upload(libs, self.node.lib_dir, overwrite = False)
437
438     def upload_binaries(self, bins = None):
439         if not bins:
440             bins = self.get("bins")
441
442         if bins:
443             self.info("Uploading binaries %s " % binaries)
444             self.node.upload(bins, self.node.bin_dir, overwrite = False)
445
446     def upload_code(self, code = None):
447         if not code:
448             code = self.get("code")
449
450         if code:
451             self.info("Uploading code")
452
453             dst = os.path.join(self.app_home, "code")
454             self.node.upload(code, dst, overwrite = False, text = True)
455
456     def upload_stdin(self, stdin = None):
457         if not stdin:
458            stdin = self.get("stdin")
459
460         if stdin:
461             # create dir for sources
462             self.info("Uploading stdin")
463             
464             # upload stdin file to ${SHARE_DIR} directory
465             if os.path.isfile(stdin):
466                 basename = os.path.basename(stdin)
467                 dst = os.path.join(self.node.share_dir, basename)
468             else:
469                 dst = os.path.join(self.app_home, "stdin")
470
471             self.node.upload(stdin, dst, overwrite = False, text = True)
472
473             # create "stdin" symlink on ${APP_HOME} directory
474             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
475                 "app_home": self.app_home, 
476                 "stdin": dst })
477
478             return command
479
480     def install_dependencies(self, depends = None):
481         if not depends:
482             depends = self.get("depends")
483
484         if depends:
485             self.info("Installing dependencies %s" % depends)
486             return self.node.install_packages_command(depends)
487
488     def build(self, build = None):
489         if not build:
490             build = self.get("build")
491
492         if build:
493             self.info("Building sources ")
494             
495             # replace application specific paths in the command
496             return self.replace_paths(build)
497
498     def install(self, install = None):
499         if not install:
500             install = self.get("install")
501
502         if install:
503             self.info("Installing sources ")
504
505             # replace application specific paths in the command
506             return self.replace_paths(install)
507
508     def do_deploy(self):
509         # Wait until node is associated and deployed
510         node = self.node
511         if not node or node.state < ResourceState.READY:
512             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
513             self.ec.schedule(reschedule_delay, self.deploy)
514         else:
515             command = self.get("command") or ""
516             self.info("Deploying command '%s' " % command)
517             self.do_discover()
518             self.do_provision()
519
520             super(LinuxApplication, self).do_deploy()
521    
522     def do_start(self):
523         command = self.get("command")
524
525         self.info("Starting command '%s'" % command)
526
527         if not command:
528             # If no command was given (i.e. Application was used for dependency
529             # installation), then the application is directly marked as STOPPED
530             super(LinuxApplication, self).set_stopped()
531         else:
532             if self.in_foreground:
533                 self._run_in_foreground()
534             else:
535                 self._run_in_background()
536
537             super(LinuxApplication, self).do_start()
538
539     def _run_in_foreground(self):
540         command = self.get("command")
541         sudo = self.get("sudo") or False
542         x11 = self.get("forwardX11")
543         env = self.get("env")
544
545         # Command will be launched in foreground and attached to the
546         # terminal using the node 'execute' in non blocking mode.
547
548         # We save the reference to the process in self._proc 
549         # to be able to kill the process from the stop method.
550         # We also set blocking = False, since we don't want the
551         # thread to block until the execution finishes.
552         (out, err), self._proc = self.execute_command(command, 
553                 env = env,
554                 sudo = sudo,
555                 forward_x11 = x11,
556                 blocking = False)
557
558         if self._proc.poll():
559             self.error(msg, out, err)
560             raise RuntimeError, msg
561
562     def _run_in_background(self):
563         command = self.get("command")
564         env = self.get("env")
565         sudo = self.get("sudo") or False
566
567         stdout = "stdout"
568         stderr = "stderr"
569         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
570                 else None
571
572         # Command will be run as a daemon in baground and detached from any
573         # terminal.
574         # The command to run was previously uploaded to a bash script
575         # during deployment, now we launch the remote script using 'run'
576         # method from the node.
577         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
578         (out, err), proc = self.node.run(cmd, self.run_home, 
579             stdin = stdin, 
580             stdout = stdout,
581             stderr = stderr,
582             sudo = sudo)
583
584         # check if execution errors occurred
585         msg = " Failed to start command '%s' " % command
586         
587         if proc.poll():
588             self.error(msg, out, err)
589             raise RuntimeError, msg
590     
591         # Wait for pid file to be generated
592         pid, ppid = self.node.wait_pid(self.run_home)
593         if pid: self._pid = int(pid)
594         if ppid: self._ppid = int(ppid)
595
596         # If the process is not running, check for error information
597         # on the remote machine
598         if not self.pid or not self.ppid:
599             (out, err), proc = self.node.check_errors(self.run_home,
600                     stderr = stderr) 
601
602             # Out is what was written in the stderr file
603             if err:
604                 msg = " Failed to start command '%s' " % command
605                 self.error(msg, out, err)
606                 raise RuntimeError, msg
607     
608     def do_stop(self):
609         """ Stops application execution
610         """
611         command = self.get('command') or ''
612
613         if self.state == ResourceState.STARTED:
614         
615             self.info("Stopping command '%s' " % command)
616         
617             # If the command is running in foreground (it was launched using
618             # the node 'execute' method), then we use the handler to the Popen
619             # process to kill it. Else we send a kill signal using the pid and ppid
620             # retrieved after running the command with the node 'run' method
621             if self._proc:
622                 self._proc.kill()
623             else:
624                 # Only try to kill the process if the pid and ppid
625                 # were retrieved
626                 if self.pid and self.ppid:
627                     (out, err), proc = self.node.kill(self.pid, self.ppid,
628                             sudo = self._sudo_kill)
629
630                     # TODO: check if execution errors occurred
631                     if (proc and proc.poll()) or err:
632                         msg = " Failed to STOP command '%s' " % self.get("command")
633                         self.error(msg, out, err)
634         
635             super(LinuxApplication, self).do_stop()
636
637     def do_release(self):
638         self.info("Releasing resource")
639
640         self.do_stop()
641         
642         tear_down = self.get("tearDown")
643         if tear_down:
644             self.node.execute(tear_down)
645
646         hard_release = self.get("hardRelease")
647         if hard_release:
648             self.node.rmdir(self.app_home)
649
650         super(LinuxApplication, self).do_release()
651         
652     @property
653     def state(self):
654         """ Returns the state of the application
655         """
656         if self._state == ResourceState.STARTED:
657             if self.in_foreground:
658                 # Check if the process we used to execute the command
659                 # is still running ...
660                 retcode = self._proc.poll()
661
662                 # retcode == None -> running
663                 # retcode > 0 -> error
664                 # retcode == 0 -> finished
665                 if retcode:
666                     out = ""
667                     msg = " Failed to execute command '%s'" % self.get("command")
668                     err = self._proc.stderr.read()
669                     self.error(msg, out, err)
670                     self.do_fail()
671
672                 elif retcode == 0:
673                     self.set_stopped()
674             else:
675                 # We need to query the status of the command we launched in 
676                 # background. In order to avoid overwhelming the remote host and
677                 # the local processor with too many ssh queries, the state is only
678                 # requested every 'state_check_delay' seconds.
679                 state_check_delay = 0.5
680                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
681                     if self.pid and self.ppid:
682                         # Make sure the process is still running in background
683                         status = self.node.status(self.pid, self.ppid)
684
685                         if status == ProcStatus.FINISHED:
686                             # If the program finished, check if execution
687                             # errors occurred
688                             (out, err), proc = self.node.check_errors(
689                                     self.run_home)
690
691                             if err:
692                                 msg = "Failed to execute command '%s'" % \
693                                         self.get("command")
694                                 self.error(msg, out, err)
695                                 self.do_fail()
696                             else:
697                                 self.set_stopped()
698
699                     self._last_state_check = tnow()
700
701         return self._state
702
703     def execute_command(self, command, 
704             env = None,
705             sudo = False,
706             forward_x11 = False,
707             blocking = False):
708
709         environ = ""
710         if env:
711             environ = self.node.format_environment(env, inline = True)
712         command = environ + command
713         command = self.replace_paths(command)
714
715         return self.node.execute(command,
716                 sudo = sudo,
717                 forward_x11 = forward_x11,
718                 blocking = blocking)
719
720     def replace_paths(self, command):
721         """
722         Replace all special path tags with shell-escaped actual paths.
723         """
724         return ( command
725             .replace("${USR}", self.node.usr_dir)
726             .replace("${LIB}", self.node.lib_dir)
727             .replace("${BIN}", self.node.bin_dir)
728             .replace("${SRC}", self.node.src_dir)
729             .replace("${SHARE}", self.node.share_dir)
730             .replace("${EXP}", self.node.exp_dir)
731             .replace("${EXP_HOME}", self.node.exp_home)
732             .replace("${APP_HOME}", self.app_home)
733             .replace("${RUN_HOME}", self.run_home)
734             .replace("${NODE_HOME}", self.node.node_home)
735             .replace("${HOME}", self.node.home_dir)
736             )
737
738     def valid_connection(self, guid):
739         # TODO: Validate!
740         return True
741