merged ex_shutdown into nepi-3-dev
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay, failtrap
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46     A LinuxApplication RM represents a process that can be executed in
47     a remote Linux host using SSH.
48
49     The LinuxApplication RM takes care of uploadin sources and any files
50     needed to run the experiment, to the remote host. 
51     It also allows to provide source compilation (build) and installation 
52     instructions, and takes care of automating the sources build and 
53     installation tasks for the user.
54
55     It is important to note that files uploaded to the remote host have
56     two possible scopes: single-experiment or multi-experiment.
57     Single experiment files are those that will not be re-used by other 
58     experiments. Multi-experiment files are those that will.
59     Sources and shared files are always made available to all experiments.
60
61     Directory structure:
62
63     The directory structure used by LinuxApplication RM at the Linux
64     host is the following:
65
66         ${HOME}/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.ExecReadOnly)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.ExecReadOnly)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.ExecReadOnly)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.ExecReadOnly)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.ExecReadOnly)
103         sources = Attribute("sources", 
104                 "Space-separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.ExecReadOnly)
109         files = Attribute("files", 
110                 "Space-separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.ExecReadOnly)
115         libs = Attribute("libs", 
116                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.ExecReadOnly)
121         bins = Attribute("bins", 
122                 "Space-separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.ExecReadOnly)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.ExecReadOnly)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.ReadOnly)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.ReadOnly)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.ExecReadOnly)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.ReadOnly)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream")
167         stderr = Trace("stderr", "Standard error stream")
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.rtype())
198         if node: return node[0]
199         return None
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
231         self.info("Retrieving '%s' trace %s " % (name, attr))
232
233         path = os.path.join(self.run_home, name)
234         
235         command = "(test -f %s && echo 'success') || echo 'error'" % path
236         (out, err), proc = self.node.execute(command)
237
238         if (err and proc.poll()) or out.find("error") != -1:
239             msg = " Couldn't find trace %s " % name
240             self.error(msg, out, err)
241             return None
242     
243         if attr == TraceAttr.PATH:
244             return path
245
246         if attr == TraceAttr.ALL:
247             (out, err), proc = self.node.check_output(self.run_home, name)
248             
249             if proc.poll():
250                 msg = " Couldn't read trace %s " % name
251                 self.error(msg, out, err)
252                 return None
253
254             return out
255
256         if attr == TraceAttr.STREAM:
257             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
258         elif attr == TraceAttr.SIZE:
259             cmd = "stat -c%%s %s " % path
260
261         (out, err), proc = self.node.execute(cmd)
262
263         if proc.poll():
264             msg = " Couldn't find trace %s " % name
265             self.error(msg, out, err)
266             return None
267         
268         if attr == TraceAttr.SIZE:
269             out = int(out.strip())
270
271         return out
272
273     @failtrap
274     def provision(self):
275         # create run dir for application
276         self.node.mkdir(self.run_home)
277    
278         # List of all the provision methods to invoke
279         steps = [
280             # upload sources
281             self.upload_sources,
282             # upload files
283             self.upload_files,
284             # upload binaries
285             self.upload_binaries,
286             # upload libraries
287             self.upload_libraries,
288             # upload code
289             self.upload_code,
290             # upload stdin
291             self.upload_stdin,
292             # install dependencies
293             self.install_dependencies,
294             # build
295             self.build,
296             # Install
297             self.install]
298
299         command = []
300
301         # Since provisioning takes a long time, before
302         # each step we check that the EC is still 
303         for step in steps:
304             if self.ec.abort:
305                 self.debug("Interrupting provisioning. EC says 'ABORT")
306                 return
307             
308             ret = step()
309             if ret:
310                 command.append(ret)
311
312         # upload deploy script
313         deploy_command = ";".join(command)
314         self.execute_deploy_command(deploy_command)
315
316         # upload start script
317         self.upload_start_command()
318        
319         self.info("Provisioning finished")
320
321         super(LinuxApplication, self).provision()
322
323     def upload_start_command(self):
324         # Upload command to remote bash script
325         # - only if command can be executed in background and detached
326         command = self.get("command")
327
328         if command and not self.in_foreground:
329             self.info("Uploading command '%s'" % command)
330
331             # replace application specific paths in the command
332             command = self.replace_paths(command)
333             
334             # replace application specific paths in the environment
335             env = self.get("env")
336             env = env and self.replace_paths(env)
337
338             shfile = os.path.join(self.app_home, "start.sh")
339
340             self.node.upload_command(command, 
341                     shfile = shfile,
342                     env = env,
343                     overwrite = False)
344
345     def execute_deploy_command(self, command):
346         if command:
347             # Upload the command to a bash script and run it
348             # in background ( but wait until the command has
349             # finished to continue )
350             shfile = os.path.join(self.app_home, "deploy.sh")
351             self.node.run_and_wait(command, self.run_home,
352                     shfile = shfile, 
353                     overwrite = False,
354                     pidfile = "deploy_pidfile", 
355                     ecodefile = "deploy_exitcode", 
356                     stdout = "deploy_stdout", 
357                     stderr = "deploy_stderr")
358
359     def upload_sources(self):
360         sources = self.get("sources")
361    
362         command = ""
363
364         if sources:
365             self.info("Uploading sources ")
366
367             sources = sources.split(' ')
368
369             # Separate sources that should be downloaded from 
370             # the web, from sources that should be uploaded from
371             # the local machine
372             command = []
373             for source in list(sources):
374                 if source.startswith("http") or source.startswith("https"):
375                     # remove the hhtp source from the sources list
376                     sources.remove(source)
377
378                     command.append( " ( " 
379                             # Check if the source already exists
380                             " ls ${SRC}/%(basename)s "
381                             " || ( "
382                             # If source doesn't exist, download it and check
383                             # that it it downloaded ok
384                             "   wget -c --directory-prefix=${SRC} %(source)s && "
385                             "   ls ${SRC}/%(basename)s "
386                             " ) ) " % {
387                                 "basename": os.path.basename(source),
388                                 "source": source
389                                 })
390
391             command = " && ".join(command)
392
393             # replace application specific paths in the command
394             command = self.replace_paths(command)
395        
396             if sources:
397                 sources = ' '.join(sources)
398                 self.node.upload(sources, self.node.src_dir, overwrite = False)
399
400         return command
401
402     def upload_files(self):
403         files = self.get("files")
404
405         if files:
406             self.info("Uploading files %s " % files)
407             self.node.upload(files, self.node.share_dir, overwrite = False)
408
409     def upload_libraries(self):
410         libs = self.get("libs")
411
412         if libs:
413             self.info("Uploading libraries %s " % libaries)
414             self.node.upload(libs, self.node.lib_dir, overwrite = False)
415
416     def upload_binaries(self):
417         bins = self.get("bins")
418
419         if bins:
420             self.info("Uploading binaries %s " % binaries)
421             self.node.upload(bins, self.node.bin_dir, overwrite = False)
422
423     def upload_code(self):
424         code = self.get("code")
425
426         if code:
427             self.info("Uploading code")
428
429             dst = os.path.join(self.app_home, "code")
430             self.node.upload(code, dst, overwrite = False, text = True)
431
432     def upload_stdin(self):
433         stdin = self.get("stdin")
434         if stdin:
435             # create dir for sources
436             self.info("Uploading stdin")
437             
438             # upload stdin file to ${SHARE_DIR} directory
439             basename = os.path.basename(stdin)
440             dst = os.path.join(self.node.share_dir, basename)
441             self.node.upload(stdin, dst, overwrite = False, text = True)
442
443             # create "stdin" symlink on ${APP_HOME} directory
444             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
445                 "app_home": self.app_home, 
446                 "stdin": dst })
447
448             return command
449
450     def install_dependencies(self):
451         depends = self.get("depends")
452         if depends:
453             self.info("Installing dependencies %s" % depends)
454             return self.node.install_packages_command(depends)
455
456     def build(self):
457         build = self.get("build")
458
459         if build:
460             self.info("Building sources ")
461             
462             # replace application specific paths in the command
463             return self.replace_paths(build)
464
465     def install(self):
466         install = self.get("install")
467
468         if install:
469             self.info("Installing sources ")
470
471             # replace application specific paths in the command
472             return self.replace_paths(install)
473
474     @failtrap
475     def deploy(self):
476         # Wait until node is associated and deployed
477         node = self.node
478         if not node or node.state < ResourceState.READY:
479             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
480             self.ec.schedule(reschedule_delay, self.deploy)
481         else:
482             command = self.get("command") or ""
483             self.info("Deploying command '%s' " % command)
484             self.discover()
485             self.provision()
486
487             super(LinuxApplication, self).deploy()
488    
489     @failtrap
490     def start(self):
491         command = self.get("command")
492
493         self.info("Starting command '%s'" % command)
494
495         if not command:
496             # If no command was given (i.e. Application was used for dependency
497             # installation), then the application is directly marked as FINISHED
498             self.set_finished()
499         else:
500             if self.in_foreground:
501                 self._run_in_foreground()
502             else:
503                 self._run_in_background()
504
505             super(LinuxApplication, self).start()
506
507     def _run_in_foreground(self):
508         command = self.get("command")
509         sudo = self.get("sudo") or False
510         x11 = self.get("forwardX11")
511         env = self.get("env")
512
513         # For a command being executed in foreground, if there is stdin,
514         # it is expected to be text string not a file or pipe
515         stdin = self.get("stdin") or None
516
517         # Command will be launched in foreground and attached to the
518         # terminal using the node 'execute' in non blocking mode.
519
520         # We save the reference to the process in self._proc 
521         # to be able to kill the process from the stop method.
522         # We also set blocking = False, since we don't want the
523         # thread to block until the execution finishes.
524         (out, err), self._proc = self.execute_command(command, 
525                 env = env,
526                 sudo = sudo,
527                 stdin = stdin,
528                 forward_x11 = x11,
529                 blocking = False)
530
531         if self._proc.poll():
532             self.error(msg, out, err)
533             raise RuntimeError, msg
534
535     def _run_in_background(self):
536         command = self.get("command")
537         env = self.get("env")
538         sudo = self.get("sudo") or False
539
540         stdout = "stdout"
541         stderr = "stderr"
542         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
543                 else None
544
545         # Command will be run as a daemon in baground and detached from any
546         # terminal.
547         # The command to run was previously uploaded to a bash script
548         # during deployment, now we launch the remote script using 'run'
549         # method from the node.
550         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
551         (out, err), proc = self.node.run(cmd, self.run_home, 
552             stdin = stdin, 
553             stdout = stdout,
554             stderr = stderr,
555             sudo = sudo)
556
557         # check if execution errors occurred
558         msg = " Failed to start command '%s' " % command
559         
560         if proc.poll():
561             self.error(msg, out, err)
562             raise RuntimeError, msg
563     
564         # Wait for pid file to be generated
565         pid, ppid = self.node.wait_pid(self.run_home)
566         if pid: self._pid = int(pid)
567         if ppid: self._ppid = int(ppid)
568
569         # If the process is not running, check for error information
570         # on the remote machine
571         if not self.pid or not self.ppid:
572             (out, err), proc = self.node.check_errors(self.run_home,
573                     stderr = stderr) 
574
575             # Out is what was written in the stderr file
576             if err:
577                 msg = " Failed to start command '%s' " % command
578                 self.error(msg, out, err)
579                 raise RuntimeError, msg
580     
581     @failtrap
582     def stop(self):
583         """ Stops application execution
584         """
585         command = self.get('command') or ''
586
587         if self.state == ResourceState.STARTED:
588         
589             self.info("Stopping command '%s' " % command)
590         
591             # If the command is running in foreground (it was launched using
592             # the node 'execute' method), then we use the handler to the Popen
593             # process to kill it. Else we send a kill signal using the pid and ppid
594             # retrieved after running the command with the node 'run' method
595             if self._proc:
596                 self._proc.kill()
597             else:
598                 # Only try to kill the process if the pid and ppid
599                 # were retrieved
600                 if self.pid and self.ppid:
601                     (out, err), proc = self.node.kill(self.pid, self.ppid,
602                             sudo = self._sudo_kill)
603
604                     # TODO: check if execution errors occurred
605                     if proc.poll() or err:
606                         msg = " Failed to STOP command '%s' " % self.get("command")
607                         self.error(msg, out, err)
608         
609             super(LinuxApplication, self).stop()
610
611     def release(self):
612         self.info("Releasing resource")
613
614         try:
615             tear_down = self.get("tearDown")
616             if tear_down:
617                 self.node.execute(tear_down)
618
619             self.stop()
620         except:
621             import traceback
622             err = traceback.format_exc()
623             self.error(err)
624
625         super(LinuxApplication, self).release()
626         
627     @property
628     def state(self):
629         """ Returns the state of the application
630         """
631         if self._state == ResourceState.STARTED:
632             if self.in_foreground:
633                 # Check if the process we used to execute the command
634                 # is still running ...
635                 retcode = self._proc.poll()
636
637                 # retcode == None -> running
638                 # retcode > 0 -> error
639                 # retcode == 0 -> finished
640                 if retcode:
641                     out = ""
642                     msg = " Failed to execute command '%s'" % self.get("command")
643                     err = self._proc.stderr.read()
644                     self.error(msg, out, err)
645                     self.fail()
646
647                 elif retcode == 0:
648                     self.finish()
649             else:
650                 # We need to query the status of the command we launched in 
651                 # background. In order to avoid overwhelming the remote host and
652                 # the local processor with too many ssh queries, the state is only
653                 # requested every 'state_check_delay' seconds.
654                 state_check_delay = 0.5
655                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
656                     if self.pid and self.ppid:
657                         # Make sure the process is still running in background
658                         status = self.node.status(self.pid, self.ppid)
659
660                         if status == ProcStatus.FINISHED:
661                             # If the program finished, check if execution
662                             # errors occurred
663                             (out, err), proc = self.node.check_errors(
664                                     self.run_home)
665
666                             if err:
667                                 msg = "Failed to execute command '%s'" % \
668                                         self.get("command")
669                                 self.error(msg, out, err)
670                                 self.fail()
671                             else:
672                                 self.finish()
673
674                     self._last_state_check = tnow()
675
676         return self._state
677
678     def execute_command(self, command, 
679             env = None,
680             sudo = False,
681             stdin = None,
682             forward_x11 = False,
683             blocking = False):
684
685         environ = ""
686         if env:
687             environ = self.node.format_environment(env, inline = True)
688         command = environ + command
689         command = self.replace_paths(command)
690
691         return self.node.execute(command,
692                 sudo = sudo,
693                 stdin = stdin,
694                 forward_x11 = forward_x11,
695                 blocking = blocking)
696
697     def replace_paths(self, command):
698         """
699         Replace all special path tags with shell-escaped actual paths.
700         """
701         return ( command
702             .replace("${USR}", self.node.usr_dir)
703             .replace("${LIB}", self.node.lib_dir)
704             .replace("${BIN}", self.node.bin_dir)
705             .replace("${SRC}", self.node.src_dir)
706             .replace("${SHARE}", self.node.share_dir)
707             .replace("${EXP}", self.node.exp_dir)
708             .replace("${EXP_HOME}", self.node.exp_home)
709             .replace("${APP_HOME}", self.app_home)
710             .replace("${RUN_HOME}", self.run_home)
711             .replace("${NODE_HOME}", self.node.node_home)
712             .replace("${HOME}", self.node.home_dir)
713             )
714
715     def valid_connection(self, guid):
716         # TODO: Validate!
717         return True
718