applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.trace import Trace, TraceAttr
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sshfuncs import ProcStatus
25 from nepi.util.timefuncs import tnow, tdiffsec
26
27 import os
28 import subprocess
29
30 # TODO: Resolve wildcards in commands!!
31 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
32
33 @clsinit_copy
34 class LinuxApplication(ResourceManager):
35     """
36     .. class:: Class Args :
37       
38         :param ec: The Experiment controller
39         :type ec: ExperimentController
40         :param guid: guid of the RM
41         :type guid: int
42
43     .. note::
44
45         A LinuxApplication RM represents a process that can be executed in
46         a remote Linux host using SSH.
47
48         The LinuxApplication RM takes care of uploadin sources and any files
49         needed to run the experiment, to the remote host. 
50         It also allows to provide source compilation (build) and installation 
51         instructions, and takes care of automating the sources build and 
52         installation tasks for the user.
53
54         It is important to note that files uploaded to the remote host have
55         two possible scopes: single-experiment or multi-experiment.
56         Single experiment files are those that will not be re-used by other 
57         experiments. Multi-experiment files are those that will.
58         Sources and shared files are always made available to all experiments.
59
60         Directory structure:
61
62         The directory structure used by LinuxApplication RM at the Linux
63         host is the following:
64
65         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
66                       |
67         ${LIB}        |- /lib --> Base directory for libraries
68         ${BIN}        |- /bin --> Base directory for binary files
69         ${SRC}        |- /src --> Base directory for sources
70         ${SHARE}      |- /share --> Base directory for other files
71
72         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
73                       |
74         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
75                           |
76         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
77                                |     specific files (e.g. command.sh, input)
78                                | 
79         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
80
81     """
82
83     _rtype = "linux::Application"
84     _help = "Runs an application on a Linux host with a BASH command "
85     _platform = "linux"
86
87     @classmethod
88     def _register_attributes(cls):
89         command = Attribute("command", "Command to execute at application start. "
90                 "Note that commands will be executed in the ${RUN_HOME} directory, "
91                 "make sure to take this into account when using relative paths. ", 
92                 flags = Flags.Design)
93         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
94                 flags = Flags.Design)
95         env = Attribute("env", "Environment variables string for command execution",
96                 flags = Flags.Design)
97         sudo = Attribute("sudo", "Run with root privileges", 
98                 flags = Flags.Design)
99         depends = Attribute("depends", 
100                 "Space-separated list of packages required to run the application",
101                 flags = Flags.Design)
102         sources = Attribute("sources", 
103                 "semi-colon separated list of regular files to be uploaded to ${SRC} "
104                 "directory prior to building. Archives won't be expanded automatically. "
105                 "Sources are globally available for all experiments unless "
106                 "cleanHome is set to True (This will delete all sources). ",
107                 flags = Flags.Design)
108         files = Attribute("files", 
109                 "semi-colon separated list of regular miscellaneous files to be uploaded "
110                 "to ${SHARE} directory. "
111                 "Files are globally available for all experiments unless "
112                 "cleanHome is set to True (This will delete all files). ",
113                 flags = Flags.Design)
114         libs = Attribute("libs", 
115                 "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
116                 "to ${LIB} directory. "
117                 "Libraries are globally available for all experiments unless "
118                 "cleanHome is set to True (This will delete all files). ",
119                 flags = Flags.Design)
120         bins = Attribute("bins", 
121                 "semi-colon separated list of binary files to be uploaded "
122                 "to ${BIN} directory. "
123                 "Binaries are globally available for all experiments unless "
124                 "cleanHome is set to True (This will delete all files). ",
125                 flags = Flags.Design)
126         code = Attribute("code", 
127                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
128                 flags = Flags.Design)
129         build = Attribute("build", 
130                 "Build commands to execute after deploying the sources. "
131                 "Sources are uploaded to the ${SRC} directory and code "
132                 "is uploaded to the ${APP_HOME} directory. \n"
133                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
134                 "./configure && make && make clean.\n"
135                 "Make sure to make the build commands return with a nonzero exit "
136                 "code on error.",
137                 flags = Flags.Design)
138         install = Attribute("install", 
139                 "Commands to transfer built files to their final destinations. "
140                 "Install commands are executed after build commands. ",
141                 flags = Flags.Design)
142         stdin = Attribute("stdin", "Standard input for the 'command'", 
143                 flags = Flags.Design)
144         tear_down = Attribute("tearDown", "Command to be executed just before " 
145                 "releasing the resource", 
146                 flags = Flags.Design)
147
148         cls._register_attribute(command)
149         cls._register_attribute(forward_x11)
150         cls._register_attribute(env)
151         cls._register_attribute(sudo)
152         cls._register_attribute(depends)
153         cls._register_attribute(sources)
154         cls._register_attribute(code)
155         cls._register_attribute(files)
156         cls._register_attribute(bins)
157         cls._register_attribute(libs)
158         cls._register_attribute(build)
159         cls._register_attribute(install)
160         cls._register_attribute(stdin)
161         cls._register_attribute(tear_down)
162
163     @classmethod
164     def _register_traces(cls):
165         stdout = Trace("stdout", "Standard output stream", enabled = True)
166         stderr = Trace("stderr", "Standard error stream", enabled = True)
167
168         cls._register_trace(stdout)
169         cls._register_trace(stderr)
170
171     def __init__(self, ec, guid):
172         super(LinuxApplication, self).__init__(ec, guid)
173         self._pid = None
174         self._ppid = None
175         self._node = None
176         self._home = "app-%s" % self.guid
177
178         # whether the command should run in foreground attached
179         # to a terminal
180         self._in_foreground = False
181
182         # whether to use sudo to kill the application process
183         self._sudo_kill = False
184
185         # keep a reference to the running process handler when 
186         # the command is not executed as remote daemon in background
187         self._proc = None
188
189         # timestamp of last state check of the application
190         self._last_state_check = tnow()
191         
192     def log_message(self, msg):
193         return " guid %d - host %s - %s " % (self.guid, 
194                 self.node.get("hostname"), msg)
195
196     @property
197     def node(self):
198         if not self._node:
199             node = self.get_connected(LinuxNode.get_rtype())
200             if not node: 
201                 msg = "Application %s guid %d NOT connected to Node" % (
202                         self._rtype, self.guid)
203                 raise RuntimeError(msg)
204
205             self._node = node[0]
206
207         return self._node
208
209     @property
210     def app_home(self):
211         return os.path.join(self.node.exp_home, self._home)
212
213     @property
214     def run_home(self):
215         return os.path.join(self.app_home, self.ec.run_id)
216
217     @property
218     def pid(self):
219         return self._pid
220
221     @property
222     def ppid(self):
223         return self._ppid
224
225     @property
226     def in_foreground(self):
227         """ Returns True if the command needs to be executed in foreground.
228         This means that command will be executed using 'execute' instead of
229         'run' ('run' executes a command in background and detached from the 
230         terminal)
231         
232         When using X11 forwarding option, the command can not run in background
233         and detached from a terminal, since we need to keep the terminal attached 
234         to interact with it.
235         """
236         return self.get("forwardX11") or self._in_foreground
237
238     def trace_filepath(self, filename):
239         return os.path.join(self.run_home, filename)
240
241     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
242         self.info("Retrieving '%s' trace %s " % (name, attr))
243
244         path = self.trace_filepath(name)
245         
246         command = "(test -f %s && echo 'success') || echo 'error'" % path
247         (out, err), proc = self.node.execute(command)
248
249         if (err and proc.poll()) or out.find("error") != -1:
250             msg = " Couldn't find trace %s " % name
251             self.error(msg, out, err)
252             return None
253     
254         if attr == TraceAttr.PATH:
255             return path
256
257         if attr == TraceAttr.ALL:
258             (out, err), proc = self.node.check_output(self.run_home, name)
259             
260             if proc.poll():
261                 msg = " Couldn't read trace %s " % name
262                 self.error(msg, out, err)
263                 return None
264
265             return out
266
267         if attr == TraceAttr.STREAM:
268             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
269         elif attr == TraceAttr.SIZE:
270             cmd = "stat -c%%s %s " % path
271
272         (out, err), proc = self.node.execute(cmd)
273
274         if proc.poll():
275             msg = " Couldn't find trace %s " % name
276             self.error(msg, out, err)
277             return None
278         
279         if attr == TraceAttr.SIZE:
280             out = int(out.strip())
281
282         return out
283
284     def do_provision(self):
285         # take a snapshot of the system if user is root
286         # to ensure that cleanProcess will not kill
287         # pre-existent processes
288         if self.node.get("username") == 'root':
289             import pickle
290             procs = dict()
291             ps_aux = "ps aux |awk '{print $2,$11}'"
292             (out, err), proc = self.node.execute(ps_aux)
293             if len(out) != 0:
294                 for line in out.strip().split("\n"):
295                     parts = line.strip().split(" ")
296                     procs[parts[0]] = parts[1]
297                 with open("/tmp/save.proc", "wb") as pickle_file:
298                     pickle.dump(procs, pickle_file)
299             
300         # create run dir for application
301         self.node.mkdir(self.run_home)
302    
303         # List of all the provision methods to invoke
304         steps = [
305             # upload sources
306             self.upload_sources,
307             # upload files
308             self.upload_files,
309             # upload binaries
310             self.upload_binaries,
311             # upload libraries
312             self.upload_libraries,
313             # upload code
314             self.upload_code,
315             # upload stdin
316             self.upload_stdin,
317             # install dependencies
318             self.install_dependencies,
319             # build
320             self.build,
321             # Install
322             self.install]
323
324         command = []
325
326         # Since provisioning takes a long time, before
327         # each step we check that the EC is still 
328         for step in steps:
329             if self.ec.abort:
330                 self.debug("Interrupting provisioning. EC says 'ABORT")
331                 return
332             
333             ret = step()
334             if ret:
335                 command.append(ret)
336
337         # upload deploy script
338         deploy_command = ";".join(command)
339         self.execute_deploy_command(deploy_command)
340
341         # upload start script
342         self.upload_start_command()
343        
344         self.info("Provisioning finished")
345
346         super(LinuxApplication, self).do_provision()
347
348     def upload_start_command(self, overwrite = False):
349         # Upload command to remote bash script
350         # - only if command can be executed in background and detached
351         command = self.get("command")
352
353         if command and not self.in_foreground:
354             self.info("Uploading command '%s'" % command)
355
356             # replace application specific paths in the command
357             command = self.replace_paths(command)
358             # replace application specific paths in the environment
359             env = self.get("env")
360             env = env and self.replace_paths(env)
361
362             shfile = os.path.join(self.app_home, "start.sh")
363
364             self.node.upload_command(command, 
365                     shfile = shfile,
366                     env = env,
367                     overwrite = overwrite)
368
369     def execute_deploy_command(self, command, prefix="deploy"):
370         if command:
371             # replace application specific paths in the command
372             command = self.replace_paths(command)
373             
374             # replace application specific paths in the environment
375             env = self.get("env")
376             env = env and self.replace_paths(env)
377
378             # Upload the command to a bash script and run it
379             # in background ( but wait until the command has
380             # finished to continue )
381             shfile = os.path.join(self.app_home, "%s.sh" % prefix)
382             self.node.run_and_wait(command, self.run_home,
383                     shfile = shfile, 
384                     overwrite = False,
385                     pidfile = "%s_pidfile" % prefix, 
386                     ecodefile = "%s_exitcode" % prefix, 
387                     stdout = "%s_stdout" % prefix, 
388                     stderr = "%s_stderr" % prefix)
389
390     def upload_sources(self, sources = None, src_dir = None):
391         if not sources:
392             sources = self.get("sources")
393    
394         command = ""
395
396         if not src_dir:
397             src_dir = self.node.src_dir
398
399         if sources:
400             self.info("Uploading sources ")
401
402             sources = map(str.strip, sources.split(";"))
403
404             # Separate sources that should be downloaded from 
405             # the web, from sources that should be uploaded from
406             # the local machine
407             command = []
408             for source in list(sources):
409                 if source.startswith("http") or source.startswith("https"):
410                     # remove the hhtp source from the sources list
411                     sources.remove(source)
412
413                     command.append( " ( " 
414                             # Check if the source already exists
415                             " ls %(src_dir)s/%(basename)s "
416                             " || ( "
417                             # If source doesn't exist, download it and check
418                             # that it it downloaded ok
419                             "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
420                             "   ls %(src_dir)s/%(basename)s "
421                             " ) ) " % {
422                                 "basename": os.path.basename(source),
423                                 "source": source,
424                                 "src_dir": src_dir
425                                 })
426
427             command = " && ".join(command)
428
429             # replace application specific paths in the command
430             command = self.replace_paths(command)
431        
432             if sources:
433                 sources = ';'.join(sources)
434                 self.node.upload(sources, src_dir, overwrite = False)
435
436         return command
437
438     def upload_files(self, files = None):
439         if not files:
440             files = self.get("files")
441
442         if files:
443             self.info("Uploading files %s " % files)
444             self.node.upload(files, self.node.share_dir, overwrite = False)
445
446     def upload_libraries(self, libs = None):
447         if not libs:
448             libs = self.get("libs")
449
450         if libs:
451             self.info("Uploading libraries %s " % libaries)
452             self.node.upload(libs, self.node.lib_dir, overwrite = False)
453
454     def upload_binaries(self, bins = None):
455         if not bins:
456             bins = self.get("bins")
457
458         if bins:
459             self.info("Uploading binaries %s " % binaries)
460             self.node.upload(bins, self.node.bin_dir, overwrite = False)
461
462     def upload_code(self, code = None):
463         if not code:
464             code = self.get("code")
465
466         if code:
467             self.info("Uploading code")
468
469             dst = os.path.join(self.app_home, "code")
470             self.node.upload(code, dst, overwrite = False, text = True)
471
472     def upload_stdin(self, stdin = None):
473         if not stdin:
474            stdin = self.get("stdin")
475
476         if stdin:
477             # create dir for sources
478             self.info("Uploading stdin")
479             
480             # upload stdin file to ${SHARE_DIR} directory
481             if os.path.isfile(stdin):
482                 basename = os.path.basename(stdin)
483                 dst = os.path.join(self.node.share_dir, basename)
484             else:
485                 dst = os.path.join(self.app_home, "stdin")
486
487             self.node.upload(stdin, dst, overwrite = False, text = True)
488
489             # create "stdin" symlink on ${APP_HOME} directory
490             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
491                 "app_home": self.app_home, 
492                 "stdin": dst })
493
494             return command
495
496     def install_dependencies(self, depends = None):
497         if not depends:
498             depends = self.get("depends")
499
500         if depends:
501             self.info("Installing dependencies %s" % depends)
502             return self.node.install_packages_command(depends)
503
504     def build(self, build = None):
505         if not build:
506             build = self.get("build")
507
508         if build:
509             self.info("Building sources ")
510             
511             # replace application specific paths in the command
512             return self.replace_paths(build)
513
514     def install(self, install = None):
515         if not install:
516             install = self.get("install")
517
518         if install:
519             self.info("Installing sources ")
520
521             # replace application specific paths in the command
522             return self.replace_paths(install)
523
524     def do_deploy(self):
525         # Wait until node is associated and deployed
526         node = self.node
527         if not node or node.state < ResourceState.READY:
528             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
529             self.ec.schedule(self.reschedule_delay, self.deploy)
530         else:
531             command = self.get("command") or ""
532             self.info("Deploying command '%s' " % command)
533             self.do_discover()
534             self.do_provision()
535
536             super(LinuxApplication, self).do_deploy()
537    
538     def do_start(self):
539         command = self.get("command")
540
541         self.info("Starting command '%s'" % command)
542
543         if not command:
544             # If no command was given (i.e. Application was used for dependency
545             # installation), then the application is directly marked as STOPPED
546             super(LinuxApplication, self).set_stopped()
547         else:
548             if self.in_foreground:
549                 self._run_in_foreground()
550             else:
551                 self._run_in_background()
552
553             super(LinuxApplication, self).do_start()
554
555     def _run_in_foreground(self):
556         command = self.get("command")
557         sudo = self.get("sudo") or False
558         x11 = self.get("forwardX11")
559         env = self.get("env")
560
561         # Command will be launched in foreground and attached to the
562         # terminal using the node 'execute' in non blocking mode.
563
564         # We save the reference to the process in self._proc 
565         # to be able to kill the process from the stop method.
566         # We also set blocking = False, since we don't want the
567         # thread to block until the execution finishes.
568         (out, err), self._proc = self.execute_command(command, 
569                 env = env,
570                 sudo = sudo,
571                 forward_x11 = x11,
572                 blocking = False)
573
574         if self._proc.poll():
575             self.error(msg, out, err)
576             raise RuntimeError(msg)
577
578     def _run_in_background(self):
579         command = self.get("command")
580         env = self.get("env")
581         sudo = self.get("sudo") or False
582
583         stdout = "stdout"
584         stderr = "stderr"
585         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
586                 else None
587
588         # Command will be run as a daemon in baground and detached from any
589         # terminal.
590         # The command to run was previously uploaded to a bash script
591         # during deployment, now we launch the remote script using 'run'
592         # method from the node.
593         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
594         (out, err), proc = self.node.run(cmd, self.run_home, 
595             stdin = stdin, 
596             stdout = stdout,
597             stderr = stderr,
598             sudo = sudo)
599
600         # check if execution errors occurred
601         msg = " Failed to start command '%s' " % command
602         
603         if proc.poll():
604             self.error(msg, out, err)
605             raise RuntimeError(msg)
606     
607         # Wait for pid file to be generated
608         pid, ppid = self.node.wait_pid(self.run_home)
609         if pid: self._pid = int(pid)
610         if ppid: self._ppid = int(ppid)
611
612         # If the process is not running, check for error information
613         # on the remote machine
614         if not self.pid or not self.ppid:
615             (out, err), proc = self.node.check_errors(self.run_home,
616                     stderr = stderr) 
617
618             # Out is what was written in the stderr file
619             if err:
620                 msg = " Failed to start command '%s' " % command
621                 self.error(msg, out, err)
622                 raise RuntimeError(msg)
623     
624     def do_stop(self):
625         """ Stops application execution
626         """
627         command = self.get('command') or ''
628
629         if self.state == ResourceState.STARTED:
630         
631             self.info("Stopping command '%s' " % command)
632         
633             # If the command is running in foreground (it was launched using
634             # the node 'execute' method), then we use the handler to the Popen
635             # process to kill it. Else we send a kill signal using the pid and ppid
636             # retrieved after running the command with the node 'run' method
637             if self._proc:
638                 self._proc.kill()
639             else:
640                 # Only try to kill the process if the pid and ppid
641                 # were retrieved
642                 if self.pid and self.ppid:
643                     (out, err), proc = self.node.kill(self.pid, self.ppid,
644                             sudo = self._sudo_kill)
645
646                     """
647                     # TODO: check if execution errors occurred
648                     if (proc and proc.poll()) or err:
649                         msg = " Failed to STOP command '%s' " % self.get("command")
650                         self.error(msg, out, err)
651                     """
652
653             super(LinuxApplication, self).do_stop()
654
655     def do_release(self):
656         self.info("Releasing resource")
657
658         self.do_stop()
659         
660         tear_down = self.get("tearDown")
661         if tear_down:
662             self.node.execute(tear_down)
663
664         hard_release = self.get("hardRelease")
665         if hard_release:
666             self.node.rmdir(self.app_home)
667
668         super(LinuxApplication, self).do_release()
669         
670     @property
671     def state(self):
672         """ Returns the state of the application
673         """
674         if self._state == ResourceState.STARTED:
675             if self.in_foreground:
676                 # Check if the process we used to execute the command
677                 # is still running ...
678                 retcode = self._proc.poll()
679
680                 # retcode == None -> running
681                 # retcode > 0 -> error
682                 # retcode == 0 -> finished
683                 if retcode:
684                     out = ""
685                     msg = " Failed to execute command '%s'" % self.get("command")
686                     err = self._proc.stderr.read()
687                     self.error(msg, out, err)
688                     self.do_fail()
689
690                 elif retcode == 0:
691                     self.set_stopped()
692             else:
693                 # We need to query the status of the command we launched in 
694                 # background. In order to avoid overwhelming the remote host and
695                 # the local processor with too many ssh queries, the state is only
696                 # requested every 'state_check_delay' seconds.
697                 state_check_delay = 0.5
698                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
699                     if self.pid and self.ppid:
700                         # Make sure the process is still running in background
701                         status = self.node.status(self.pid, self.ppid)
702
703                         if status == ProcStatus.FINISHED:
704                             # If the program finished, check if execution
705                             # errors occurred
706                             (out, err), proc = self.node.check_errors(
707                                     self.run_home)
708
709                             if err:
710                                 msg = "Failed to execute command '%s'" % \
711                                         self.get("command")
712                                 self.error(msg, out, err)
713                                 self.do_fail()
714                             else:
715                                 self.set_stopped()
716
717                     self._last_state_check = tnow()
718
719         return self._state
720
721     def execute_command(self, command, 
722             env=None,
723             sudo=False,
724             tty=False,
725             forward_x11=False,
726             blocking=False):
727
728         environ = ""
729         if env:
730             environ = self.node.format_environment(env, inline=True)
731         command = environ + command
732         command = self.replace_paths(command)
733
734         return self.node.execute(command,
735                 sudo=sudo,
736                 tty=tty,
737                 forward_x11=forward_x11,
738                 blocking=blocking)
739
740     def replace_paths(self, command, node=None, app_home=None, run_home=None):
741         """
742         Replace all special path tags with shell-escaped actual paths.
743         """
744         if not node:
745             node=self.node
746
747         if not app_home:
748             app_home=self.app_home
749
750         if not run_home:
751             run_home = self.run_home
752
753         return ( command
754             .replace("${USR}", node.usr_dir)
755             .replace("${LIB}", node.lib_dir)
756             .replace("${BIN}", node.bin_dir)
757             .replace("${SRC}", node.src_dir)
758             .replace("${SHARE}", node.share_dir)
759             .replace("${EXP}", node.exp_dir)
760             .replace("${EXP_HOME}", node.exp_home)
761             .replace("${APP_HOME}", app_home)
762             .replace("${RUN_HOME}", run_home)
763             .replace("${NODE_HOME}", node.node_home)
764             .replace("${HOME}", node.home_dir)
765             )
766
767     def valid_connection(self, guid):
768         # TODO: Validate!
769         return True
770