Porting changes in LinuxApplication directory structure to CCNx RMs
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23     reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: During provisioning, everything that is not scp could be
33 #       uploaded to a same script, http_sources download, etc...
34 #       and like that require performing less ssh connections!!!
35 # TODO: Make stdin be a symlink to the original file in ${SHARE}
36 #       - later use md5sum to check wether the file needs to be re-upload
37
38
39 @clsinit
40 class LinuxApplication(ResourceManager):
41     """
42     .. class:: Class Args :
43       
44         :param ec: The Experiment controller
45         :type ec: ExperimentController
46         :param guid: guid of the RM
47         :type guid: int
48
49     .. note::
50
51     A LinuxApplication RM represents a process that can be executed in
52     a remote Linux host using SSH.
53
54     The LinuxApplication RM takes care of uploadin sources and any files
55     needed to run the experiment, to the remote host. 
56     It also allows to provide source compilation (build) and installation 
57     instructions, and takes care of automating the sources build and 
58     installation tasks for the user.
59
60     It is important to note that files uploaded to the remote host have
61     two possible scopes: single-experiment or multi-experiment.
62     Single experiment files are those that will not be re-used by other 
63     experiments. Multi-experiment files are those that will.
64     Sources and shared files are always made available to all experiments.
65
66     Directory structure:
67
68     The directory structure used by LinuxApplication RM at the Linux
69     host is the following:
70
71         ${HOME}/nepi-usr --> Base directory for multi-experiment files
72                       |
73         ${LIB}        |- /lib --> Base directory for libraries
74         ${BIN}        |- /bin --> Base directory for binary files
75         ${SRC}        |- /src --> Base directory for sources
76         ${SHARE}      |- /share --> Base directory for other files
77
78         ${HOME}/nepi-exp --> Base directory for single-experiment files
79                       |
80         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
81                           |
82         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
83                                |     specific files (e.g. command.sh, input)
84                                | 
85         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
86
87     """
88
89     _rtype = "LinuxApplication"
90
91     @classmethod
92     def _register_attributes(cls):
93         command = Attribute("command", "Command to execute at application start. "
94                 "Note that commands will be executed in the ${RUN_HOME} directory, "
95                 "make sure to take this into account when using relative paths. ", 
96                 flags = Flags.ExecReadOnly)
97         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
98                 flags = Flags.ExecReadOnly)
99         env = Attribute("env", "Environment variables string for command execution",
100                 flags = Flags.ExecReadOnly)
101         sudo = Attribute("sudo", "Run with root privileges", 
102                 flags = Flags.ExecReadOnly)
103         depends = Attribute("depends", 
104                 "Space-separated list of packages required to run the application",
105                 flags = Flags.ExecReadOnly)
106         sources = Attribute("sources", 
107                 "Space-separated list of regular files to be uploaded to ${SRC} "
108                 "directory prior to building. Archives won't be expanded automatically. "
109                 "Sources are globally available for all experiments unless "
110                 "cleanHome is set to True (This will delete all sources). ",
111                 flags = Flags.ExecReadOnly)
112         files = Attribute("files", 
113                 "Space-separated list of regular miscellaneous files to be uploaded "
114                 "to ${SHARE} directory. "
115                 "Files are globally available for all experiments unless "
116                 "cleanHome is set to True (This will delete all files). ",
117                 flags = Flags.ExecReadOnly)
118         libs = Attribute("libs", 
119                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
120                 "to ${LIB} directory. "
121                 "Libraries are globally available for all experiments unless "
122                 "cleanHome is set to True (This will delete all files). ",
123                 flags = Flags.ExecReadOnly)
124         bins = Attribute("bins", 
125                 "Space-separated list of binary files to be uploaded "
126                 "to ${BIN} directory. "
127                 "Binaries are globally available for all experiments unless "
128                 "cleanHome is set to True (This will delete all files). ",
129                 flags = Flags.ExecReadOnly)
130         code = Attribute("code", 
131                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
132                 flags = Flags.ExecReadOnly)
133         build = Attribute("build", 
134                 "Build commands to execute after deploying the sources. "
135                 "Sources are uploaded to the ${SRC} directory and code "
136                 "is uploaded to the ${APP_HOME} directory. \n"
137                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
138                 "./configure && make && make clean.\n"
139                 "Make sure to make the build commands return with a nonzero exit "
140                 "code on error.",
141                 flags = Flags.ReadOnly)
142         install = Attribute("install", 
143                 "Commands to transfer built files to their final destinations. "
144                 "Install commands are executed after build commands. ",
145                 flags = Flags.ReadOnly)
146         stdin = Attribute("stdin", "Standard input for the 'command'", 
147                 flags = Flags.ExecReadOnly)
148         tear_down = Attribute("tearDown", "Command to be executed just before " 
149                 "releasing the resource", 
150                 flags = Flags.ReadOnly)
151
152         cls._register_attribute(command)
153         cls._register_attribute(forward_x11)
154         cls._register_attribute(env)
155         cls._register_attribute(sudo)
156         cls._register_attribute(depends)
157         cls._register_attribute(sources)
158         cls._register_attribute(code)
159         cls._register_attribute(files)
160         cls._register_attribute(bins)
161         cls._register_attribute(libs)
162         cls._register_attribute(build)
163         cls._register_attribute(install)
164         cls._register_attribute(stdin)
165         cls._register_attribute(tear_down)
166
167     @classmethod
168     def _register_traces(cls):
169         stdout = Trace("stdout", "Standard output stream")
170         stderr = Trace("stderr", "Standard error stream")
171
172         cls._register_trace(stdout)
173         cls._register_trace(stderr)
174
175     def __init__(self, ec, guid):
176         super(LinuxApplication, self).__init__(ec, guid)
177         self._pid = None
178         self._ppid = None
179         self._home = "app-%s" % self.guid
180         self._in_foreground = False
181
182         # keep a reference to the running process handler when 
183         # the command is not executed as remote daemon in background
184         self._proc = None
185
186         # timestamp of last state check of the application
187         self._last_state_check = tnow()
188
189     def log_message(self, msg):
190         return " guid %d - host %s - %s " % (self.guid, 
191                 self.node.get("hostname"), msg)
192
193     @property
194     def node(self):
195         node = self.get_connected(LinuxNode.rtype())
196         if node: return node[0]
197         return None
198
199     @property
200     def app_home(self):
201         return os.path.join(self.node.exp_home, self._home)
202
203     @property
204     def run_home(self):
205         return os.path.join(self.app_home, self.ec.run_id)
206
207     @property
208     def pid(self):
209         return self._pid
210
211     @property
212     def ppid(self):
213         return self._ppid
214
215     @property
216     def in_foreground(self):
217         """ Returns True if the command needs to be executed in foreground.
218         This means that command will be executed using 'execute' instead of
219         'run' ('run' executes a command in background and detached from the 
220         terminal)
221         
222         When using X11 forwarding option, the command can not run in background
223         and detached from a terminal, since we need to keep the terminal attached 
224         to interact with it.
225         """
226         return self.get("forwardX11") or self._in_foreground
227
228     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
229         self.info("Retrieving '%s' trace %s " % (name, attr))
230
231         path = os.path.join(self.run_home, name)
232         
233         command = "(test -f %s && echo 'success') || echo 'error'" % path
234         (out, err), proc = self.node.execute(command)
235
236         if (err and proc.poll()) or out.find("error") != -1:
237             msg = " Couldn't find trace %s " % name
238             self.error(msg, out, err)
239             return None
240     
241         if attr == TraceAttr.PATH:
242             return path
243
244         if attr == TraceAttr.ALL:
245             (out, err), proc = self.node.check_output(self.run_home, name)
246             
247             if err and proc.poll():
248                 msg = " Couldn't read trace %s " % name
249                 self.error(msg, out, err)
250                 return None
251
252             return out
253
254         if attr == TraceAttr.STREAM:
255             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
256         elif attr == TraceAttr.SIZE:
257             cmd = "stat -c%%s %s " % path
258
259         (out, err), proc = self.node.execute(cmd)
260
261         if err and proc.poll():
262             msg = " Couldn't find trace %s " % name
263             self.error(msg, out, err)
264             return None
265         
266         if attr == TraceAttr.SIZE:
267             out = int(out.strip())
268
269         return out
270             
271     def provision(self):
272         # create run dir for application
273         self.node.mkdir(self.run_home)
274     
275         steps = [
276             # upload sources
277             self.upload_sources,
278             # upload files
279             self.upload_files,
280             # upload binaries
281             self.upload_binaries,
282             # upload libraries
283             self.upload_libraries,
284             # upload code
285             self.upload_code,
286             # upload stdin
287             self.upload_stdin,
288             # install dependencies
289             self.install_dependencies,
290             # build
291             self.build,
292             # Install
293             self.install]
294
295         # Since provisioning takes a long time, before
296         # each step we check that the EC is still 
297         for step in steps:
298             if self.ec.finished:
299                 raise RuntimeError, "EC finished"
300
301             step()
302
303         self.upload_start_command()
304        
305         self.info("Provisioning finished")
306
307         super(LinuxApplication, self).provision()
308
309     def upload_start_command(self):
310         # Upload command to remote bash script
311         # - only if command can be executed in background and detached
312         command = self.get("command")
313
314         if command and not self.in_foreground:
315             self.info("Uploading command '%s'" % command)
316
317             # replace application specific paths in the command
318             command = self.replace_paths(command)
319             
320             # replace application specific paths in the environment
321             env = self.get("env")
322             env = env and self.replace_paths(env)
323
324             shfile = os.path.join(self.app_home, "start.sh")
325
326             self.node.upload_command(command, 
327                     shfile = shfile,
328                     env = env)
329
330     def upload_sources(self):
331         sources = self.get("sources")
332
333         if sources:
334             self.info("Uploading sources ")
335
336             sources = sources.split(' ')
337
338             # Separate sources that should be downloaded from 
339             # the web, from sources that should be uploaded from
340             # the local machine
341             command = []
342             for source in list(sources):
343                 if source.startswith("http") or source.startswith("https"):
344                     # remove the hhtp source from the sources list
345                     sources.remove(source)
346
347                     command.append( " ( " 
348                             # Check if the source already exists
349                             " ls ${SRC}/%(basename)s "
350                             " || ( "
351                             # If source doesn't exist, download it and check
352                             # that it it downloaded ok
353                             "   wget -c --directory-prefix=${SRC} %(source)s && "
354                             "   ls ${SRC}/%(basename)s "
355                             " ) ) " % {
356                                 "basename": os.path.basename(source),
357                                 "source": source
358                                 })
359
360             if command:
361                 command = " && ".join(command)
362
363                 # replace application specific paths in the command
364                 command = self.replace_paths(command)
365                 
366                 # Upload the command to a bash script and run it
367                 # in background ( but wait until the command has
368                 # finished to continue )
369                 self.node.run_and_wait(command, self.run_home,
370                         shfile = os.path.join(self.app_home, "http_sources.sh"),
371                         overwrite = False,
372                         pidfile = "http_sources_pidfile", 
373                         ecodefile = "http_sources_exitcode", 
374                         stdout = "http_sources_stdout", 
375                         stderr = "http_sources_stderr")
376
377             if sources:
378                 sources = ' '.join(sources)
379                 self.node.upload(sources, self.node.src_dir, overwrite = False)
380
381     def upload_files(self):
382         files = self.get("files")
383
384         if files:
385             self.info("Uploading files %s " % files)
386             self.node.upload(files, self.node.share_dir, overwrite = False)
387
388     def upload_libraries(self):
389         libs = self.get("libs")
390
391         if libs:
392             self.info("Uploading libraries %s " % libaries)
393             self.node.upload(libs, self.node.lib_dir, overwrite = False)
394
395     def upload_binaries(self):
396         bins = self.get("bins")
397
398         if bins:
399             self.info("Uploading binaries %s " % binaries)
400             self.node.upload(bins, self.node.bin_dir, overwrite = False)
401
402     def upload_code(self):
403         code = self.get("code")
404
405         if code:
406             self.info("Uploading code")
407
408             dst = os.path.join(self.app_home, "code")
409             self.node.upload(code, dst, overwrite = False, text = True)
410
411     def upload_stdin(self):
412         stdin = self.get("stdin")
413         if stdin:
414             # create dir for sources
415             self.info("Uploading stdin")
416             
417             dst = os.path.join(self.app_home, "stdin")
418             self.node.upload(stdin, dst, overwrite = False, text = True)
419
420     def install_dependencies(self):
421         depends = self.get("depends")
422         if depends:
423             self.info("Installing dependencies %s" % depends)
424             self.node.install_packages(depends, self.app_home, self.run_home)
425
426     def build(self):
427         build = self.get("build")
428
429         if build:
430             self.info("Building sources ")
431             
432             # replace application specific paths in the command
433             command = self.replace_paths(build)
434
435             # Upload the command to a bash script and run it
436             # in background ( but wait until the command has
437             # finished to continue )
438             self.node.run_and_wait(command, self.run_home,
439                     shfile = os.path.join(self.app_home, "build.sh"),
440                     overwrite = False,
441                     pidfile = "build_pidfile", 
442                     ecodefile = "build_exitcode", 
443                     stdout = "build_stdout", 
444                     stderr = "build_stderr")
445  
446     def install(self):
447         install = self.get("install")
448
449         if install:
450             self.info("Installing sources ")
451
452             # replace application specific paths in the command
453             command = self.replace_paths(install)
454
455             # Upload the command to a bash script and run it
456             # in background ( but wait until the command has
457             # finished to continue )
458             self.node.run_and_wait(command, self.run_home,
459                     shfile = os.path.join(self.app_home, "install.sh"),
460                     overwrite = False,
461                     pidfile = "install_pidfile", 
462                     ecodefile = "install_exitcode", 
463                     stdout = "install_stdout", 
464                     stderr = "install_stderr")
465
466     def deploy(self):
467         # Wait until node is associated and deployed
468         node = self.node
469         if not node or node.state < ResourceState.READY:
470             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
471             self.ec.schedule(reschedule_delay, self.deploy)
472         else:
473             try:
474                 command = self.get("command") or ""
475                 self.info("Deploying command '%s' " % command)
476                 self.discover()
477                 self.provision()
478             except:
479                 self._state = ResourceState.FAILED
480                 raise
481
482             super(LinuxApplication, self).deploy()
483
484     def start(self):
485         command = self.get("command")
486
487         self.info("Starting command '%s'" % command)
488
489         if not command:
490             # If no command was given (i.e. Application was used for dependency
491             # installation), then the application is directly marked as FINISHED
492             self._state = ResourceState.FINISHED
493         else:
494
495             if self.in_foreground:
496                 self._start_in_foreground()
497             else:
498                 self._start_in_background()
499
500             super(LinuxApplication, self).start()
501
502     def _start_in_foreground(self):
503         command = self.get("command")
504         sudo = self.get("sudo") or False
505         x11 = self.get("forwardX11")
506
507         # For a command being executed in foreground, if there is stdin,
508         # it is expected to be text string not a file or pipe
509         stdin = self.get("stdin") or None
510
511         # Command will be launched in foreground and attached to the
512         # terminal using the node 'execute' in non blocking mode.
513
514         # Export environment
515         env = self.get("env")
516         environ = self.node.format_environment(env, inline = True)
517         command = environ + command
518         command = self.replace_paths(command)
519
520         # We save the reference to the process in self._proc 
521         # to be able to kill the process from the stop method.
522         # We also set blocking = False, since we don't want the
523         # thread to block until the execution finishes.
524         (out, err), self._proc = self.node.execute(command,
525                 sudo = sudo,
526                 stdin = stdin,
527                 forward_x11 = x11,
528                 blocking = False)
529
530         if self._proc.poll():
531             self._state = ResourceState.FAILED
532             self.error(msg, out, err)
533             raise RuntimeError, msg
534
535     def _start_in_background(self):
536         command = self.get("command")
537         env = self.get("env")
538         sudo = self.get("sudo") or False
539
540         stdout = "stdout"
541         stderr = "stderr"
542         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
543                 else None
544
545         # Command will be run as a daemon in baground and detached from any
546         # terminal.
547         # The command to run was previously uploaded to a bash script
548         # during deployment, now we launch the remote script using 'run'
549         # method from the node.
550         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
551         (out, err), proc = self.node.run(cmd, self.run_home, 
552             stdin = stdin, 
553             stdout = stdout,
554             stderr = stderr,
555             sudo = sudo)
556
557         # check if execution errors occurred
558         msg = " Failed to start command '%s' " % command
559         
560         if proc.poll():
561             self._state = ResourceState.FAILED
562             self.error(msg, out, err)
563             raise RuntimeError, msg
564     
565         # Wait for pid file to be generated
566         pid, ppid = self.node.wait_pid(self.run_home)
567         if pid: self._pid = int(pid)
568         if ppid: self._ppid = int(ppid)
569
570         # If the process is not running, check for error information
571         # on the remote machine
572         if not self.pid or not self.ppid:
573             (out, err), proc = self.node.check_errors(self.run_home,
574                     stderr = stderr) 
575
576             # Out is what was written in the stderr file
577             if err:
578                 self._state = ResourceState.FAILED
579                 msg = " Failed to start command '%s' " % command
580                 self.error(msg, out, err)
581                 raise RuntimeError, msg
582         
583     def stop(self):
584         """ Stops application execution
585         """
586         command = self.get('command') or ''
587
588         if self.state == ResourceState.STARTED:
589             stopped = True
590
591             self.info("Stopping command '%s'" % command)
592         
593             # If the command is running in foreground (it was launched using
594             # the node 'execute' method), then we use the handler to the Popen
595             # process to kill it. Else we send a kill signal using the pid and ppid
596             # retrieved after running the command with the node 'run' method
597
598             if self._proc:
599                 self._proc.kill()
600             else:
601                 # Only try to kill the process if the pid and ppid
602                 # were retrieved
603                 if self.pid and self.ppid:
604                     (out, err), proc = self.node.kill(self.pid, self.ppid)
605
606                     if out or err:
607                         # check if execution errors occurred
608                         msg = " Failed to STOP command '%s' " % self.get("command")
609                         self.error(msg, out, err)
610                         self._state = ResourceState.FAILED
611                         stopped = False
612
613             if stopped:
614                 super(LinuxApplication, self).stop()
615
616     def release(self):
617         self.info("Releasing resource")
618
619         tear_down = self.get("tearDown")
620         if tear_down:
621             self.node.execute(tear_down)
622
623         self.stop()
624
625         if self.state == ResourceState.STOPPED:
626             super(LinuxApplication, self).release()
627     
628     @property
629     def state(self):
630         """ Returns the state of the application
631         """
632         if self._state == ResourceState.STARTED:
633             if self.in_foreground:
634                 # Check if the process we used to execute the command
635                 # is still running ...
636                 retcode = self._proc.poll()
637
638                 # retcode == None -> running
639                 # retcode > 0 -> error
640                 # retcode == 0 -> finished
641                 if retcode:
642                     out = ""
643                     msg = " Failed to execute command '%s'" % self.get("command")
644                     err = self._proc.stderr.read()
645                     self.error(msg, out, err)
646                     self._state = ResourceState.FAILED
647                 elif retcode == 0:
648                     self._state = ResourceState.FINISHED
649
650             else:
651                 # We need to query the status of the command we launched in 
652                 # background. In oredr to avoid overwhelming the remote host and
653                 # the local processor with too many ssh queries, the state is only
654                 # requested every 'state_check_delay' seconds.
655                 state_check_delay = 0.5
656                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
657                     # check if execution errors occurred
658                     (out, err), proc = self.node.check_errors(self.run_home)
659
660                     if err:
661                         msg = " Failed to execute command '%s'" % self.get("command")
662                         self.error(msg, out, err)
663                         self._state = ResourceState.FAILED
664
665                     elif self.pid and self.ppid:
666                         # No execution errors occurred. Make sure the background
667                         # process with the recorded pid is still running.
668                         status = self.node.status(self.pid, self.ppid)
669
670                         if status == ProcStatus.FINISHED:
671                             self._state = ResourceState.FINISHED
672
673                     self._last_state_check = tnow()
674
675         return self._state
676
677     def replace_paths(self, command):
678         """
679         Replace all special path tags with shell-escaped actual paths.
680         """
681         return ( command
682             .replace("${USR}", self.node.usr_dir)
683             .replace("${LIB}", self.node.lib_dir)
684             .replace("${BIN}", self.node.bin_dir)
685             .replace("${SRC}", self.node.src_dir)
686             .replace("${SHARE}", self.node.share_dir)
687             .replace("${EXP}", self.node.exp_dir)
688             .replace("${EXP_HOME}", self.node.exp_home)
689             .replace("${APP_HOME}", self.app_home)
690             .replace("${RUN_HOME}", self.run_home)
691             .replace("${NODE_HOME}", self.node.node_home)
692             .replace("${HOME}", self.node.home_dir)
693             )
694
695     def valid_connection(self, guid):
696         # TODO: Validate!
697         return True
698