LinuxApplication: Changed directory structure to store experiment files in the Linux...
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23     reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: During provisioning, everything that is not scp could be
33 #       uploaded to a same script, http_sources download, etc...
34 #       and like that require performing less ssh connections!!!
35
36
37 @clsinit
38 class LinuxApplication(ResourceManager):
39     """
40     .. class:: Class Args :
41       
42         :param ec: The Experiment controller
43         :type ec: ExperimentController
44         :param guid: guid of the RM
45         :type guid: int
46
47     .. note::
48
49     A LinuxApplication RM represents a process that can be executed in
50     a remote Linux host using SSH.
51
52     The LinuxApplication RM takes care of uploadin sources and any files
53     needed to run the experiment, to the remote host. 
54     It also allows to provide source compilation (build) and installation 
55     instructions, and takes care of automating the sources build and 
56     installation tasks for the user.
57
58     It is important to note that files uploaded to the remote host have
59     two possible scopes: single-experiment or multi-experiment.
60     Single experiment files are those that will not be re-used by other 
61     experiments. Multi-experiment files are those that will.
62     Sources and shared files are always made available to all experiments.
63
64     Directory structure:
65
66     The directory structure used by LinuxApplication RM at the Linux
67     host is the following:
68
69         ${HOME}/nepi-usr --> Base directory for multi-experiment files
70                       |
71         ${LIB}        |- /lib --> Base directory for libraries
72         ${BIN}        |- /bin --> Base directory for binary files
73         ${SRC}        |- /src --> Base directory for sources
74         ${SHARE}      |- /share --> Base directory for other files
75
76         ${HOME}/nepi-exp --> Base directory for single-experiment files
77                       |
78         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
79                           |
80         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
81                                |     specific files (e.g. command.sh, input)
82                                | 
83         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
84
85     """
86
87     _rtype = "LinuxApplication"
88
89     @classmethod
90     def _register_attributes(cls):
91         command = Attribute("command", "Command to execute at application start. "
92                 "Note that commands will be executed in the ${RUN_HOME} directory, "
93                 "make sure to take this into account when using relative paths. ", 
94                 flags = Flags.ExecReadOnly)
95         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
96                 flags = Flags.ExecReadOnly)
97         env = Attribute("env", "Environment variables string for command execution",
98                 flags = Flags.ExecReadOnly)
99         sudo = Attribute("sudo", "Run with root privileges", 
100                 flags = Flags.ExecReadOnly)
101         depends = Attribute("depends", 
102                 "Space-separated list of packages required to run the application",
103                 flags = Flags.ExecReadOnly)
104         sources = Attribute("sources", 
105                 "Space-separated list of regular files to be uploaded to ${SRC} "
106                 "directory prior to building. Archives won't be expanded automatically. "
107                 "Sources are globally available for all experiments unless "
108                 "cleanHome is set to True (This will delete all sources). ",
109                 flags = Flags.ExecReadOnly)
110         files = Attribute("files", 
111                 "Space-separated list of regular miscellaneous files to be uploaded "
112                 "to ${SHARE} directory. "
113                 "Files are globally available for all experiments unless "
114                 "cleanHome is set to True (This will delete all files). ",
115                 flags = Flags.ExecReadOnly)
116         libs = Attribute("libs", 
117                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
118                 "to ${LIB} directory. "
119                 "Libraries are globally available for all experiments unless "
120                 "cleanHome is set to True (This will delete all files). ",
121                 flags = Flags.ExecReadOnly)
122         bins = Attribute("bins", 
123                 "Space-separated list of binary files to be uploaded "
124                 "to ${BIN} directory. "
125                 "Binaries are globally available for all experiments unless "
126                 "cleanHome is set to True (This will delete all files). ",
127                 flags = Flags.ExecReadOnly)
128         code = Attribute("code", 
129                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
130                 flags = Flags.ExecReadOnly)
131         build = Attribute("build", 
132                 "Build commands to execute after deploying the sources. "
133                 "Sources are uploaded to the ${SRC} directory and code "
134                 "is uploaded to the ${APP_HOME} directory. \n"
135                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
136                 "./configure && make && make clean.\n"
137                 "Make sure to make the build commands return with a nonzero exit "
138                 "code on error.",
139                 flags = Flags.ReadOnly)
140         install = Attribute("install", 
141                 "Commands to transfer built files to their final destinations. "
142                 "Install commands are executed after build commands. ",
143                 flags = Flags.ReadOnly)
144         stdin = Attribute("stdin", "Standard input for the 'command'", 
145                 flags = Flags.ExecReadOnly)
146         tear_down = Attribute("tearDown", "Command to be executed just before " 
147                 "releasing the resource", 
148                 flags = Flags.ReadOnly)
149
150         cls._register_attribute(command)
151         cls._register_attribute(forward_x11)
152         cls._register_attribute(env)
153         cls._register_attribute(sudo)
154         cls._register_attribute(depends)
155         cls._register_attribute(sources)
156         cls._register_attribute(code)
157         cls._register_attribute(files)
158         cls._register_attribute(bins)
159         cls._register_attribute(libs)
160         cls._register_attribute(build)
161         cls._register_attribute(install)
162         cls._register_attribute(stdin)
163         cls._register_attribute(tear_down)
164
165     @classmethod
166     def _register_traces(cls):
167         stdout = Trace("stdout", "Standard output stream")
168         stderr = Trace("stderr", "Standard error stream")
169
170         cls._register_trace(stdout)
171         cls._register_trace(stderr)
172
173     def __init__(self, ec, guid):
174         super(LinuxApplication, self).__init__(ec, guid)
175         self._pid = None
176         self._ppid = None
177         self._home = "app-%s" % self.guid
178         self._in_foreground = False
179
180         # keep a reference to the running process handler when 
181         # the command is not executed as remote daemon in background
182         self._proc = None
183
184         # timestamp of last state check of the application
185         self._last_state_check = tnow()
186
187     def log_message(self, msg):
188         return " guid %d - host %s - %s " % (self.guid, 
189                 self.node.get("hostname"), msg)
190
191     @property
192     def node(self):
193         node = self.get_connected(LinuxNode.rtype())
194         if node: return node[0]
195         return None
196
197     @property
198     def app_home(self):
199         return os.path.join(self.node.exp_home, self._home)
200
201     @property
202     def run_home(self):
203         return os.path.join(self.app_home, self.ec.run_id)
204
205     @property
206     def pid(self):
207         return self._pid
208
209     @property
210     def ppid(self):
211         return self._ppid
212
213     @property
214     def in_foreground(self):
215         """ Returns True if the command needs to be executed in foreground.
216         This means that command will be executed using 'execute' instead of
217         'run' ('run' executes a command in background and detached from the 
218         terminal)
219         
220         When using X11 forwarding option, the command can not run in background
221         and detached from a terminal, since we need to keep the terminal attached 
222         to interact with it.
223         """
224         return self.get("forwardX11") or self._in_foreground
225
226     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
227         self.info("Retrieving '%s' trace %s " % (name, attr))
228
229         path = os.path.join(self.run_home, name)
230         
231         command = "(test -f %s && echo 'success') || echo 'error'" % path
232         (out, err), proc = self.node.execute(command)
233
234         if (err and proc.poll()) or out.find("error") != -1:
235             msg = " Couldn't find trace %s " % name
236             self.error(msg, out, err)
237             return None
238     
239         if attr == TraceAttr.PATH:
240             return path
241
242         if attr == TraceAttr.ALL:
243             (out, err), proc = self.node.check_output(self.run_home, name)
244             
245             if err and proc.poll():
246                 msg = " Couldn't read trace %s " % name
247                 self.error(msg, out, err)
248                 return None
249
250             return out
251
252         if attr == TraceAttr.STREAM:
253             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
254         elif attr == TraceAttr.SIZE:
255             cmd = "stat -c%%s %s " % path
256
257         (out, err), proc = self.node.execute(cmd)
258
259         if err and proc.poll():
260             msg = " Couldn't find trace %s " % name
261             self.error(msg, out, err)
262             return None
263         
264         if attr == TraceAttr.SIZE:
265             out = int(out.strip())
266
267         return out
268             
269     def provision(self):
270         # create run dir for application
271         self.node.mkdir(self.run_home)
272     
273         steps = [
274             # upload sources
275             self.upload_sources,
276             # upload files
277             self.upload_files,
278             # upload binaries
279             self.upload_binaries,
280             # upload libraries
281             self.upload_libraries,
282             # upload code
283             self.upload_code,
284             # upload stdin
285             self.upload_stdin,
286             # install dependencies
287             self.install_dependencies,
288             # build
289             self.build,
290             # Install
291             self.install]
292
293         # Since provisioning takes a long time, before
294         # each step we check that the EC is still 
295         for step in steps:
296             if self.ec.finished:
297                 raise RuntimeError, "EC finished"
298
299             step()
300
301         # Upload command to remote bash script
302         # - only if command can be executed in background and detached
303         command = self.get("command")
304
305         if command and not self.in_foreground:
306             self.info("Uploading command '%s'" % command)
307
308             # replace application specific paths in the command
309             command = self.replace_paths(command)
310             
311             # replace application specific paths in the environment
312             env = self.get("env")
313             env = env and self.replace_paths(env)
314
315             shfile = os.path.join(self.app_home, "app.sh")
316
317             self.node.upload_command(command, 
318                     shfile = shfile,
319                     env = env)
320        
321         self.info("Provisioning finished")
322
323         super(LinuxApplication, self).provision()
324
325     def upload_sources(self):
326         sources = self.get("sources")
327
328         if sources:
329             self.info("Uploading sources ")
330
331             sources = sources.split(' ')
332
333             # Separate sources that should be downloaded from 
334             # the web, from sources that should be uploaded from
335             # the local machine
336             command = []
337             for source in list(sources):
338                 if source.startswith("http") or source.startswith("https"):
339                     # remove the hhtp source from the sources list
340                     sources.remove(source)
341
342                     command.append( " ( " 
343                             # Check if the source already exists
344                             " ls ${SRC}/%(basename)s "
345                             " || ( "
346                             # If source doesn't exist, download it and check
347                             # that it it downloaded ok
348                             "   wget -c --directory-prefix=${SRC} %(source)s && "
349                             "   ls ${SRC}/%(basename)s "
350                             " ) ) " % {
351                                 "basename": os.path.basename(source),
352                                 "source": source
353                                 })
354
355             if command:
356                 command = " && ".join(command)
357
358                 # replace application specific paths in the command
359                 command = self.replace_paths(command)
360                 
361                 # Upload the command to a bash script and run it
362                 # in background ( but wait until the command has
363                 # finished to continue )
364                 self.node.run_and_wait(command, self.run_home,
365                         shfile = os.path.join(self.app_home, "http_sources.sh"),
366                         overwrite = False,
367                         pidfile = "http_sources_pidfile", 
368                         ecodefile = "http_sources_exitcode", 
369                         stdout = "http_sources_stdout", 
370                         stderr = "http_sources_stderr")
371
372             if sources:
373                 sources = ' '.join(sources)
374                 self.node.upload(sources, self.node.src_dir, overwrite = False)
375
376     def upload_files(self):
377         files = self.get("files")
378
379         if files:
380             self.info("Uploading files %s " % files)
381             self.node.upload(files, self.node.share_dir, overwrite = False)
382
383     def upload_libraries(self):
384         libs = self.get("libs")
385
386         if libs:
387             self.info("Uploading libraries %s " % libaries)
388             self.node.upload(libs, self.node.lib_dir, overwrite = False)
389
390     def upload_binaries(self):
391         bins = self.get("bins")
392
393         if bins:
394             self.info("Uploading binaries %s " % binaries)
395             self.node.upload(bins, self.node.bin_dir, overwrite = False)
396
397     def upload_code(self):
398         code = self.get("code")
399
400         if code:
401             self.info("Uploading code")
402
403             dst = os.path.join(self.app_home, "code")
404             self.node.upload(code, dst, overwrite = False, text = True)
405
406     def upload_stdin(self):
407         stdin = self.get("stdin")
408         if stdin:
409             # create dir for sources
410             self.info("Uploading stdin")
411             
412             dst = os.path.join(self.app_home, "stdin")
413             self.node.upload(stdin, dst, overwrite = False, text = True)
414
415     def install_dependencies(self):
416         depends = self.get("depends")
417         if depends:
418             self.info("Installing dependencies %s" % depends)
419             self.node.install_packages(depends, self.app_home, self.run_home)
420
421     def build(self):
422         build = self.get("build")
423
424         if build:
425             self.info("Building sources ")
426             
427             # replace application specific paths in the command
428             command = self.replace_paths(build)
429
430             # Upload the command to a bash script and run it
431             # in background ( but wait until the command has
432             # finished to continue )
433             self.node.run_and_wait(command, self.run_home,
434                     shfile = os.path.join(self.app_home, "build.sh"),
435                     overwrite = False,
436                     pidfile = "build_pidfile", 
437                     ecodefile = "build_exitcode", 
438                     stdout = "build_stdout", 
439                     stderr = "build_stderr")
440  
441     def install(self):
442         install = self.get("install")
443
444         if install:
445             self.info("Installing sources ")
446
447             # replace application specific paths in the command
448             command = self.replace_paths(install)
449
450             # Upload the command to a bash script and run it
451             # in background ( but wait until the command has
452             # finished to continue )
453             self.node.run_and_wait(command, self.run_home,
454                     shfile = os.path.join(self.app_home, "install.sh"),
455                     overwrite = False,
456                     pidfile = "install_pidfile", 
457                     ecodefile = "install_exitcode", 
458                     stdout = "install_stdout", 
459                     stderr = "install_stderr")
460
461     def deploy(self):
462         # Wait until node is associated and deployed
463         node = self.node
464         if not node or node.state < ResourceState.READY:
465             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
466             self.ec.schedule(reschedule_delay, self.deploy)
467         else:
468             try:
469                 command = self.get("command") or ""
470                 self.info("Deploying command '%s' " % command)
471                 self.discover()
472                 self.provision()
473             except:
474                 self._state = ResourceState.FAILED
475                 raise
476
477             super(LinuxApplication, self).deploy()
478
479     def start(self):
480         command = self.get("command")
481
482         self.info("Starting command '%s'" % command)
483
484         if not command:
485             # If no command was given (i.e. Application was used for dependency
486             # installation), then the application is directly marked as FINISHED
487             self._state = ResourceState.FINISHED
488         else:
489
490             if self.in_foreground:
491                 self._start_in_foreground()
492             else:
493                 self._start_in_background()
494
495             super(LinuxApplication, self).start()
496
497     def _start_in_foreground(self):
498         command = self.get("command")
499         sudo = self.get("sudo") or False
500         x11 = self.get("forwardX11")
501
502         # For a command being executed in foreground, if there is stdin,
503         # it is expected to be text string not a file or pipe
504         stdin = self.get("stdin") or None
505
506         # Command will be launched in foreground and attached to the
507         # terminal using the node 'execute' in non blocking mode.
508
509         # Export environment
510         env = self.get("env")
511         environ = self.node.format_environment(env, inline = True)
512         command = environ + command
513         command = self.replace_paths(command)
514
515         # We save the reference to the process in self._proc 
516         # to be able to kill the process from the stop method.
517         # We also set blocking = False, since we don't want the
518         # thread to block until the execution finishes.
519         (out, err), self._proc = self.node.execute(command,
520                 sudo = sudo,
521                 stdin = stdin,
522                 forward_x11 = x11,
523                 blocking = False)
524
525         if self._proc.poll():
526             self._state = ResourceState.FAILED
527             self.error(msg, out, err)
528             raise RuntimeError, msg
529
530     def _start_in_background(self):
531         command = self.get("command")
532         env = self.get("env")
533         sudo = self.get("sudo") or False
534
535         stdout = "stdout"
536         stderr = "stderr"
537         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
538                 else None
539
540         # Command will be run as a daemon in baground and detached from any
541         # terminal.
542         # The command to run was previously uploaded to a bash script
543         # during deployment, now we launch the remote script using 'run'
544         # method from the node.
545         cmd = "bash %s" % os.path.join(self.app_home, "app.sh")
546         (out, err), proc = self.node.run(cmd, self.run_home, 
547             stdin = stdin, 
548             stdout = stdout,
549             stderr = stderr,
550             sudo = sudo)
551
552         # check if execution errors occurred
553         msg = " Failed to start command '%s' " % command
554         
555         if proc.poll():
556             self._state = ResourceState.FAILED
557             self.error(msg, out, err)
558             raise RuntimeError, msg
559     
560         # Wait for pid file to be generated
561         pid, ppid = self.node.wait_pid(self.run_home)
562         if pid: self._pid = int(pid)
563         if ppid: self._ppid = int(ppid)
564
565         # If the process is not running, check for error information
566         # on the remote machine
567         if not self.pid or not self.ppid:
568             (out, err), proc = self.node.check_errors(self.run_home,
569                     stderr = stderr) 
570
571             # Out is what was written in the stderr file
572             if err:
573                 self._state = ResourceState.FAILED
574                 msg = " Failed to start command '%s' " % command
575                 self.error(msg, out, err)
576                 raise RuntimeError, msg
577         
578     def stop(self):
579         """ Stops application execution
580         """
581         command = self.get('command') or ''
582
583         if self.state == ResourceState.STARTED:
584             stopped = True
585
586             self.info("Stopping command '%s'" % command)
587         
588             # If the command is running in foreground (it was launched using
589             # the node 'execute' method), then we use the handler to the Popen
590             # process to kill it. Else we send a kill signal using the pid and ppid
591             # retrieved after running the command with the node 'run' method
592
593             if self._proc:
594                 self._proc.kill()
595             else:
596                 # Only try to kill the process if the pid and ppid
597                 # were retrieved
598                 if self.pid and self.ppid:
599                     (out, err), proc = self.node.kill(self.pid, self.ppid)
600
601                     if out or err:
602                         # check if execution errors occurred
603                         msg = " Failed to STOP command '%s' " % self.get("command")
604                         self.error(msg, out, err)
605                         self._state = ResourceState.FAILED
606                         stopped = False
607
608             if stopped:
609                 super(LinuxApplication, self).stop()
610
611     def release(self):
612         self.info("Releasing resource")
613
614         tear_down = self.get("tearDown")
615         if tear_down:
616             self.node.execute(tear_down)
617
618         self.stop()
619
620         if self.state == ResourceState.STOPPED:
621             super(LinuxApplication, self).release()
622     
623     @property
624     def state(self):
625         """ Returns the state of the application
626         """
627         if self._state == ResourceState.STARTED:
628             if self.in_foreground:
629                 # Check if the process we used to execute the command
630                 # is still running ...
631                 retcode = self._proc.poll()
632
633                 # retcode == None -> running
634                 # retcode > 0 -> error
635                 # retcode == 0 -> finished
636                 if retcode:
637                     out = ""
638                     msg = " Failed to execute command '%s'" % self.get("command")
639                     err = self._proc.stderr.read()
640                     self.error(msg, out, err)
641                     self._state = ResourceState.FAILED
642                 elif retcode == 0:
643                     self._state = ResourceState.FINISHED
644
645             else:
646                 # We need to query the status of the command we launched in 
647                 # background. In oredr to avoid overwhelming the remote host and
648                 # the local processor with too many ssh queries, the state is only
649                 # requested every 'state_check_delay' seconds.
650                 state_check_delay = 0.5
651                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
652                     # check if execution errors occurred
653                     (out, err), proc = self.node.check_errors(self.run_home)
654
655                     if err:
656                         msg = " Failed to execute command '%s'" % self.get("command")
657                         self.error(msg, out, err)
658                         self._state = ResourceState.FAILED
659
660                     elif self.pid and self.ppid:
661                         # No execution errors occurred. Make sure the background
662                         # process with the recorded pid is still running.
663                         status = self.node.status(self.pid, self.ppid)
664
665                         if status == ProcStatus.FINISHED:
666                             self._state = ResourceState.FINISHED
667
668                     self._last_state_check = tnow()
669
670         return self._state
671
672     def replace_paths(self, command):
673         """
674         Replace all special path tags with shell-escaped actual paths.
675         """
676         return ( command
677             .replace("${USR}", self.node.usr_dir)
678             .replace("${LIB}", self.node.lib_dir)
679             .replace("${BIN}", self.node.bin_dir)
680             .replace("${SRC}", self.node.src_dir)
681             .replace("${SHARE}", self.node.share_dir)
682             .replace("${EXP}", self.node.exp_dir)
683             .replace("${EXP_HOME}", self.node.exp_home)
684             .replace("${APP_HOME}", self.app_home)
685             .replace("${RUN_HOME}", self.run_home)
686             .replace("${NODE_HOME}", self.node.node_home)
687             .replace("${HOME}", self.node.home_dir)
688             )
689
690     def valid_connection(self, guid):
691         # TODO: Validate!
692         return True
693