LinuxApplication: making single deploy.sh script out of http_sources.sh, build.sh...
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23     reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: During provisioning, everything that is not scp could be
33 #       uploaded to a same script, http_sources download, etc...
34 #       and like that require performing less ssh connections!!!
35 # TODO: Make stdin be a symlink to the original file in ${SHARE}
36 #       - later use md5sum to check wether the file needs to be re-upload
37
38
39 @clsinit
40 class LinuxApplication(ResourceManager):
41     """
42     .. class:: Class Args :
43       
44         :param ec: The Experiment controller
45         :type ec: ExperimentController
46         :param guid: guid of the RM
47         :type guid: int
48
49     .. note::
50
51     A LinuxApplication RM represents a process that can be executed in
52     a remote Linux host using SSH.
53
54     The LinuxApplication RM takes care of uploadin sources and any files
55     needed to run the experiment, to the remote host. 
56     It also allows to provide source compilation (build) and installation 
57     instructions, and takes care of automating the sources build and 
58     installation tasks for the user.
59
60     It is important to note that files uploaded to the remote host have
61     two possible scopes: single-experiment or multi-experiment.
62     Single experiment files are those that will not be re-used by other 
63     experiments. Multi-experiment files are those that will.
64     Sources and shared files are always made available to all experiments.
65
66     Directory structure:
67
68     The directory structure used by LinuxApplication RM at the Linux
69     host is the following:
70
71         ${HOME}/nepi-usr --> Base directory for multi-experiment files
72                       |
73         ${LIB}        |- /lib --> Base directory for libraries
74         ${BIN}        |- /bin --> Base directory for binary files
75         ${SRC}        |- /src --> Base directory for sources
76         ${SHARE}      |- /share --> Base directory for other files
77
78         ${HOME}/nepi-exp --> Base directory for single-experiment files
79                       |
80         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
81                           |
82         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
83                                |     specific files (e.g. command.sh, input)
84                                | 
85         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
86
87     """
88
89     _rtype = "LinuxApplication"
90
91     @classmethod
92     def _register_attributes(cls):
93         command = Attribute("command", "Command to execute at application start. "
94                 "Note that commands will be executed in the ${RUN_HOME} directory, "
95                 "make sure to take this into account when using relative paths. ", 
96                 flags = Flags.ExecReadOnly)
97         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
98                 flags = Flags.ExecReadOnly)
99         env = Attribute("env", "Environment variables string for command execution",
100                 flags = Flags.ExecReadOnly)
101         sudo = Attribute("sudo", "Run with root privileges", 
102                 flags = Flags.ExecReadOnly)
103         depends = Attribute("depends", 
104                 "Space-separated list of packages required to run the application",
105                 flags = Flags.ExecReadOnly)
106         sources = Attribute("sources", 
107                 "Space-separated list of regular files to be uploaded to ${SRC} "
108                 "directory prior to building. Archives won't be expanded automatically. "
109                 "Sources are globally available for all experiments unless "
110                 "cleanHome is set to True (This will delete all sources). ",
111                 flags = Flags.ExecReadOnly)
112         files = Attribute("files", 
113                 "Space-separated list of regular miscellaneous files to be uploaded "
114                 "to ${SHARE} directory. "
115                 "Files are globally available for all experiments unless "
116                 "cleanHome is set to True (This will delete all files). ",
117                 flags = Flags.ExecReadOnly)
118         libs = Attribute("libs", 
119                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
120                 "to ${LIB} directory. "
121                 "Libraries are globally available for all experiments unless "
122                 "cleanHome is set to True (This will delete all files). ",
123                 flags = Flags.ExecReadOnly)
124         bins = Attribute("bins", 
125                 "Space-separated list of binary files to be uploaded "
126                 "to ${BIN} directory. "
127                 "Binaries are globally available for all experiments unless "
128                 "cleanHome is set to True (This will delete all files). ",
129                 flags = Flags.ExecReadOnly)
130         code = Attribute("code", 
131                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
132                 flags = Flags.ExecReadOnly)
133         build = Attribute("build", 
134                 "Build commands to execute after deploying the sources. "
135                 "Sources are uploaded to the ${SRC} directory and code "
136                 "is uploaded to the ${APP_HOME} directory. \n"
137                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
138                 "./configure && make && make clean.\n"
139                 "Make sure to make the build commands return with a nonzero exit "
140                 "code on error.",
141                 flags = Flags.ReadOnly)
142         install = Attribute("install", 
143                 "Commands to transfer built files to their final destinations. "
144                 "Install commands are executed after build commands. ",
145                 flags = Flags.ReadOnly)
146         stdin = Attribute("stdin", "Standard input for the 'command'", 
147                 flags = Flags.ExecReadOnly)
148         tear_down = Attribute("tearDown", "Command to be executed just before " 
149                 "releasing the resource", 
150                 flags = Flags.ReadOnly)
151
152         cls._register_attribute(command)
153         cls._register_attribute(forward_x11)
154         cls._register_attribute(env)
155         cls._register_attribute(sudo)
156         cls._register_attribute(depends)
157         cls._register_attribute(sources)
158         cls._register_attribute(code)
159         cls._register_attribute(files)
160         cls._register_attribute(bins)
161         cls._register_attribute(libs)
162         cls._register_attribute(build)
163         cls._register_attribute(install)
164         cls._register_attribute(stdin)
165         cls._register_attribute(tear_down)
166
167     @classmethod
168     def _register_traces(cls):
169         stdout = Trace("stdout", "Standard output stream")
170         stderr = Trace("stderr", "Standard error stream")
171
172         cls._register_trace(stdout)
173         cls._register_trace(stderr)
174
175     def __init__(self, ec, guid):
176         super(LinuxApplication, self).__init__(ec, guid)
177         self._pid = None
178         self._ppid = None
179         self._home = "app-%s" % self.guid
180         self._in_foreground = False
181
182         # keep a reference to the running process handler when 
183         # the command is not executed as remote daemon in background
184         self._proc = None
185
186         # timestamp of last state check of the application
187         self._last_state_check = tnow()
188
189     def log_message(self, msg):
190         return " guid %d - host %s - %s " % (self.guid, 
191                 self.node.get("hostname"), msg)
192
193     @property
194     def node(self):
195         node = self.get_connected(LinuxNode.rtype())
196         if node: return node[0]
197         return None
198
199     @property
200     def app_home(self):
201         return os.path.join(self.node.exp_home, self._home)
202
203     @property
204     def run_home(self):
205         return os.path.join(self.app_home, self.ec.run_id)
206
207     @property
208     def pid(self):
209         return self._pid
210
211     @property
212     def ppid(self):
213         return self._ppid
214
215     @property
216     def in_foreground(self):
217         """ Returns True if the command needs to be executed in foreground.
218         This means that command will be executed using 'execute' instead of
219         'run' ('run' executes a command in background and detached from the 
220         terminal)
221         
222         When using X11 forwarding option, the command can not run in background
223         and detached from a terminal, since we need to keep the terminal attached 
224         to interact with it.
225         """
226         return self.get("forwardX11") or self._in_foreground
227
228     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
229         self.info("Retrieving '%s' trace %s " % (name, attr))
230
231         path = os.path.join(self.run_home, name)
232         
233         command = "(test -f %s && echo 'success') || echo 'error'" % path
234         (out, err), proc = self.node.execute(command)
235
236         if (err and proc.poll()) or out.find("error") != -1:
237             msg = " Couldn't find trace %s " % name
238             self.error(msg, out, err)
239             return None
240     
241         if attr == TraceAttr.PATH:
242             return path
243
244         if attr == TraceAttr.ALL:
245             (out, err), proc = self.node.check_output(self.run_home, name)
246             
247             if err and proc.poll():
248                 msg = " Couldn't read trace %s " % name
249                 self.error(msg, out, err)
250                 return None
251
252             return out
253
254         if attr == TraceAttr.STREAM:
255             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
256         elif attr == TraceAttr.SIZE:
257             cmd = "stat -c%%s %s " % path
258
259         (out, err), proc = self.node.execute(cmd)
260
261         if err and proc.poll():
262             msg = " Couldn't find trace %s " % name
263             self.error(msg, out, err)
264             return None
265         
266         if attr == TraceAttr.SIZE:
267             out = int(out.strip())
268
269         return out
270             
271     def provision(self):
272         # create run dir for application
273         self.node.mkdir(self.run_home)
274    
275         # List of all the provision methods to invoke
276         steps = [
277             # upload sources
278             self.upload_sources,
279             # upload files
280             self.upload_files,
281             # upload binaries
282             self.upload_binaries,
283             # upload libraries
284             self.upload_libraries,
285             # upload code
286             self.upload_code,
287             # upload stdin
288             self.upload_stdin,
289             # install dependencies
290             self.install_dependencies,
291             # build
292             self.build,
293             # Install
294             self.install]
295
296         command = []
297
298         # Since provisioning takes a long time, before
299         # each step we check that the EC is still 
300         for step in steps:
301             if self.ec.finished:
302                 raise RuntimeError, "EC finished"
303             
304             ret = step()
305             if ret:
306                 command.append(ret)
307
308         # upload deploy script
309         deploy_command = ";".join(command)
310         self.execute_deploy_command(deploy_command)
311
312         # upload start script
313         self.upload_start_command()
314        
315         self.info("Provisioning finished")
316
317         super(LinuxApplication, self).provision()
318
319     def upload_start_command(self):
320         # Upload command to remote bash script
321         # - only if command can be executed in background and detached
322         command = self.get("command")
323
324         if command and not self.in_foreground:
325             self.info("Uploading command '%s'" % command)
326
327             # replace application specific paths in the command
328             command = self.replace_paths(command)
329             
330             # replace application specific paths in the environment
331             env = self.get("env")
332             env = env and self.replace_paths(env)
333
334             shfile = os.path.join(self.app_home, "start.sh")
335
336             self.node.upload_command(command, 
337                     shfile = shfile,
338                     env = env)
339
340     def execute_deploy_command(self, command):
341         if command:
342             # Upload the command to a bash script and run it
343             # in background ( but wait until the command has
344             # finished to continue )
345             shfile = os.path.join(self.app_home, "deploy.sh")
346             self.node.run_and_wait(command, self.run_home,
347                     shfile = shfile, 
348                     overwrite = False,
349                     pidfile = "deploy_pidfile", 
350                     ecodefile = "deploy_exitcode", 
351                     stdout = "deploy_stdout", 
352                     stderr = "deploy_stderr")
353
354     def upload_sources(self):
355         sources = self.get("sources")
356    
357         command = ""
358
359         if sources:
360             self.info("Uploading sources ")
361
362             sources = sources.split(' ')
363
364             # Separate sources that should be downloaded from 
365             # the web, from sources that should be uploaded from
366             # the local machine
367             command = []
368             for source in list(sources):
369                 if source.startswith("http") or source.startswith("https"):
370                     # remove the hhtp source from the sources list
371                     sources.remove(source)
372
373                     command.append( " ( " 
374                             # Check if the source already exists
375                             " ls ${SRC}/%(basename)s "
376                             " || ( "
377                             # If source doesn't exist, download it and check
378                             # that it it downloaded ok
379                             "   wget -c --directory-prefix=${SRC} %(source)s && "
380                             "   ls ${SRC}/%(basename)s "
381                             " ) ) " % {
382                                 "basename": os.path.basename(source),
383                                 "source": source
384                                 })
385
386             command = " && ".join(command)
387
388             # replace application specific paths in the command
389             command = self.replace_paths(command)
390        
391             if sources:
392                 sources = ' '.join(sources)
393                 self.node.upload(sources, self.node.src_dir, overwrite = False)
394
395         return command
396
397     def upload_files(self):
398         files = self.get("files")
399
400         if files:
401             self.info("Uploading files %s " % files)
402             self.node.upload(files, self.node.share_dir, overwrite = False)
403
404     def upload_libraries(self):
405         libs = self.get("libs")
406
407         if libs:
408             self.info("Uploading libraries %s " % libaries)
409             self.node.upload(libs, self.node.lib_dir, overwrite = False)
410
411     def upload_binaries(self):
412         bins = self.get("bins")
413
414         if bins:
415             self.info("Uploading binaries %s " % binaries)
416             self.node.upload(bins, self.node.bin_dir, overwrite = False)
417
418     def upload_code(self):
419         code = self.get("code")
420
421         if code:
422             self.info("Uploading code")
423
424             dst = os.path.join(self.app_home, "code")
425             self.node.upload(code, dst, overwrite = False, text = True)
426
427     def upload_stdin(self):
428         stdin = self.get("stdin")
429         if stdin:
430             # create dir for sources
431             self.info("Uploading stdin")
432             
433             dst = os.path.join(self.app_home, "stdin")
434             self.node.upload(stdin, dst, overwrite = False, text = True)
435
436     def install_dependencies(self):
437         depends = self.get("depends")
438         if depends:
439             self.info("Installing dependencies %s" % depends)
440             self.node.install_packages(depends, self.app_home, self.run_home)
441
442     def build(self):
443         build = self.get("build")
444
445         if build:
446             self.info("Building sources ")
447             
448             # replace application specific paths in the command
449             return self.replace_paths(build)
450
451     def install(self):
452         install = self.get("install")
453
454         if install:
455             self.info("Installing sources ")
456
457             # replace application specific paths in the command
458             return self.replace_paths(install)
459
460     def deploy(self):
461         # Wait until node is associated and deployed
462         node = self.node
463         if not node or node.state < ResourceState.READY:
464             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
465             self.ec.schedule(reschedule_delay, self.deploy)
466         else:
467             try:
468                 command = self.get("command") or ""
469                 self.info("Deploying command '%s' " % command)
470                 self.discover()
471                 self.provision()
472             except:
473                 self.fail()
474                 raise
475
476             super(LinuxApplication, self).deploy()
477
478     def start(self):
479         command = self.get("command")
480
481         self.info("Starting command '%s'" % command)
482
483         if not command:
484             # If no command was given (i.e. Application was used for dependency
485             # installation), then the application is directly marked as FINISHED
486             self._state = ResourceState.FINISHED
487         else:
488
489             if self.in_foreground:
490                 self._start_in_foreground()
491             else:
492                 self._start_in_background()
493
494             super(LinuxApplication, self).start()
495
496     def _start_in_foreground(self):
497         command = self.get("command")
498         sudo = self.get("sudo") or False
499         x11 = self.get("forwardX11")
500
501         # For a command being executed in foreground, if there is stdin,
502         # it is expected to be text string not a file or pipe
503         stdin = self.get("stdin") or None
504
505         # Command will be launched in foreground and attached to the
506         # terminal using the node 'execute' in non blocking mode.
507
508         # Export environment
509         env = self.get("env")
510         environ = self.node.format_environment(env, inline = True)
511         command = environ + command
512         command = self.replace_paths(command)
513
514         # We save the reference to the process in self._proc 
515         # to be able to kill the process from the stop method.
516         # We also set blocking = False, since we don't want the
517         # thread to block until the execution finishes.
518         (out, err), self._proc = self.node.execute(command,
519                 sudo = sudo,
520                 stdin = stdin,
521                 forward_x11 = x11,
522                 blocking = False)
523
524         if self._proc.poll():
525             self.fail()
526             self.error(msg, out, err)
527             raise RuntimeError, msg
528
529     def _start_in_background(self):
530         command = self.get("command")
531         env = self.get("env")
532         sudo = self.get("sudo") or False
533
534         stdout = "stdout"
535         stderr = "stderr"
536         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
537                 else None
538
539         # Command will be run as a daemon in baground and detached from any
540         # terminal.
541         # The command to run was previously uploaded to a bash script
542         # during deployment, now we launch the remote script using 'run'
543         # method from the node.
544         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
545         (out, err), proc = self.node.run(cmd, self.run_home, 
546             stdin = stdin, 
547             stdout = stdout,
548             stderr = stderr,
549             sudo = sudo)
550
551         # check if execution errors occurred
552         msg = " Failed to start command '%s' " % command
553         
554         if proc.poll():
555             self.fail()
556             self.error(msg, out, err)
557             raise RuntimeError, msg
558     
559         # Wait for pid file to be generated
560         pid, ppid = self.node.wait_pid(self.run_home)
561         if pid: self._pid = int(pid)
562         if ppid: self._ppid = int(ppid)
563
564         # If the process is not running, check for error information
565         # on the remote machine
566         if not self.pid or not self.ppid:
567             (out, err), proc = self.node.check_errors(self.run_home,
568                     stderr = stderr) 
569
570             # Out is what was written in the stderr file
571             if err:
572                 self.fail()
573                 msg = " Failed to start command '%s' " % command
574                 self.error(msg, out, err)
575                 raise RuntimeError, msg
576         
577     def stop(self):
578         """ Stops application execution
579         """
580         command = self.get('command') or ''
581
582         if self.state == ResourceState.STARTED:
583             stopped = True
584
585             self.info("Stopping command '%s'" % command)
586         
587             # If the command is running in foreground (it was launched using
588             # the node 'execute' method), then we use the handler to the Popen
589             # process to kill it. Else we send a kill signal using the pid and ppid
590             # retrieved after running the command with the node 'run' method
591
592             if self._proc:
593                 self._proc.kill()
594             else:
595                 # Only try to kill the process if the pid and ppid
596                 # were retrieved
597                 if self.pid and self.ppid:
598                     (out, err), proc = self.node.kill(self.pid, self.ppid)
599
600                     if out or err:
601                         # check if execution errors occurred
602                         msg = " Failed to STOP command '%s' " % self.get("command")
603                         self.error(msg, out, err)
604                         self.fail()
605                         stopped = False
606
607             if stopped:
608                 super(LinuxApplication, self).stop()
609
610     def release(self):
611         self.info("Releasing resource")
612
613         tear_down = self.get("tearDown")
614         if tear_down:
615             self.node.execute(tear_down)
616
617         self.stop()
618
619         if self.state == ResourceState.STOPPED:
620             super(LinuxApplication, self).release()
621     
622     @property
623     def state(self):
624         """ Returns the state of the application
625         """
626         if self._state == ResourceState.STARTED:
627             if self.in_foreground:
628                 # Check if the process we used to execute the command
629                 # is still running ...
630                 retcode = self._proc.poll()
631
632                 # retcode == None -> running
633                 # retcode > 0 -> error
634                 # retcode == 0 -> finished
635                 if retcode:
636                     out = ""
637                     msg = " Failed to execute command '%s'" % self.get("command")
638                     err = self._proc.stderr.read()
639                     self.error(msg, out, err)
640                     self.fail()
641                 elif retcode == 0:
642                     self._state = ResourceState.FINISHED
643
644             else:
645                 # We need to query the status of the command we launched in 
646                 # background. In oredr to avoid overwhelming the remote host and
647                 # the local processor with too many ssh queries, the state is only
648                 # requested every 'state_check_delay' seconds.
649                 state_check_delay = 0.5
650                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
651                     # check if execution errors occurred
652                     (out, err), proc = self.node.check_errors(self.run_home)
653
654                     if err:
655                         msg = " Failed to execute command '%s'" % self.get("command")
656                         self.error(msg, out, err)
657                         self.fail()
658
659                     elif self.pid and self.ppid:
660                         # No execution errors occurred. Make sure the background
661                         # process with the recorded pid is still running.
662                         status = self.node.status(self.pid, self.ppid)
663
664                         if status == ProcStatus.FINISHED:
665                             self._state = ResourceState.FINISHED
666
667                     self._last_state_check = tnow()
668
669         return self._state
670
671     def replace_paths(self, command):
672         """
673         Replace all special path tags with shell-escaped actual paths.
674         """
675         return ( command
676             .replace("${USR}", self.node.usr_dir)
677             .replace("${LIB}", self.node.lib_dir)
678             .replace("${BIN}", self.node.bin_dir)
679             .replace("${SRC}", self.node.src_dir)
680             .replace("${SHARE}", self.node.share_dir)
681             .replace("${EXP}", self.node.exp_dir)
682             .replace("${EXP_HOME}", self.node.exp_home)
683             .replace("${APP_HOME}", self.app_home)
684             .replace("${RUN_HOME}", self.run_home)
685             .replace("${NODE_HOME}", self.node.node_home)
686             .replace("${HOME}", self.node.home_dir)
687             )
688
689     def valid_connection(self, guid):
690         # TODO: Validate!
691         return True
692