e73c867fec4509452461849cb32bc62adcb2c97a
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.trace import Trace, TraceAttr
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import tnow, tdiffsec
26
27 import os
28 import subprocess
29
30 # TODO: Resolve wildcards in commands!!
31 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
32
33 @clsinit_copy
34 class LinuxApplication(ResourceManager):
35     """
36     .. class:: Class Args :
37       
38         :param ec: The Experiment controller
39         :type ec: ExperimentController
40         :param guid: guid of the RM
41         :type guid: int
42
43     .. note::
44
45         A LinuxApplication RM represents a process that can be executed in
46         a remote Linux host using SSH.
47
48         The LinuxApplication RM takes care of uploadin sources and any files
49         needed to run the experiment, to the remote host. 
50         It also allows to provide source compilation (build) and installation 
51         instructions, and takes care of automating the sources build and 
52         installation tasks for the user.
53
54         It is important to note that files uploaded to the remote host have
55         two possible scopes: single-experiment or multi-experiment.
56         Single experiment files are those that will not be re-used by other 
57         experiments. Multi-experiment files are those that will.
58         Sources and shared files are always made available to all experiments.
59
60         Directory structure:
61
62         The directory structure used by LinuxApplication RM at the Linux
63         host is the following:
64
65         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
66                       |
67         ${LIB}        |- /lib --> Base directory for libraries
68         ${BIN}        |- /bin --> Base directory for binary files
69         ${SRC}        |- /src --> Base directory for sources
70         ${SHARE}      |- /share --> Base directory for other files
71
72         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
73                       |
74         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
75                           |
76         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
77                                |     specific files (e.g. command.sh, input)
78                                | 
79         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
80
81     """
82
83     _rtype = "linux::Application"
84     _help = "Runs an application on a Linux host with a BASH command "
85     _platform = "linux"
86
87     @classmethod
88     def _register_attributes(cls):
89         command = Attribute("command", "Command to execute at application start. "
90                 "Note that commands will be executed in the ${RUN_HOME} directory, "
91                 "make sure to take this into account when using relative paths. ", 
92                 flags = Flags.Design)
93         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
94                 flags = Flags.Design)
95         env = Attribute("env", "Environment variables string for command execution",
96                 flags = Flags.Design)
97         sudo = Attribute("sudo", "Run with root privileges", 
98                 flags = Flags.Design)
99         depends = Attribute("depends", 
100                 "Space-separated list of packages required to run the application",
101                 flags = Flags.Design)
102         sources = Attribute("sources", 
103                 "semi-colon separated list of regular files to be uploaded to ${SRC} "
104                 "directory prior to building. Archives won't be expanded automatically. "
105                 "Sources are globally available for all experiments unless "
106                 "cleanHome is set to True (This will delete all sources). ",
107                 flags = Flags.Design)
108         files = Attribute("files", 
109                 "semi-colon separated list of regular miscellaneous files to be uploaded "
110                 "to ${SHARE} directory. "
111                 "Files are globally available for all experiments unless "
112                 "cleanHome is set to True (This will delete all files). ",
113                 flags = Flags.Design)
114         libs = Attribute("libs", 
115                 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
116                 "to ${LIB} directory. "
117                 "Libraries are globally available for all experiments unless "
118                 "cleanHome is set to True (This will delete all files). ",
119                 flags = Flags.Design)
120         bins = Attribute("bins", 
121                 "semi-colon separated list of binary files to be uploaded "
122                 "to ${BIN} directory. "
123                 "Binaries are globally available for all experiments unless "
124                 "cleanHome is set to True (This will delete all files). ",
125                 flags = Flags.Design)
126         code = Attribute("code", 
127                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
128                 flags = Flags.Design)
129         build = Attribute("build", 
130                 "Build commands to execute after deploying the sources. "
131                 "Sources are uploaded to the ${SRC} directory and code "
132                 "is uploaded to the ${APP_HOME} directory. \n"
133                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
134                 "./configure && make && make clean.\n"
135                 "Make sure to make the build commands return with a nonzero exit "
136                 "code on error.",
137                 flags = Flags.Design)
138         install = Attribute("install", 
139                 "Commands to transfer built files to their final destinations. "
140                 "Install commands are executed after build commands. ",
141                 flags = Flags.Design)
142         stdin = Attribute("stdin", "Standard input for the 'command'", 
143                 flags = Flags.Design)
144         tear_down = Attribute("tearDown", "Command to be executed just before " 
145                 "releasing the resource", 
146                 flags = Flags.Design)
147
148         cls._register_attribute(command)
149         cls._register_attribute(forward_x11)
150         cls._register_attribute(env)
151         cls._register_attribute(sudo)
152         cls._register_attribute(depends)
153         cls._register_attribute(sources)
154         cls._register_attribute(code)
155         cls._register_attribute(files)
156         cls._register_attribute(bins)
157         cls._register_attribute(libs)
158         cls._register_attribute(build)
159         cls._register_attribute(install)
160         cls._register_attribute(stdin)
161         cls._register_attribute(tear_down)
162
163     @classmethod
164     def _register_traces(cls):
165         stdout = Trace("stdout", "Standard output stream", enabled = True)
166         stderr = Trace("stderr", "Standard error stream", enabled = True)
167
168         cls._register_trace(stdout)
169         cls._register_trace(stderr)
170
171     def __init__(self, ec, guid):
172         super(LinuxApplication, self).__init__(ec, guid)
173         self._pid = None
174         self._ppid = None
175         self._node = None
176         self._home = "app-%s" % self.guid
177
178         # whether the command should run in foreground attached
179         # to a terminal
180         self._in_foreground = False
181
182         # whether to use sudo to kill the application process
183         self._sudo_kill = False
184
185         # keep a reference to the running process handler when 
186         # the command is not executed as remote daemon in background
187         self._proc = None
188
189         # timestamp of last state check of the application
190         self._last_state_check = tnow()
191         
192     def log_message(self, msg):
193         return " guid %d - host %s - %s " % (self.guid, 
194                 self.node.get("hostname"), msg)
195
196     @property
197     def node(self):
198         if not self._node:
199             node = self.get_connected(LinuxNode.get_rtype())
200             if not node: 
201                 msg = "Application %s guid %d NOT connected to Node" % (
202                         self._rtype, self.guid)
203                 raise RuntimeError, msg
204
205             self._node = node[0]
206
207         return self._node
208
209     @property
210     def app_home(self):
211         return os.path.join(self.node.exp_home, self._home)
212
213     @property
214     def run_home(self):
215         return os.path.join(self.app_home, self.ec.run_id)
216
217     @property
218     def pid(self):
219         return self._pid
220
221     @property
222     def ppid(self):
223         return self._ppid
224
225     @property
226     def in_foreground(self):
227         """ Returns True if the command needs to be executed in foreground.
228         This means that command will be executed using 'execute' instead of
229         'run' ('run' executes a command in background and detached from the 
230         terminal)
231         
232         When using X11 forwarding option, the command can not run in background
233         and detached from a terminal, since we need to keep the terminal attached 
234         to interact with it.
235         """
236         return self.get("forwardX11") or self._in_foreground
237
238     def trace_filepath(self, filename):
239         return os.path.join(self.run_home, filename)
240
241     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
242         self.info("Retrieving '%s' trace %s " % (name, attr))
243
244         path = self.trace_filepath(name)
245         
246         command = "(test -f %s && echo 'success') || echo 'error'" % path
247         (out, err), proc = self.node.execute(command)
248
249         if (err and proc.poll()) or out.find("error") != -1:
250             msg = " Couldn't find trace %s " % name
251             self.error(msg, out, err)
252             return None
253     
254         if attr == TraceAttr.PATH:
255             return path
256
257         if attr == TraceAttr.ALL:
258             (out, err), proc = self.node.check_output(self.run_home, name)
259             
260             if proc.poll():
261                 msg = " Couldn't read trace %s " % name
262                 self.error(msg, out, err)
263                 return None
264
265             return out
266
267         if attr == TraceAttr.STREAM:
268             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
269         elif attr == TraceAttr.SIZE:
270             cmd = "stat -c%%s %s " % path
271
272         (out, err), proc = self.node.execute(cmd)
273
274         if proc.poll():
275             msg = " Couldn't find trace %s " % name
276             self.error(msg, out, err)
277             return None
278         
279         if attr == TraceAttr.SIZE:
280             out = int(out.strip())
281
282         return out
283
284     def do_provision(self):
285         # take a snapshot of the system if user is root
286         # to ensure that cleanProcess will not kill
287         # pre-existent processes
288         if self.node.get("username") == 'root':
289             import pickle
290             procs = dict()
291             ps_aux = "ps aux |awk '{print $2,$11}'"
292             (out, err), proc = self.node.execute(ps_aux)
293             if len(out) != 0:
294                 for line in out.strip().split("\n"):
295                     parts = line.strip().split(" ")
296                     procs[parts[0]] = parts[1]
297                 pickle.dump(procs, open("/tmp/save.proc", "wb"))
298             
299         # create run dir for application
300         self.node.mkdir(self.run_home)
301    
302         # List of all the provision methods to invoke
303         steps = [
304             # upload sources
305             self.upload_sources,
306             # upload files
307             self.upload_files,
308             # upload binaries
309             self.upload_binaries,
310             # upload libraries
311             self.upload_libraries,
312             # upload code
313             self.upload_code,
314             # upload stdin
315             self.upload_stdin,
316             # install dependencies
317             self.install_dependencies,
318             # build
319             self.build,
320             # Install
321             self.install]
322
323         command = []
324
325         # Since provisioning takes a long time, before
326         # each step we check that the EC is still 
327         for step in steps:
328             if self.ec.abort:
329                 self.debug("Interrupting provisioning. EC says 'ABORT")
330                 return
331             
332             ret = step()
333             if ret:
334                 command.append(ret)
335
336         # upload deploy script
337         deploy_command = ";".join(command)
338         self.execute_deploy_command(deploy_command)
339
340         # upload start script
341         self.upload_start_command()
342        
343         self.info("Provisioning finished")
344
345         super(LinuxApplication, self).do_provision()
346
347     def upload_start_command(self, overwrite = False):
348         # Upload command to remote bash script
349         # - only if command can be executed in background and detached
350         command = self.get("command")
351
352         if command and not self.in_foreground:
353             self.info("Uploading command '%s'" % command)
354
355             # replace application specific paths in the command
356             command = self.replace_paths(command)
357             # replace application specific paths in the environment
358             env = self.get("env")
359             env = env and self.replace_paths(env)
360
361             shfile = os.path.join(self.app_home, "start.sh")
362
363             self.node.upload_command(command, 
364                     shfile = shfile,
365                     env = env,
366                     overwrite = overwrite)
367
368     def execute_deploy_command(self, command, prefix="deploy"):
369         if command:
370             # replace application specific paths in the command
371             command = self.replace_paths(command)
372             
373             # replace application specific paths in the environment
374             env = self.get("env")
375             env = env and self.replace_paths(env)
376
377             # Upload the command to a bash script and run it
378             # in background ( but wait until the command has
379             # finished to continue )
380             shfile = os.path.join(self.app_home, "%s.sh" % prefix)
381             self.node.run_and_wait(command, self.run_home,
382                     shfile = shfile, 
383                     overwrite = False,
384                     pidfile = "%s_pidfile" % prefix, 
385                     ecodefile = "%s_exitcode" % prefix, 
386                     stdout = "%s_stdout" % prefix, 
387                     stderr = "%s_stderr" % prefix)
388
389     def upload_sources(self, sources = None, src_dir = None):
390         if not sources:
391             sources = self.get("sources")
392    
393         command = ""
394
395         if not src_dir:
396             src_dir = self.node.src_dir
397
398         if sources:
399             self.info("Uploading sources ")
400
401             sources = map(str.strip, sources.split(";"))
402
403             # Separate sources that should be downloaded from 
404             # the web, from sources that should be uploaded from
405             # the local machine
406             command = []
407             for source in list(sources):
408                 if source.startswith("http") or source.startswith("https"):
409                     # remove the hhtp source from the sources list
410                     sources.remove(source)
411
412                     command.append( " ( " 
413                             # Check if the source already exists
414                             " ls %(src_dir)s/%(basename)s "
415                             " || ( "
416                             # If source doesn't exist, download it and check
417                             # that it it downloaded ok
418                             "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
419                             "   ls %(src_dir)s/%(basename)s "
420                             " ) ) " % {
421                                 "basename": os.path.basename(source),
422                                 "source": source,
423                                 "src_dir": src_dir
424                                 })
425
426             command = " && ".join(command)
427
428             # replace application specific paths in the command
429             command = self.replace_paths(command)
430        
431             if sources:
432                 sources = ';'.join(sources)
433                 self.node.upload(sources, src_dir, overwrite = False)
434
435         return command
436
437     def upload_files(self, files = None):
438         if not files:
439             files = self.get("files")
440
441         if files:
442             self.info("Uploading files %s " % files)
443             self.node.upload(files, self.node.share_dir, overwrite = False)
444
445     def upload_libraries(self, libs = None):
446         if not libs:
447             libs = self.get("libs")
448
449         if libs:
450             self.info("Uploading libraries %s " % libaries)
451             self.node.upload(libs, self.node.lib_dir, overwrite = False)
452
453     def upload_binaries(self, bins = None):
454         if not bins:
455             bins = self.get("bins")
456
457         if bins:
458             self.info("Uploading binaries %s " % binaries)
459             self.node.upload(bins, self.node.bin_dir, overwrite = False)
460
461     def upload_code(self, code = None):
462         if not code:
463             code = self.get("code")
464
465         if code:
466             self.info("Uploading code")
467
468             dst = os.path.join(self.app_home, "code")
469             self.node.upload(code, dst, overwrite = False, text = True)
470
471     def upload_stdin(self, stdin = None):
472         if not stdin:
473            stdin = self.get("stdin")
474
475         if stdin:
476             # create dir for sources
477             self.info("Uploading stdin")
478             
479             # upload stdin file to ${SHARE_DIR} directory
480             if os.path.isfile(stdin):
481                 basename = os.path.basename(stdin)
482                 dst = os.path.join(self.node.share_dir, basename)
483             else:
484                 dst = os.path.join(self.app_home, "stdin")
485
486             self.node.upload(stdin, dst, overwrite = False, text = True)
487
488             # create "stdin" symlink on ${APP_HOME} directory
489             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
490                 "app_home": self.app_home, 
491                 "stdin": dst })
492
493             return command
494
495     def install_dependencies(self, depends = None):
496         if not depends:
497             depends = self.get("depends")
498
499         if depends:
500             self.info("Installing dependencies %s" % depends)
501             return self.node.install_packages_command(depends)
502
503     def build(self, build = None):
504         if not build:
505             build = self.get("build")
506
507         if build:
508             self.info("Building sources ")
509             
510             # replace application specific paths in the command
511             return self.replace_paths(build)
512
513     def install(self, install = None):
514         if not install:
515             install = self.get("install")
516
517         if install:
518             self.info("Installing sources ")
519
520             # replace application specific paths in the command
521             return self.replace_paths(install)
522
523     def do_deploy(self):
524         # Wait until node is associated and deployed
525         node = self.node
526         if not node or node.state < ResourceState.READY:
527             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
528             self.ec.schedule(self.reschedule_delay, self.deploy)
529         else:
530             command = self.get("command") or ""
531             self.info("Deploying command '%s' " % command)
532             self.do_discover()
533             self.do_provision()
534
535             super(LinuxApplication, self).do_deploy()
536    
537     def do_start(self):
538         command = self.get("command")
539
540         self.info("Starting command '%s'" % command)
541
542         if not command:
543             # If no command was given (i.e. Application was used for dependency
544             # installation), then the application is directly marked as STOPPED
545             super(LinuxApplication, self).set_stopped()
546         else:
547             if self.in_foreground:
548                 self._run_in_foreground()
549             else:
550                 self._run_in_background()
551
552             super(LinuxApplication, self).do_start()
553
554     def _run_in_foreground(self):
555         command = self.get("command")
556         sudo = self.get("sudo") or False
557         x11 = self.get("forwardX11")
558         env = self.get("env")
559
560         # Command will be launched in foreground and attached to the
561         # terminal using the node 'execute' in non blocking mode.
562
563         # We save the reference to the process in self._proc 
564         # to be able to kill the process from the stop method.
565         # We also set blocking = False, since we don't want the
566         # thread to block until the execution finishes.
567         (out, err), self._proc = self.execute_command(command, 
568                 env = env,
569                 sudo = sudo,
570                 forward_x11 = x11,
571                 blocking = False)
572
573         if self._proc.poll():
574             self.error(msg, out, err)
575             raise RuntimeError, msg
576
577     def _run_in_background(self):
578         command = self.get("command")
579         env = self.get("env")
580         sudo = self.get("sudo") or False
581
582         stdout = "stdout"
583         stderr = "stderr"
584         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
585                 else None
586
587         # Command will be run as a daemon in baground and detached from any
588         # terminal.
589         # The command to run was previously uploaded to a bash script
590         # during deployment, now we launch the remote script using 'run'
591         # method from the node.
592         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
593         (out, err), proc = self.node.run(cmd, self.run_home, 
594             stdin = stdin, 
595             stdout = stdout,
596             stderr = stderr,
597             sudo = sudo)
598
599         # check if execution errors occurred
600         msg = " Failed to start command '%s' " % command
601         
602         if proc.poll():
603             self.error(msg, out, err)
604             raise RuntimeError, msg
605     
606         # Wait for pid file to be generated
607         pid, ppid = self.node.wait_pid(self.run_home)
608         if pid: self._pid = int(pid)
609         if ppid: self._ppid = int(ppid)
610
611         # If the process is not running, check for error information
612         # on the remote machine
613         if not self.pid or not self.ppid:
614             (out, err), proc = self.node.check_errors(self.run_home,
615                     stderr = stderr) 
616
617             # Out is what was written in the stderr file
618             if err:
619                 msg = " Failed to start command '%s' " % command
620                 self.error(msg, out, err)
621                 raise RuntimeError, msg
622     
623     def do_stop(self):
624         """ Stops application execution
625         """
626         command = self.get('command') or ''
627
628         if self.state == ResourceState.STARTED:
629         
630             self.info("Stopping command '%s' " % command)
631         
632             # If the command is running in foreground (it was launched using
633             # the node 'execute' method), then we use the handler to the Popen
634             # process to kill it. Else we send a kill signal using the pid and ppid
635             # retrieved after running the command with the node 'run' method
636             if self._proc:
637                 self._proc.kill()
638             else:
639                 # Only try to kill the process if the pid and ppid
640                 # were retrieved
641                 if self.pid and self.ppid:
642                     (out, err), proc = self.node.kill(self.pid, self.ppid,
643                             sudo = self._sudo_kill)
644
645                     """
646                     # TODO: check if execution errors occurred
647                     if (proc and proc.poll()) or err:
648                         msg = " Failed to STOP command '%s' " % self.get("command")
649                         self.error(msg, out, err)
650                     """
651
652             super(LinuxApplication, self).do_stop()
653
654     def do_release(self):
655         self.info("Releasing resource")
656
657         self.do_stop()
658         
659         tear_down = self.get("tearDown")
660         if tear_down:
661             self.node.execute(tear_down)
662
663         hard_release = self.get("hardRelease")
664         if hard_release:
665             self.node.rmdir(self.app_home)
666
667         super(LinuxApplication, self).do_release()
668         
669     @property
670     def state(self):
671         """ Returns the state of the application
672         """
673         if self._state == ResourceState.STARTED:
674             if self.in_foreground:
675                 # Check if the process we used to execute the command
676                 # is still running ...
677                 retcode = self._proc.poll()
678
679                 # retcode == None -> running
680                 # retcode > 0 -> error
681                 # retcode == 0 -> finished
682                 if retcode:
683                     out = ""
684                     msg = " Failed to execute command '%s'" % self.get("command")
685                     err = self._proc.stderr.read()
686                     self.error(msg, out, err)
687                     self.do_fail()
688
689                 elif retcode == 0:
690                     self.set_stopped()
691             else:
692                 # We need to query the status of the command we launched in 
693                 # background. In order to avoid overwhelming the remote host and
694                 # the local processor with too many ssh queries, the state is only
695                 # requested every 'state_check_delay' seconds.
696                 state_check_delay = 0.5
697                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
698                     if self.pid and self.ppid:
699                         # Make sure the process is still running in background
700                         status = self.node.status(self.pid, self.ppid)
701
702                         if status == ProcStatus.FINISHED:
703                             # If the program finished, check if execution
704                             # errors occurred
705                             (out, err), proc = self.node.check_errors(
706                                     self.run_home)
707
708                             if err:
709                                 msg = "Failed to execute command '%s'" % \
710                                         self.get("command")
711                                 self.error(msg, out, err)
712                                 self.do_fail()
713                             else:
714                                 self.set_stopped()
715
716                     self._last_state_check = tnow()
717
718         return self._state
719
720     def execute_command(self, command, 
721             env=None,
722             sudo=False,
723             tty=False,
724             forward_x11=False,
725             blocking=False):
726
727         environ = ""
728         if env:
729             environ = self.node.format_environment(env, inline=True)
730         command = environ + command
731         command = self.replace_paths(command)
732
733         return self.node.execute(command,
734                 sudo=sudo,
735                 tty=tty,
736                 forward_x11=forward_x11,
737                 blocking=blocking)
738
739     def replace_paths(self, command, node=None, app_home=None, run_home=None):
740         """
741         Replace all special path tags with shell-escaped actual paths.
742         """
743         if not node:
744             node=self.node
745
746         if not app_home:
747             app_home=self.app_home
748
749         if not run_home:
750             run_home = self.run_home
751
752         return ( command
753             .replace("${USR}", node.usr_dir)
754             .replace("${LIB}", node.lib_dir)
755             .replace("${BIN}", node.bin_dir)
756             .replace("${SRC}", node.src_dir)
757             .replace("${SHARE}", node.share_dir)
758             .replace("${EXP}", node.exp_dir)
759             .replace("${EXP_HOME}", node.exp_home)
760             .replace("${APP_HOME}", app_home)
761             .replace("${RUN_HOME}", run_home)
762             .replace("${NODE_HOME}", node.node_home)
763             .replace("${HOME}", node.home_dir)
764             )
765
766     def valid_connection(self, guid):
767         # TODO: Validate!
768         return True
769