Adding help and background class atrributes to ResourceManager
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23         reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46     A LinuxApplication RM represents a process that can be executed in
47     a remote Linux host using SSH.
48
49     The LinuxApplication RM takes care of uploadin sources and any files
50     needed to run the experiment, to the remote host. 
51     It also allows to provide source compilation (build) and installation 
52     instructions, and takes care of automating the sources build and 
53     installation tasks for the user.
54
55     It is important to note that files uploaded to the remote host have
56     two possible scopes: single-experiment or multi-experiment.
57     Single experiment files are those that will not be re-used by other 
58     experiments. Multi-experiment files are those that will.
59     Sources and shared files are always made available to all experiments.
60
61     Directory structure:
62
63     The directory structure used by LinuxApplication RM at the Linux
64     host is the following:
65
66         ${HOME}/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88
89     @classmethod
90     def _register_attributes(cls):
91         command = Attribute("command", "Command to execute at application start. "
92                 "Note that commands will be executed in the ${RUN_HOME} directory, "
93                 "make sure to take this into account when using relative paths. ", 
94                 flags = Flags.ExecReadOnly)
95         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
96                 flags = Flags.ExecReadOnly)
97         env = Attribute("env", "Environment variables string for command execution",
98                 flags = Flags.ExecReadOnly)
99         sudo = Attribute("sudo", "Run with root privileges", 
100                 flags = Flags.ExecReadOnly)
101         depends = Attribute("depends", 
102                 "Space-separated list of packages required to run the application",
103                 flags = Flags.ExecReadOnly)
104         sources = Attribute("sources", 
105                 "Space-separated list of regular files to be uploaded to ${SRC} "
106                 "directory prior to building. Archives won't be expanded automatically. "
107                 "Sources are globally available for all experiments unless "
108                 "cleanHome is set to True (This will delete all sources). ",
109                 flags = Flags.ExecReadOnly)
110         files = Attribute("files", 
111                 "Space-separated list of regular miscellaneous files to be uploaded "
112                 "to ${SHARE} directory. "
113                 "Files are globally available for all experiments unless "
114                 "cleanHome is set to True (This will delete all files). ",
115                 flags = Flags.ExecReadOnly)
116         libs = Attribute("libs", 
117                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
118                 "to ${LIB} directory. "
119                 "Libraries are globally available for all experiments unless "
120                 "cleanHome is set to True (This will delete all files). ",
121                 flags = Flags.ExecReadOnly)
122         bins = Attribute("bins", 
123                 "Space-separated list of binary files to be uploaded "
124                 "to ${BIN} directory. "
125                 "Binaries are globally available for all experiments unless "
126                 "cleanHome is set to True (This will delete all files). ",
127                 flags = Flags.ExecReadOnly)
128         code = Attribute("code", 
129                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
130                 flags = Flags.ExecReadOnly)
131         build = Attribute("build", 
132                 "Build commands to execute after deploying the sources. "
133                 "Sources are uploaded to the ${SRC} directory and code "
134                 "is uploaded to the ${APP_HOME} directory. \n"
135                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
136                 "./configure && make && make clean.\n"
137                 "Make sure to make the build commands return with a nonzero exit "
138                 "code on error.",
139                 flags = Flags.ReadOnly)
140         install = Attribute("install", 
141                 "Commands to transfer built files to their final destinations. "
142                 "Install commands are executed after build commands. ",
143                 flags = Flags.ReadOnly)
144         stdin = Attribute("stdin", "Standard input for the 'command'", 
145                 flags = Flags.ExecReadOnly)
146         tear_down = Attribute("tearDown", "Command to be executed just before " 
147                 "releasing the resource", 
148                 flags = Flags.ReadOnly)
149
150         cls._register_attribute(command)
151         cls._register_attribute(forward_x11)
152         cls._register_attribute(env)
153         cls._register_attribute(sudo)
154         cls._register_attribute(depends)
155         cls._register_attribute(sources)
156         cls._register_attribute(code)
157         cls._register_attribute(files)
158         cls._register_attribute(bins)
159         cls._register_attribute(libs)
160         cls._register_attribute(build)
161         cls._register_attribute(install)
162         cls._register_attribute(stdin)
163         cls._register_attribute(tear_down)
164
165     @classmethod
166     def _register_traces(cls):
167         stdout = Trace("stdout", "Standard output stream")
168         stderr = Trace("stderr", "Standard error stream")
169
170         cls._register_trace(stdout)
171         cls._register_trace(stderr)
172
173     def __init__(self, ec, guid):
174         super(LinuxApplication, self).__init__(ec, guid)
175         self._pid = None
176         self._ppid = None
177         self._home = "app-%s" % self.guid
178         # whether the command should run in foreground attached
179         # to a terminal
180         self._in_foreground = False
181
182         # whether to use sudo to kill the application process
183         self._sudo_kill = False
184
185         # keep a reference to the running process handler when 
186         # the command is not executed as remote daemon in background
187         self._proc = None
188
189         # timestamp of last state check of the application
190         self._last_state_check = tnow()
191
192     def log_message(self, msg):
193         return " guid %d - host %s - %s " % (self.guid, 
194                 self.node.get("hostname"), msg)
195
196     @property
197     def node(self):
198         node = self.get_connected(LinuxNode.rtype())
199         if node: return node[0]
200         return None
201
202     @property
203     def app_home(self):
204         return os.path.join(self.node.exp_home, self._home)
205
206     @property
207     def run_home(self):
208         return os.path.join(self.app_home, self.ec.run_id)
209
210     @property
211     def pid(self):
212         return self._pid
213
214     @property
215     def ppid(self):
216         return self._ppid
217
218     @property
219     def in_foreground(self):
220         """ Returns True if the command needs to be executed in foreground.
221         This means that command will be executed using 'execute' instead of
222         'run' ('run' executes a command in background and detached from the 
223         terminal)
224         
225         When using X11 forwarding option, the command can not run in background
226         and detached from a terminal, since we need to keep the terminal attached 
227         to interact with it.
228         """
229         return self.get("forwardX11") or self._in_foreground
230
231     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
232         self.info("Retrieving '%s' trace %s " % (name, attr))
233
234         path = os.path.join(self.run_home, name)
235         
236         command = "(test -f %s && echo 'success') || echo 'error'" % path
237         (out, err), proc = self.node.execute(command)
238
239         if (err and proc.poll()) or out.find("error") != -1:
240             msg = " Couldn't find trace %s " % name
241             self.error(msg, out, err)
242             return None
243     
244         if attr == TraceAttr.PATH:
245             return path
246
247         if attr == TraceAttr.ALL:
248             (out, err), proc = self.node.check_output(self.run_home, name)
249             
250             if proc.poll():
251                 msg = " Couldn't read trace %s " % name
252                 self.error(msg, out, err)
253                 return None
254
255             return out
256
257         if attr == TraceAttr.STREAM:
258             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
259         elif attr == TraceAttr.SIZE:
260             cmd = "stat -c%%s %s " % path
261
262         (out, err), proc = self.node.execute(cmd)
263
264         if proc.poll():
265             msg = " Couldn't find trace %s " % name
266             self.error(msg, out, err)
267             return None
268         
269         if attr == TraceAttr.SIZE:
270             out = int(out.strip())
271
272         return out
273             
274     def provision(self):
275         # create run dir for application
276         self.node.mkdir(self.run_home)
277    
278         # List of all the provision methods to invoke
279         steps = [
280             # upload sources
281             self.upload_sources,
282             # upload files
283             self.upload_files,
284             # upload binaries
285             self.upload_binaries,
286             # upload libraries
287             self.upload_libraries,
288             # upload code
289             self.upload_code,
290             # upload stdin
291             self.upload_stdin,
292             # install dependencies
293             self.install_dependencies,
294             # build
295             self.build,
296             # Install
297             self.install]
298
299         command = []
300
301         # Since provisioning takes a long time, before
302         # each step we check that the EC is still 
303         for step in steps:
304             if self.ec.finished:
305                 raise RuntimeError, "EC finished"
306             
307             ret = step()
308             if ret:
309                 command.append(ret)
310
311         # upload deploy script
312         deploy_command = ";".join(command)
313         self.execute_deploy_command(deploy_command)
314
315         # upload start script
316         self.upload_start_command()
317        
318         self.info("Provisioning finished")
319
320         super(LinuxApplication, self).provision()
321
322     def upload_start_command(self):
323         # Upload command to remote bash script
324         # - only if command can be executed in background and detached
325         command = self.get("command")
326
327         if command and not self.in_foreground:
328             self.info("Uploading command '%s'" % command)
329
330             # replace application specific paths in the command
331             command = self.replace_paths(command)
332             
333             # replace application specific paths in the environment
334             env = self.get("env")
335             env = env and self.replace_paths(env)
336
337             shfile = os.path.join(self.app_home, "start.sh")
338
339             self.node.upload_command(command, 
340                     shfile = shfile,
341                     env = env,
342                     overwrite = False)
343
344     def execute_deploy_command(self, command):
345         if command:
346             # Upload the command to a bash script and run it
347             # in background ( but wait until the command has
348             # finished to continue )
349             shfile = os.path.join(self.app_home, "deploy.sh")
350             self.node.run_and_wait(command, self.run_home,
351                     shfile = shfile, 
352                     overwrite = False,
353                     pidfile = "deploy_pidfile", 
354                     ecodefile = "deploy_exitcode", 
355                     stdout = "deploy_stdout", 
356                     stderr = "deploy_stderr")
357
358     def upload_sources(self):
359         sources = self.get("sources")
360    
361         command = ""
362
363         if sources:
364             self.info("Uploading sources ")
365
366             sources = sources.split(' ')
367
368             # Separate sources that should be downloaded from 
369             # the web, from sources that should be uploaded from
370             # the local machine
371             command = []
372             for source in list(sources):
373                 if source.startswith("http") or source.startswith("https"):
374                     # remove the hhtp source from the sources list
375                     sources.remove(source)
376
377                     command.append( " ( " 
378                             # Check if the source already exists
379                             " ls ${SRC}/%(basename)s "
380                             " || ( "
381                             # If source doesn't exist, download it and check
382                             # that it it downloaded ok
383                             "   wget -c --directory-prefix=${SRC} %(source)s && "
384                             "   ls ${SRC}/%(basename)s "
385                             " ) ) " % {
386                                 "basename": os.path.basename(source),
387                                 "source": source
388                                 })
389
390             command = " && ".join(command)
391
392             # replace application specific paths in the command
393             command = self.replace_paths(command)
394        
395             if sources:
396                 sources = ' '.join(sources)
397                 self.node.upload(sources, self.node.src_dir, overwrite = False)
398
399         return command
400
401     def upload_files(self):
402         files = self.get("files")
403
404         if files:
405             self.info("Uploading files %s " % files)
406             self.node.upload(files, self.node.share_dir, overwrite = False)
407
408     def upload_libraries(self):
409         libs = self.get("libs")
410
411         if libs:
412             self.info("Uploading libraries %s " % libaries)
413             self.node.upload(libs, self.node.lib_dir, overwrite = False)
414
415     def upload_binaries(self):
416         bins = self.get("bins")
417
418         if bins:
419             self.info("Uploading binaries %s " % binaries)
420             self.node.upload(bins, self.node.bin_dir, overwrite = False)
421
422     def upload_code(self):
423         code = self.get("code")
424
425         if code:
426             self.info("Uploading code")
427
428             dst = os.path.join(self.app_home, "code")
429             self.node.upload(code, dst, overwrite = False, text = True)
430
431     def upload_stdin(self):
432         stdin = self.get("stdin")
433         if stdin:
434             # create dir for sources
435             self.info("Uploading stdin")
436             
437             # upload stdin file to ${SHARE_DIR} directory
438             basename = os.path.basename(stdin)
439             dst = os.path.join(self.node.share_dir, basename)
440             self.node.upload(stdin, dst, overwrite = False, text = True)
441
442             # create "stdin" symlink on ${APP_HOME} directory
443             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
444                 "app_home": self.app_home, 
445                 "stdin": dst })
446
447             return command
448
449     def install_dependencies(self):
450         depends = self.get("depends")
451         if depends:
452             self.info("Installing dependencies %s" % depends)
453             return self.node.install_packages_command(depends)
454
455     def build(self):
456         build = self.get("build")
457
458         if build:
459             self.info("Building sources ")
460             
461             # replace application specific paths in the command
462             return self.replace_paths(build)
463
464     def install(self):
465         install = self.get("install")
466
467         if install:
468             self.info("Installing sources ")
469
470             # replace application specific paths in the command
471             return self.replace_paths(install)
472
473     def deploy(self):
474         # Wait until node is associated and deployed
475         node = self.node
476         if not node or node.state < ResourceState.READY:
477             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
478             self.ec.schedule(reschedule_delay, self.deploy)
479         else:
480             try:
481                 command = self.get("command") or ""
482                 self.info("Deploying command '%s' " % command)
483                 self.discover()
484                 self.provision()
485             except:
486                 self.fail()
487                 raise
488
489             super(LinuxApplication, self).deploy()
490     
491     def start(self):
492         command = self.get("command")
493
494         self.info("Starting command '%s'" % command)
495
496         if not command:
497             # If no command was given (i.e. Application was used for dependency
498             # installation), then the application is directly marked as FINISHED
499             self.set_finished()
500         else:
501
502             if self.in_foreground:
503                 self._run_in_foreground()
504             else:
505                 self._run_in_background()
506
507             super(LinuxApplication, self).start()
508
509     def _run_in_foreground(self):
510         command = self.get("command")
511         sudo = self.get("sudo") or False
512         x11 = self.get("forwardX11")
513
514         # For a command being executed in foreground, if there is stdin,
515         # it is expected to be text string not a file or pipe
516         stdin = self.get("stdin") or None
517
518         # Command will be launched in foreground and attached to the
519         # terminal using the node 'execute' in non blocking mode.
520
521         # We save the reference to the process in self._proc 
522         # to be able to kill the process from the stop method.
523         # We also set blocking = False, since we don't want the
524         # thread to block until the execution finishes.
525         (out, err), self._proc = self.execute_command(self, command, 
526                 env = env,
527                 sudo = sudo,
528                 stdin = stdin,
529                 forward_x11 = x11,
530                 blocking = False)
531
532         if self._proc.poll():
533             self.fail()
534             self.error(msg, out, err)
535             raise RuntimeError, msg
536
537     def _run_in_background(self):
538         command = self.get("command")
539         env = self.get("env")
540         sudo = self.get("sudo") or False
541
542         stdout = "stdout"
543         stderr = "stderr"
544         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
545                 else None
546
547         # Command will be run as a daemon in baground and detached from any
548         # terminal.
549         # The command to run was previously uploaded to a bash script
550         # during deployment, now we launch the remote script using 'run'
551         # method from the node.
552         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
553         (out, err), proc = self.node.run(cmd, self.run_home, 
554             stdin = stdin, 
555             stdout = stdout,
556             stderr = stderr,
557             sudo = sudo)
558
559         # check if execution errors occurred
560         msg = " Failed to start command '%s' " % command
561         
562         if proc.poll():
563             self.fail()
564             self.error(msg, out, err)
565             raise RuntimeError, msg
566     
567         # Wait for pid file to be generated
568         pid, ppid = self.node.wait_pid(self.run_home)
569         if pid: self._pid = int(pid)
570         if ppid: self._ppid = int(ppid)
571
572         # If the process is not running, check for error information
573         # on the remote machine
574         if not self.pid or not self.ppid:
575             (out, err), proc = self.node.check_errors(self.run_home,
576                     stderr = stderr) 
577
578             # Out is what was written in the stderr file
579             if err:
580                 self.fail()
581                 msg = " Failed to start command '%s' " % command
582                 self.error(msg, out, err)
583                 raise RuntimeError, msg
584         
585     def stop(self):
586         """ Stops application execution
587         """
588         command = self.get('command') or ''
589
590         if self.state == ResourceState.STARTED:
591         
592             self.info("Stopping command '%s' " % command)
593         
594             # If the command is running in foreground (it was launched using
595             # the node 'execute' method), then we use the handler to the Popen
596             # process to kill it. Else we send a kill signal using the pid and ppid
597             # retrieved after running the command with the node 'run' method
598             if self._proc:
599                 self._proc.kill()
600             else:
601                 # Only try to kill the process if the pid and ppid
602                 # were retrieved
603                 if self.pid and self.ppid:
604                     (out, err), proc = self.node.kill(self.pid, self.ppid,
605                             sudo = self._sudo_kill)
606
607                     # TODO: check if execution errors occurred
608                     if proc.poll() or err:
609                         msg = " Failed to STOP command '%s' " % self.get("command")
610                         self.error(msg, out, err)
611                         self.fail()
612         
613         if self.state == ResourceState.STARTED:
614             super(LinuxApplication, self).stop()
615
616     def release(self):
617         self.info("Releasing resource")
618
619         tear_down = self.get("tearDown")
620         if tear_down:
621             self.node.execute(tear_down)
622
623         self.stop()
624
625         if self.state != ResourceState.FAILED:
626             self.info("Resource released")
627
628             super(LinuxApplication, self).release()
629    
630     @property
631     def state(self):
632         """ Returns the state of the application
633         """
634         if self._state == ResourceState.STARTED:
635             if self.in_foreground:
636                 # Check if the process we used to execute the command
637                 # is still running ...
638                 retcode = self._proc.poll()
639
640                 # retcode == None -> running
641                 # retcode > 0 -> error
642                 # retcode == 0 -> finished
643                 if retcode:
644                     out = ""
645                     msg = " Failed to execute command '%s'" % self.get("command")
646                     err = self._proc.stderr.read()
647                     self.error(msg, out, err)
648                     self.fail()
649
650                 elif retcode == 0:
651                     self.finish()
652             else:
653                 # We need to query the status of the command we launched in 
654                 # background. In order to avoid overwhelming the remote host and
655                 # the local processor with too many ssh queries, the state is only
656                 # requested every 'state_check_delay' seconds.
657                 state_check_delay = 0.5
658                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
659                     if self.pid and self.ppid:
660                         # Make sure the process is still running in background
661                         status = self.node.status(self.pid, self.ppid)
662
663                         if status == ProcStatus.FINISHED:
664                             # If the program finished, check if execution
665                             # errors occurred
666                             (out, err), proc = self.node.check_errors(
667                                     self.run_home)
668
669                             if err:
670                                 msg = "Failed to execute command '%s'" % \
671                                         self.get("command")
672                                 self.error(msg, out, err)
673                                 self.fail()
674                             else:
675                                 self.finish()
676
677                     self._last_state_check = tnow()
678
679         return self._state
680
681     def execute_command(self, command, 
682             env = None,
683             sudo = False,
684             stdin = None,
685             forward_x11 = False,
686             blocking = False):
687
688         environ = ""
689         if env:
690             environ = self.node.format_environment(env, inline = True)
691         command = environ + command
692         command = self.replace_paths(command)
693
694         return self.node.execute(command,
695                 sudo = sudo,
696                 stdin = stdin,
697                 forward_x11 = forward_x11,
698                 blocking = blocking)
699
700     def replace_paths(self, command):
701         """
702         Replace all special path tags with shell-escaped actual paths.
703         """
704         return ( command
705             .replace("${USR}", self.node.usr_dir)
706             .replace("${LIB}", self.node.lib_dir)
707             .replace("${BIN}", self.node.bin_dir)
708             .replace("${SRC}", self.node.src_dir)
709             .replace("${SHARE}", self.node.share_dir)
710             .replace("${EXP}", self.node.exp_dir)
711             .replace("${EXP_HOME}", self.node.exp_home)
712             .replace("${APP_HOME}", self.app_home)
713             .replace("${RUN_HOME}", self.run_home)
714             .replace("${NODE_HOME}", self.node.node_home)
715             .replace("${HOME}", self.node.home_dir)
716             )
717
718     def valid_connection(self, guid):
719         # TODO: Validate!
720         return True
721