Code cleanup. Setting resource state through specific functions
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23         reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46     A LinuxApplication RM represents a process that can be executed in
47     a remote Linux host using SSH.
48
49     The LinuxApplication RM takes care of uploadin sources and any files
50     needed to run the experiment, to the remote host. 
51     It also allows to provide source compilation (build) and installation 
52     instructions, and takes care of automating the sources build and 
53     installation tasks for the user.
54
55     It is important to note that files uploaded to the remote host have
56     two possible scopes: single-experiment or multi-experiment.
57     Single experiment files are those that will not be re-used by other 
58     experiments. Multi-experiment files are those that will.
59     Sources and shared files are always made available to all experiments.
60
61     Directory structure:
62
63     The directory structure used by LinuxApplication RM at the Linux
64     host is the following:
65
66         ${HOME}/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85
86     @classmethod
87     def _register_attributes(cls):
88         command = Attribute("command", "Command to execute at application start. "
89                 "Note that commands will be executed in the ${RUN_HOME} directory, "
90                 "make sure to take this into account when using relative paths. ", 
91                 flags = Flags.ExecReadOnly)
92         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
93                 flags = Flags.ExecReadOnly)
94         env = Attribute("env", "Environment variables string for command execution",
95                 flags = Flags.ExecReadOnly)
96         sudo = Attribute("sudo", "Run with root privileges", 
97                 flags = Flags.ExecReadOnly)
98         depends = Attribute("depends", 
99                 "Space-separated list of packages required to run the application",
100                 flags = Flags.ExecReadOnly)
101         sources = Attribute("sources", 
102                 "Space-separated list of regular files to be uploaded to ${SRC} "
103                 "directory prior to building. Archives won't be expanded automatically. "
104                 "Sources are globally available for all experiments unless "
105                 "cleanHome is set to True (This will delete all sources). ",
106                 flags = Flags.ExecReadOnly)
107         files = Attribute("files", 
108                 "Space-separated list of regular miscellaneous files to be uploaded "
109                 "to ${SHARE} directory. "
110                 "Files are globally available for all experiments unless "
111                 "cleanHome is set to True (This will delete all files). ",
112                 flags = Flags.ExecReadOnly)
113         libs = Attribute("libs", 
114                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
115                 "to ${LIB} directory. "
116                 "Libraries are globally available for all experiments unless "
117                 "cleanHome is set to True (This will delete all files). ",
118                 flags = Flags.ExecReadOnly)
119         bins = Attribute("bins", 
120                 "Space-separated list of binary files to be uploaded "
121                 "to ${BIN} directory. "
122                 "Binaries are globally available for all experiments unless "
123                 "cleanHome is set to True (This will delete all files). ",
124                 flags = Flags.ExecReadOnly)
125         code = Attribute("code", 
126                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
127                 flags = Flags.ExecReadOnly)
128         build = Attribute("build", 
129                 "Build commands to execute after deploying the sources. "
130                 "Sources are uploaded to the ${SRC} directory and code "
131                 "is uploaded to the ${APP_HOME} directory. \n"
132                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
133                 "./configure && make && make clean.\n"
134                 "Make sure to make the build commands return with a nonzero exit "
135                 "code on error.",
136                 flags = Flags.ReadOnly)
137         install = Attribute("install", 
138                 "Commands to transfer built files to their final destinations. "
139                 "Install commands are executed after build commands. ",
140                 flags = Flags.ReadOnly)
141         stdin = Attribute("stdin", "Standard input for the 'command'", 
142                 flags = Flags.ExecReadOnly)
143         tear_down = Attribute("tearDown", "Command to be executed just before " 
144                 "releasing the resource", 
145                 flags = Flags.ReadOnly)
146
147         cls._register_attribute(command)
148         cls._register_attribute(forward_x11)
149         cls._register_attribute(env)
150         cls._register_attribute(sudo)
151         cls._register_attribute(depends)
152         cls._register_attribute(sources)
153         cls._register_attribute(code)
154         cls._register_attribute(files)
155         cls._register_attribute(bins)
156         cls._register_attribute(libs)
157         cls._register_attribute(build)
158         cls._register_attribute(install)
159         cls._register_attribute(stdin)
160         cls._register_attribute(tear_down)
161
162     @classmethod
163     def _register_traces(cls):
164         stdout = Trace("stdout", "Standard output stream")
165         stderr = Trace("stderr", "Standard error stream")
166
167         cls._register_trace(stdout)
168         cls._register_trace(stderr)
169
170     def __init__(self, ec, guid):
171         super(LinuxApplication, self).__init__(ec, guid)
172         self._pid = None
173         self._ppid = None
174         self._home = "app-%s" % self.guid
175         # whether the command should run in foreground attached
176         # to a terminal
177         self._in_foreground = False
178
179         # whether to use sudo to kill the application process
180         self._sudo_kill = False
181
182         # keep a reference to the running process handler when 
183         # the command is not executed as remote daemon in background
184         self._proc = None
185
186         # timestamp of last state check of the application
187         self._last_state_check = tnow()
188
189     def log_message(self, msg):
190         return " guid %d - host %s - %s " % (self.guid, 
191                 self.node.get("hostname"), msg)
192
193     @property
194     def node(self):
195         node = self.get_connected(LinuxNode.rtype())
196         if node: return node[0]
197         return None
198
199     @property
200     def app_home(self):
201         return os.path.join(self.node.exp_home, self._home)
202
203     @property
204     def run_home(self):
205         return os.path.join(self.app_home, self.ec.run_id)
206
207     @property
208     def pid(self):
209         return self._pid
210
211     @property
212     def ppid(self):
213         return self._ppid
214
215     @property
216     def in_foreground(self):
217         """ Returns True if the command needs to be executed in foreground.
218         This means that command will be executed using 'execute' instead of
219         'run' ('run' executes a command in background and detached from the 
220         terminal)
221         
222         When using X11 forwarding option, the command can not run in background
223         and detached from a terminal, since we need to keep the terminal attached 
224         to interact with it.
225         """
226         return self.get("forwardX11") or self._in_foreground
227
228     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
229         self.info("Retrieving '%s' trace %s " % (name, attr))
230
231         path = os.path.join(self.run_home, name)
232         
233         command = "(test -f %s && echo 'success') || echo 'error'" % path
234         (out, err), proc = self.node.execute(command)
235
236         if (err and proc.poll()) or out.find("error") != -1:
237             msg = " Couldn't find trace %s " % name
238             self.error(msg, out, err)
239             return None
240     
241         if attr == TraceAttr.PATH:
242             return path
243
244         if attr == TraceAttr.ALL:
245             (out, err), proc = self.node.check_output(self.run_home, name)
246             
247             if proc.poll():
248                 msg = " Couldn't read trace %s " % name
249                 self.error(msg, out, err)
250                 return None
251
252             return out
253
254         if attr == TraceAttr.STREAM:
255             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
256         elif attr == TraceAttr.SIZE:
257             cmd = "stat -c%%s %s " % path
258
259         (out, err), proc = self.node.execute(cmd)
260
261         if proc.poll():
262             msg = " Couldn't find trace %s " % name
263             self.error(msg, out, err)
264             return None
265         
266         if attr == TraceAttr.SIZE:
267             out = int(out.strip())
268
269         return out
270             
271     def provision(self):
272         # create run dir for application
273         self.node.mkdir(self.run_home)
274    
275         # List of all the provision methods to invoke
276         steps = [
277             # upload sources
278             self.upload_sources,
279             # upload files
280             self.upload_files,
281             # upload binaries
282             self.upload_binaries,
283             # upload libraries
284             self.upload_libraries,
285             # upload code
286             self.upload_code,
287             # upload stdin
288             self.upload_stdin,
289             # install dependencies
290             self.install_dependencies,
291             # build
292             self.build,
293             # Install
294             self.install]
295
296         command = []
297
298         # Since provisioning takes a long time, before
299         # each step we check that the EC is still 
300         for step in steps:
301             if self.ec.finished:
302                 raise RuntimeError, "EC finished"
303             
304             ret = step()
305             if ret:
306                 command.append(ret)
307
308         # upload deploy script
309         deploy_command = ";".join(command)
310         self.execute_deploy_command(deploy_command)
311
312         # upload start script
313         self.upload_start_command()
314        
315         self.info("Provisioning finished")
316
317         super(LinuxApplication, self).provision()
318
319     def upload_start_command(self):
320         # Upload command to remote bash script
321         # - only if command can be executed in background and detached
322         command = self.get("command")
323
324         if command and not self.in_foreground:
325             self.info("Uploading command '%s'" % command)
326
327             # replace application specific paths in the command
328             command = self.replace_paths(command)
329             
330             # replace application specific paths in the environment
331             env = self.get("env")
332             env = env and self.replace_paths(env)
333
334             shfile = os.path.join(self.app_home, "start.sh")
335
336             self.node.upload_command(command, 
337                     shfile = shfile,
338                     env = env,
339                     overwrite = False)
340
341     def execute_deploy_command(self, command):
342         if command:
343             # Upload the command to a bash script and run it
344             # in background ( but wait until the command has
345             # finished to continue )
346             shfile = os.path.join(self.app_home, "deploy.sh")
347             self.node.run_and_wait(command, self.run_home,
348                     shfile = shfile, 
349                     overwrite = False,
350                     pidfile = "deploy_pidfile", 
351                     ecodefile = "deploy_exitcode", 
352                     stdout = "deploy_stdout", 
353                     stderr = "deploy_stderr")
354
355     def upload_sources(self):
356         sources = self.get("sources")
357    
358         command = ""
359
360         if sources:
361             self.info("Uploading sources ")
362
363             sources = sources.split(' ')
364
365             # Separate sources that should be downloaded from 
366             # the web, from sources that should be uploaded from
367             # the local machine
368             command = []
369             for source in list(sources):
370                 if source.startswith("http") or source.startswith("https"):
371                     # remove the hhtp source from the sources list
372                     sources.remove(source)
373
374                     command.append( " ( " 
375                             # Check if the source already exists
376                             " ls ${SRC}/%(basename)s "
377                             " || ( "
378                             # If source doesn't exist, download it and check
379                             # that it it downloaded ok
380                             "   wget -c --directory-prefix=${SRC} %(source)s && "
381                             "   ls ${SRC}/%(basename)s "
382                             " ) ) " % {
383                                 "basename": os.path.basename(source),
384                                 "source": source
385                                 })
386
387             command = " && ".join(command)
388
389             # replace application specific paths in the command
390             command = self.replace_paths(command)
391        
392             if sources:
393                 sources = ' '.join(sources)
394                 self.node.upload(sources, self.node.src_dir, overwrite = False)
395
396         return command
397
398     def upload_files(self):
399         files = self.get("files")
400
401         if files:
402             self.info("Uploading files %s " % files)
403             self.node.upload(files, self.node.share_dir, overwrite = False)
404
405     def upload_libraries(self):
406         libs = self.get("libs")
407
408         if libs:
409             self.info("Uploading libraries %s " % libaries)
410             self.node.upload(libs, self.node.lib_dir, overwrite = False)
411
412     def upload_binaries(self):
413         bins = self.get("bins")
414
415         if bins:
416             self.info("Uploading binaries %s " % binaries)
417             self.node.upload(bins, self.node.bin_dir, overwrite = False)
418
419     def upload_code(self):
420         code = self.get("code")
421
422         if code:
423             self.info("Uploading code")
424
425             dst = os.path.join(self.app_home, "code")
426             self.node.upload(code, dst, overwrite = False, text = True)
427
428     def upload_stdin(self):
429         stdin = self.get("stdin")
430         if stdin:
431             # create dir for sources
432             self.info("Uploading stdin")
433             
434             # upload stdin file to ${SHARE_DIR} directory
435             basename = os.path.basename(stdin)
436             dst = os.path.join(self.node.share_dir, basename)
437             self.node.upload(stdin, dst, overwrite = False, text = True)
438
439             # create "stdin" symlink on ${APP_HOME} directory
440             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
441                 "app_home": self.app_home, 
442                 "stdin": dst })
443
444             return command
445
446     def install_dependencies(self):
447         depends = self.get("depends")
448         if depends:
449             self.info("Installing dependencies %s" % depends)
450             return self.node.install_packages_command(depends)
451
452     def build(self):
453         build = self.get("build")
454
455         if build:
456             self.info("Building sources ")
457             
458             # replace application specific paths in the command
459             return self.replace_paths(build)
460
461     def install(self):
462         install = self.get("install")
463
464         if install:
465             self.info("Installing sources ")
466
467             # replace application specific paths in the command
468             return self.replace_paths(install)
469
470     def deploy(self):
471         # Wait until node is associated and deployed
472         node = self.node
473         if not node or node.state < ResourceState.READY:
474             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
475             self.ec.schedule(reschedule_delay, self.deploy)
476         else:
477             try:
478                 command = self.get("command") or ""
479                 self.info("Deploying command '%s' " % command)
480                 self.discover()
481                 self.provision()
482             except:
483                 self.fail()
484                 raise
485
486             super(LinuxApplication, self).deploy()
487     
488     def start(self):
489         command = self.get("command")
490
491         self.info("Starting command '%s'" % command)
492
493         if not command:
494             # If no command was given (i.e. Application was used for dependency
495             # installation), then the application is directly marked as FINISHED
496             self.set_finished()
497         else:
498
499             if self.in_foreground:
500                 self._run_in_foreground()
501             else:
502                 self._run_in_background()
503
504             super(LinuxApplication, self).start()
505
506     def _run_in_foreground(self):
507         command = self.get("command")
508         sudo = self.get("sudo") or False
509         x11 = self.get("forwardX11")
510
511         # For a command being executed in foreground, if there is stdin,
512         # it is expected to be text string not a file or pipe
513         stdin = self.get("stdin") or None
514
515         # Command will be launched in foreground and attached to the
516         # terminal using the node 'execute' in non blocking mode.
517
518         # We save the reference to the process in self._proc 
519         # to be able to kill the process from the stop method.
520         # We also set blocking = False, since we don't want the
521         # thread to block until the execution finishes.
522         (out, err), self._proc = self.execute_command(self, command, 
523                 env = env,
524                 sudo = sudo,
525                 stdin = stdin,
526                 forward_x11 = x11,
527                 blocking = False)
528
529         if self._proc.poll():
530             self.fail()
531             self.error(msg, out, err)
532             raise RuntimeError, msg
533
534     def _run_in_background(self):
535         command = self.get("command")
536         env = self.get("env")
537         sudo = self.get("sudo") or False
538
539         stdout = "stdout"
540         stderr = "stderr"
541         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
542                 else None
543
544         # Command will be run as a daemon in baground and detached from any
545         # terminal.
546         # The command to run was previously uploaded to a bash script
547         # during deployment, now we launch the remote script using 'run'
548         # method from the node.
549         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
550         (out, err), proc = self.node.run(cmd, self.run_home, 
551             stdin = stdin, 
552             stdout = stdout,
553             stderr = stderr,
554             sudo = sudo)
555
556         # check if execution errors occurred
557         msg = " Failed to start command '%s' " % command
558         
559         if proc.poll():
560             self.fail()
561             self.error(msg, out, err)
562             raise RuntimeError, msg
563     
564         # Wait for pid file to be generated
565         pid, ppid = self.node.wait_pid(self.run_home)
566         if pid: self._pid = int(pid)
567         if ppid: self._ppid = int(ppid)
568
569         # If the process is not running, check for error information
570         # on the remote machine
571         if not self.pid or not self.ppid:
572             (out, err), proc = self.node.check_errors(self.run_home,
573                     stderr = stderr) 
574
575             # Out is what was written in the stderr file
576             if err:
577                 self.fail()
578                 msg = " Failed to start command '%s' " % command
579                 self.error(msg, out, err)
580                 raise RuntimeError, msg
581         
582     def stop(self):
583         """ Stops application execution
584         """
585         command = self.get('command') or ''
586
587         if self.state == ResourceState.STARTED:
588         
589             self.info("Stopping command '%s' " % command)
590         
591             # If the command is running in foreground (it was launched using
592             # the node 'execute' method), then we use the handler to the Popen
593             # process to kill it. Else we send a kill signal using the pid and ppid
594             # retrieved after running the command with the node 'run' method
595             if self._proc:
596                 self._proc.kill()
597             else:
598                 # Only try to kill the process if the pid and ppid
599                 # were retrieved
600                 if self.pid and self.ppid:
601                     (out, err), proc = self.node.kill(self.pid, self.ppid,
602                             sudo = self._sudo_kill)
603
604                     # TODO: check if execution errors occurred
605                     if proc.poll() or err:
606                         msg = " Failed to STOP command '%s' " % self.get("command")
607                         self.error(msg, out, err)
608                         self.fail()
609         
610         if self.state == ResourceState.STARTED:
611             super(LinuxApplication, self).stop()
612
613     def release(self):
614         self.info("Releasing resource")
615
616         tear_down = self.get("tearDown")
617         if tear_down:
618             self.node.execute(tear_down)
619
620         self.stop()
621
622         if self.state != ResourceState.FAILED:
623             self.info("Resource released")
624
625             super(LinuxApplication, self).release()
626    
627     @property
628     def state(self):
629         """ Returns the state of the application
630         """
631         if self._state == ResourceState.STARTED:
632             if self.in_foreground:
633                 # Check if the process we used to execute the command
634                 # is still running ...
635                 retcode = self._proc.poll()
636
637                 # retcode == None -> running
638                 # retcode > 0 -> error
639                 # retcode == 0 -> finished
640                 if retcode:
641                     out = ""
642                     msg = " Failed to execute command '%s'" % self.get("command")
643                     err = self._proc.stderr.read()
644                     self.error(msg, out, err)
645                     self.fail()
646
647                 elif retcode == 0:
648                     self.finish()
649             else:
650                 # We need to query the status of the command we launched in 
651                 # background. In order to avoid overwhelming the remote host and
652                 # the local processor with too many ssh queries, the state is only
653                 # requested every 'state_check_delay' seconds.
654                 state_check_delay = 0.5
655                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
656                     if self.pid and self.ppid:
657                         # Make sure the process is still running in background
658                         status = self.node.status(self.pid, self.ppid)
659
660                         if status == ProcStatus.FINISHED:
661                             # If the program finished, check if execution
662                             # errors occurred
663                             (out, err), proc = self.node.check_errors(
664                                     self.run_home)
665
666                             if err:
667                                 msg = "Failed to execute command '%s'" % \
668                                         self.get("command")
669                                 self.error(msg, out, err)
670                                 self.fail()
671                             else:
672                                 self.finish()
673
674                     self._last_state_check = tnow()
675
676         return self._state
677
678     def execute_command(self, command, 
679             env = None,
680             sudo = False,
681             stdin = None,
682             forward_x11 = False,
683             blocking = False):
684
685         environ = ""
686         if env:
687             environ = self.node.format_environment(env, inline = True)
688         command = environ + command
689         command = self.replace_paths(command)
690
691         return self.node.execute(command,
692                 sudo = sudo,
693                 stdin = stdin,
694                 forward_x11 = forward_x11,
695                 blocking = blocking)
696
697     def replace_paths(self, command):
698         """
699         Replace all special path tags with shell-escaped actual paths.
700         """
701         return ( command
702             .replace("${USR}", self.node.usr_dir)
703             .replace("${LIB}", self.node.lib_dir)
704             .replace("${BIN}", self.node.bin_dir)
705             .replace("${SRC}", self.node.src_dir)
706             .replace("${SHARE}", self.node.share_dir)
707             .replace("${EXP}", self.node.exp_dir)
708             .replace("${EXP_HOME}", self.node.exp_home)
709             .replace("${APP_HOME}", self.app_home)
710             .replace("${RUN_HOME}", self.run_home)
711             .replace("${NODE_HOME}", self.node.node_home)
712             .replace("${HOME}", self.node.home_dir)
713             )
714
715     def valid_connection(self, guid):
716         # TODO: Validate!
717         return True
718