nothing wrong with a stderr being not empty
[nepi.git] / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 import os
20 import subprocess
21 import logging
22
23 from nepi.execution.attribute import Attribute, Flags, Types
24 from nepi.execution.trace import Trace, TraceAttr
25 from nepi.execution.resource import ResourceManager, clsinit_copy, \
26         ResourceState
27 from nepi.resources.linux.node import LinuxNode
28 from nepi.util.sshfuncs import ProcStatus, STDOUT
29 from nepi.util.timefuncs import tnow, tdiffsec
30
31 # to debug, just use
32 # logging.getLogger('application').setLevel(logging.DEBUG)
33 logger = logging.getLogger("application")
34
35 # TODO: Resolve wildcards in commands!!
36 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
37
38 @clsinit_copy
39 class LinuxApplication(ResourceManager):
40     """
41     .. class:: Class Args :
42       
43         :param ec: The Experiment controller
44         :type ec: ExperimentController
45         :param guid: guid of the RM
46         :type guid: int
47
48     .. note::
49
50         A LinuxApplication RM represents a process that can be executed in
51         a remote Linux host using SSH.
52
53         The LinuxApplication RM takes care of uploadin sources and any files
54         needed to run the experiment, to the remote host. 
55         It also allows to provide source compilation (build) and installation 
56         instructions, and takes care of automating the sources build and 
57         installation tasks for the user.
58
59         It is important to note that files uploaded to the remote host have
60         two possible scopes: single-experiment or multi-experiment.
61         Single experiment files are those that will not be re-used by other 
62         experiments. Multi-experiment files are those that will.
63         Sources and shared files are always made available to all experiments.
64
65         Directory structure:
66
67         The directory structure used by LinuxApplication RM at the Linux
68         host is the following:
69
70         ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
71                       |
72         ${LIB}        |- /lib --> Base directory for libraries
73         ${BIN}        |- /bin --> Base directory for binary files
74         ${SRC}        |- /src --> Base directory for sources
75         ${SHARE}      |- /share --> Base directory for other files
76
77         ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
78                       |
79         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
80                           |
81         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
82                                |     specific files (e.g. command.sh, input)
83                                | 
84         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
85
86     """
87
88     _rtype = "linux::Application"
89     _help = "Runs an application on a Linux host with a BASH command "
90     _platform = "linux"
91
92     @classmethod
93     def _register_attributes(cls):
94         cls._register_attribute(
95             Attribute("command", "Command to execute at application start. "
96                       "Note that commands will be executed in the ${RUN_HOME} directory, "
97                       "make sure to take this into account when using relative paths. ", 
98                       flags = Flags.Design))
99         cls._register_attribute(
100             Attribute("forwardX11",
101                       "Enables X11 forwarding for SSH connections", 
102                       flags = Flags.Design))
103         cls._register_attribute(
104             Attribute("env",
105                       "Environment variables string for command execution",
106                       flags = Flags.Design))
107         cls._register_attribute(
108             Attribute("sudo",
109                       "Run with root privileges", 
110                       flags = Flags.Design))
111         cls._register_attribute(
112             Attribute("depends", 
113                       "Space-separated list of packages required to run the application",
114                       flags = Flags.Design))
115         cls._register_attribute(
116             Attribute("sources", 
117                       "semi-colon separated list of regular files to be uploaded to ${SRC} "
118                       "directory prior to building. Archives won't be expanded automatically. "
119                       "Sources are globally available for all experiments unless "
120                       "cleanHome is set to True (This will delete all sources). ",
121                       flags = Flags.Design))
122         cls._register_attribute(
123             Attribute("files", 
124                       "semi-colon separated list of regular miscellaneous files to be uploaded "
125                       "to ${SHARE} directory. "
126                       "Files are globally available for all experiments unless "
127                       "cleanHome is set to True (This will delete all files). ",
128                       flags = Flags.Design))
129         cls._register_attribute(
130             Attribute("libs", 
131                       "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
132                       "to ${LIB} directory. "
133                       "Libraries are globally available for all experiments unless "
134                       "cleanHome is set to True (This will delete all files). ",
135                       flags = Flags.Design))
136         cls._register_attribute(
137             Attribute("bins", 
138                       "semi-colon separated list of binary files to be uploaded "
139                       "to ${BIN} directory. "
140                       "Binaries are globally available for all experiments unless "
141                       "cleanHome is set to True (This will delete all files). ",
142                       flags = Flags.Design))
143         cls._register_attribute(
144             Attribute("code", 
145                       "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
146                       flags = Flags.Design))
147         cls._register_attribute(
148             Attribute("build", 
149                       "Build commands to execute after deploying the sources. "
150                       "Sources are uploaded to the ${SRC} directory and code "
151                       "is uploaded to the ${APP_HOME} directory. \n"
152                       "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
153                       "./configure && make && make clean.\n"
154                       "Make sure to make the build commands return with a nonzero exit "
155                       "code on error.",
156                       flags = Flags.Design))
157         cls._register_attribute(
158             Attribute("install", 
159                       "Commands to transfer built files to their final destinations. "
160                       "Install commands are executed after build commands. ",
161                       flags = Flags.Design))
162         cls._register_attribute(
163             Attribute("stdin", "Standard input for the 'command'", 
164                       flags = Flags.Design))
165         cls._register_attribute(
166             Attribute("tearDown",
167                       "Command to be executed just before releasing the resource", 
168                       flags = Flags.Design))
169         cls._register_attribute(
170             Attribute("splitStderr", 
171                       "requests stderr to be retrieved separately",
172                       default = False))
173
174     @classmethod
175     def _register_traces(cls):
176         cls._register_trace(
177             Trace("stdout", "Standard output stream", enabled = True))
178         cls._register_trace(
179             Trace("stderr", "Standard error stream", enabled = True))
180
181     def __init__(self, ec, guid):
182         super(LinuxApplication, self).__init__(ec, guid)
183         self._pid = None
184         self._ppid = None
185         self._node = None
186         self._home = "app-{}".format(self.guid)
187
188         # whether the command should run in foreground attached
189         # to a terminal
190         self._in_foreground = False
191
192         # whether to use sudo to kill the application process
193         self._sudo_kill = False
194
195         # keep a reference to the running process handler when 
196         # the command is not executed as remote daemon in background
197         self._proc = None
198
199         # timestamp of last state check of the application
200         self._last_state_check = tnow()
201         
202     def log_message(self, msg):
203         return " guid {} - host {} - {} "\
204             .format(self.guid, self.node.get("hostname"), msg)
205
206     @property
207     def node(self):
208         if not self._node:
209             node = self.get_connected(LinuxNode.get_rtype())
210             if not node: 
211                 msg = "Application {} guid {} NOT connected to Node"\
212                       .format(self._rtype, self.guid)
213                 raise RuntimeError(msg)
214
215             self._node = node[0]
216
217         return self._node
218
219     @property
220     def app_home(self):
221         return os.path.join(self.node.exp_home, self._home)
222
223     @property
224     def run_home(self):
225         return os.path.join(self.app_home, self.ec.run_id)
226
227     @property
228     def pid(self):
229         return self._pid
230
231     @property
232     def ppid(self):
233         return self._ppid
234
235     @property
236     def in_foreground(self):
237         """
238         Returns True if the command needs to be executed in foreground.
239         This means that command will be executed using 'execute' instead of
240         'run' ('run' executes a command in background and detached from the 
241         terminal)
242         
243         When using X11 forwarding option, the command can not run in background
244         and detached from a terminal, since we need to keep the terminal attached 
245         to interact with it.
246         """
247         return self.get("forwardX11") or self._in_foreground
248
249     def trace_filepath(self, filename):
250         return os.path.join(self.run_home, filename)
251
252     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
253         self.info("Retrieving '{}' trace {} ".format(name, attr))
254
255         path = self.trace_filepath(name)
256         logger.debug("trace: path= {}".format(path))
257         
258         command = "(test -f {} && echo 'success') || echo 'error'".format(path)
259         (out, err), proc = self.node.execute(command)
260
261         if (err and proc.poll()) or out.find("error") != -1:
262             msg = " Couldn't find trace {} ".format(name)
263             self.error(msg, out, err)
264             return None
265     
266         if attr == TraceAttr.PATH:
267             return path
268
269         if attr == TraceAttr.ALL:
270             (out, err), proc = self.node.check_output(self.run_home, name)
271             
272             if proc.poll():
273                 msg = " Couldn't read trace {} ".format(name)
274                 self.error(msg, out, err)
275                 return None
276
277             return out
278
279         if attr == TraceAttr.STREAM:
280             cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
281         elif attr == TraceAttr.SIZE:
282             cmd = "stat -c {} ".format(path)
283
284         (out, err), proc = self.node.execute(cmd)
285
286         if proc.poll():
287             msg = " Couldn't find trace {} ".format(name)
288             self.error(msg, out, err)
289             return None
290         
291         if attr == TraceAttr.SIZE:
292             out = int(out.strip())
293
294         return out
295
296     def do_provision(self):
297         # take a snapshot of the system if user is root
298         # to ensure that cleanProcess will not kill
299         # pre-existent processes
300         if self.node.get("username") == 'root':
301             import pickle
302             procs = dict()
303             ps_aux = "ps aux | awk '{print $2,$11}'"
304             (out, err), proc = self.node.execute(ps_aux)
305             if len(out) != 0:
306                 for line in out.strip().split("\n"):
307                     parts = line.strip().split(" ")
308                     procs[parts[0]] = parts[1]
309                 with open("/tmp/save.proc", "wb") as pickle_file:
310                     pickle.dump(procs, pickle_file)
311             
312         # create run dir for application
313         self.node.mkdir(self.run_home)
314    
315         # List of all the provision methods to invoke
316         steps = [
317             # upload sources
318             self.upload_sources,
319             # upload files
320             self.upload_files,
321             # upload binaries
322             self.upload_binaries,
323             # upload libraries
324             self.upload_libraries,
325             # upload code
326             self.upload_code,
327             # upload stdin
328             self.upload_stdin,
329             # install dependencies
330             self.install_dependencies,
331             # build
332             self.build,
333             # Install
334             self.install,
335         ]
336
337         command = []
338
339         # Since provisioning takes a long time, before
340         # each step we check that the EC is still 
341         for step in steps:
342             if self.ec.abort:
343                 self.debug("Interrupting provisioning. EC says 'ABORT")
344                 return
345             
346             ret = step()
347             if ret:
348                 command.append(ret)
349
350         # upload deploy script
351         deploy_command = ";".join(command)
352         self.execute_deploy_command(deploy_command)
353
354         # upload start script
355         self.upload_start_command()
356        
357         self.info("Provisioning finished")
358
359         super(LinuxApplication, self).do_provision()
360
361     def upload_start_command(self, overwrite = False):
362         # Upload command to remote bash script
363         # - only if command can be executed in background and detached
364         command = self.get("command")
365
366         if command and not self.in_foreground:
367 #            self.info("Uploading command '{}'".format(command))
368
369             # replace application specific paths in the command
370             command = self.replace_paths(command)
371             # replace application specific paths in the environment
372             env = self.get("env")
373             env = env and self.replace_paths(env)
374
375             shfile = os.path.join(self.app_home, "start.sh")
376
377             self.node.upload_command(command, 
378                                      shfile = shfile,
379                                      env = env,
380                                      overwrite = overwrite)
381
382     def execute_deploy_command(self, command, prefix="deploy"):
383         if command:
384             # replace application specific paths in the command
385             command = self.replace_paths(command)
386             
387             # replace application specific paths in the environment
388             env = self.get("env")
389             env = env and self.replace_paths(env)
390
391             # Upload the command to a bash script and run it
392             # in background ( but wait until the command has
393             # finished to continue )
394             shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
395             # low-level spawn tools in both sshfuncs and execfuncs
396             # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
397             stderr = "{}_stderr".format(prefix) \
398                      if self.get("splitStderr") \
399                         else STDOUT
400             print("{} : prefix = {}, command={}, stderr={}"
401                   .format(self, prefix, command, stderr))
402             self.node.run_and_wait(command, self.run_home,
403                                    shfile = shfile, 
404                                    overwrite = False,
405                                    pidfile = "{}_pidfile".format(prefix), 
406                                    ecodefile = "{}_exitcode".format(prefix), 
407                                    stdout = "{}_stdout".format(prefix),
408                                    stderr = stderr)
409
410     def upload_sources(self, sources = None, src_dir = None):
411         if not sources:
412             sources = self.get("sources")
413    
414         command = ""
415
416         if not src_dir:
417             src_dir = self.node.src_dir
418
419         if sources:
420             self.info("Uploading sources ")
421
422             sources = [str.strip(source) for source in sources.split(";")]
423
424             # Separate sources that should be downloaded from 
425             # the web, from sources that should be uploaded from
426             # the local machine
427             command = []
428             for source in list(sources):
429                 if source.startswith("http") or source.startswith("https"):
430                     # remove the hhtp source from the sources list
431                     sources.remove(source)
432
433                     command.append(
434                         " ( " 
435                         # Check if the source already exists
436                         " ls {src_dir}/{basename} "
437                         " || ( "
438                         # If source doesn't exist, download it and check
439                         # that it it downloaded ok
440                         "   wget -c --directory-prefix={src_dir} {source} && "
441                         "   ls {src_dir}/{basename} "
442                         " ) ) ".format(
443                             basename = os.path.basename(source),
444                             source = source,
445                             src_dir = src_dir
446                         ))
447
448             command = " && ".join(command)
449
450             # replace application specific paths in the command
451             command = self.replace_paths(command)
452        
453             if sources:
454                 sources = ';'.join(sources)
455                 self.node.upload(sources, src_dir, overwrite = False)
456
457         return command
458
459     def upload_files(self, files = None):
460         if not files:
461             files = self.get("files")
462
463         if files:
464             self.info("Uploading files {} ".format(files))
465             self.node.upload(files, self.node.share_dir, overwrite = False)
466
467     def upload_libraries(self, libs = None):
468         if not libs:
469             libs = self.get("libs")
470
471         if libs:
472             self.info("Uploading libraries {} ".format(libs))
473             self.node.upload(libs, self.node.lib_dir, overwrite = False)
474
475     def upload_binaries(self, bins = None):
476         if not bins:
477             bins = self.get("bins")
478
479         if bins:
480             self.info("Uploading binaries {} ".format(bins))
481             self.node.upload(bins, self.node.bin_dir, overwrite = False)
482
483     def upload_code(self, code = None):
484         if not code:
485             code = self.get("code")
486
487         if code:
488             self.info("Uploading code")
489             dst = os.path.join(self.app_home, "code")
490             self.node.upload(code, dst, overwrite = False, text = True, executable = True)
491
492     def upload_stdin(self, stdin = None):
493         if not stdin:
494            stdin = self.get("stdin")
495
496         if stdin:
497             # create dir for sources
498             self.info("Uploading stdin")
499             
500             # upload stdin file to ${SHARE_DIR} directory
501             if os.path.isfile(stdin):
502                 basename = os.path.basename(stdin)
503                 dst = os.path.join(self.node.share_dir, basename)
504             else:
505                 dst = os.path.join(self.app_home, "stdin")
506
507             self.node.upload(stdin, dst, overwrite = False, text = True)
508
509             # create "stdin" symlink on ${APP_HOME} directory
510             command = "( cd {app_home} ; [ ! -f stdin ] &&  ln -s {stdin} stdin )"\
511                       .format(app_home = self.app_home, stdin = dst)
512             return command
513
514     def install_dependencies(self, depends = None):
515         if not depends:
516             depends = self.get("depends")
517
518         if depends:
519             self.info("Installing dependencies {}".format(depends))
520             return self.node.install_packages_command(depends)
521
522     def build(self, build = None):
523         if not build:
524             build = self.get("build")
525
526         if build:
527             self.info("Building sources ")
528             
529             # replace application specific paths in the command
530             return self.replace_paths(build)
531
532     def install(self, install = None):
533         if not install:
534             install = self.get("install")
535
536         if install:
537             self.info("Installing sources ")
538
539             # replace application specific paths in the command
540             return self.replace_paths(install)
541
542     def do_deploy(self):
543         # Wait until node is associated and deployed
544         node = self.node
545         if not node or node.state < ResourceState.READY:
546             self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
547             self.ec.schedule(self.reschedule_delay, self.deploy)
548         else:
549             command = self.get("command") or ""
550             self.info("Deploying command '{}' ".format(command))
551             self.do_discover()
552             self.do_provision()
553
554             super(LinuxApplication, self).do_deploy()
555    
556     def do_start(self):
557         command = self.get("command")
558
559         self.info("Starting command '{}'".format(command))
560
561         if not command:
562             # If no command was given (i.e. Application was used for dependency
563             # installation), then the application is directly marked as STOPPED
564             super(LinuxApplication, self).set_stopped()
565         else:
566             if self.in_foreground:
567                 self._run_in_foreground()
568             else:
569                 self._run_in_background()
570
571             super(LinuxApplication, self).do_start()
572
573     def _run_in_foreground(self):
574         command = self.get("command")
575         sudo = self.get("sudo") or False
576         x11 = self.get("forwardX11")
577         env = self.get("env")
578
579         # Command will be launched in foreground and attached to the
580         # terminal using the node 'execute' in non blocking mode.
581
582         # We save the reference to the process in self._proc 
583         # to be able to kill the process from the stop method.
584         # We also set blocking = False, since we don't want the
585         # thread to block until the execution finishes.
586         (out, err), self._proc = self.execute_command(command, 
587                                                       env = env,
588                                                       sudo = sudo,
589                                                       forward_x11 = x11,
590                                                       blocking = False)
591
592         if self._proc.poll():
593             self.error(msg, out, err)
594             raise RuntimeError(msg)
595
596     def _run_in_background(self):
597         command = self.get("command")
598         env = self.get("env")
599         sudo = self.get("sudo") or False
600
601         stdout = "stdout"
602         # low-level spawn tools in both sshfuncs and execfuncs
603         # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
604         stderr = "stderr" \
605                  if self.get("splitStderr") \
606                     else STDOUT
607         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
608                 else None
609
610         # Command will be run as a daemon in baground and detached from any
611         # terminal.
612         # The command to run was previously uploaded to a bash script
613         # during deployment, now we launch the remote script using 'run'
614         # method from the node.
615         cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
616         (out, err), proc = self.node.run(cmd, self.run_home, 
617                                          stdin = stdin, 
618                                          stdout = stdout,
619                                          stderr = stderr,
620                                          sudo = sudo)
621
622         # check if execution errors occurred
623         msg = " Failed to start command '{}' ".format(command)
624         
625         if proc.poll():
626             self.error(msg, out, err)
627             raise RuntimeError(msg)
628     
629         # Wait for pid file to be generated
630         pid, ppid = self.node.wait_pid(self.run_home)
631         if pid: self._pid = int(pid)
632         if ppid: self._ppid = int(ppid)
633
634         # If the process is not running, check for error information
635         # on the remote machine
636         if not self.pid or not self.ppid:
637             (out, err), proc = self.node.check_errors(self.run_home,
638                                                       stderr = stderr) 
639
640             # Out is what was written in the stderr file
641             if err:
642                 msg = " Failed to start command '{}' ".format(command)
643                 self.error(msg, out, err)
644                 raise RuntimeError(msg)
645     
646     def do_stop(self):
647         """ Stops application execution
648         """
649         command = self.get('command') or ''
650
651         if self.state == ResourceState.STARTED:
652         
653             self.info("Stopping command '{}' ".format(command))
654         
655             # If the command is running in foreground (it was launched using
656             # the node 'execute' method), then we use the handler to the Popen
657             # process to kill it. Else we send a kill signal using the pid and ppid
658             # retrieved after running the command with the node 'run' method
659             if self._proc:
660                 self._proc.kill()
661             else:
662                 # Only try to kill the process if the pid and ppid
663                 # were retrieved
664                 if self.pid and self.ppid:
665                     (out, err), proc = self.node.kill(self.pid, self.ppid,
666                                                       sudo = self._sudo_kill)
667
668                     """
669                     # TODO: check if execution errors occurred
670                     if (proc and proc.poll()) or err:
671                         msg = " Failed to STOP command '{}' ".format(self.get("command"))
672                         self.error(msg, out, err)
673                     """
674
675             super(LinuxApplication, self).do_stop()
676
677     def do_release(self):
678         self.info("Releasing resource")
679
680         self.do_stop()
681         
682         tear_down = self.get("tearDown")
683         if tear_down:
684             self.node.execute(tear_down)
685
686         hard_release = self.get("hardRelease")
687         if hard_release:
688             self.node.rmdir(self.app_home)
689
690         super(LinuxApplication, self).do_release()
691         
692     @property
693     def state(self):
694         """ Returns the state of the application
695         """
696         if self._state == ResourceState.STARTED:
697             if self.in_foreground:
698                 # Check if the process we used to execute the command
699                 # is still running ...
700                 retcode = self._proc.poll()
701
702                 # retcode == None -> running
703                 # retcode > 0 -> error
704                 # retcode == 0 -> finished
705                 if retcode:
706                     out = ""
707                     msg = " Failed to execute command '{}'".format(self.get("command"))
708                     err = self._proc.stderr.read()
709                     self.error(msg, out, err)
710                     self.do_fail()
711
712                 elif retcode == 0:
713                     self.set_stopped()
714             else:
715                 # We need to query the status of the command we launched in 
716                 # background. In order to avoid overwhelming the remote host and
717                 # the local processor with too many ssh queries, the state is only
718                 # requested every 'state_check_delay' seconds.
719                 state_check_delay = 0.5
720                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
721                     if self.pid and self.ppid:
722                         # Make sure the process is still running in background
723                         status = self.node.status(self.pid, self.ppid)
724
725                         if status == ProcStatus.FINISHED:
726                             # If the program finished, check if execution
727                             # errors occurred
728                             (out, err), proc \
729                                 = self.node.check_errors(self.run_home)
730
731                             if err:
732                                 # Thierry : there's nothing wrong with a non-empty
733                                 # stderr, is there ?
734                                 #msg = "Failed to execute command '{}'"\
735                                 #      .format(self.get("command"))
736                                 #self.error(msg, out, err)
737                                 #self.do_fail()
738                                 # xxx TODO  OTOH it would definitely make sense
739                                 # to check the exitcode
740                                 pass
741                             else:
742                                 self.set_stopped()
743
744                     self._last_state_check = tnow()
745
746         return self._state
747
748     def execute_command(self, command, 
749                         env = None,
750                         sudo = False,
751                         tty = False,
752                         forward_x11 = False,
753                         blocking = False):
754
755         environ = ""
756         if env:
757             environ = self.node.format_environment(env, inline = True)
758         command = environ + command
759         command = self.replace_paths(command)
760
761         return self.node.execute(command,
762                                  sudo = sudo,
763                                  tty = tty,
764                                  forward_x11 = forward_x11,
765                                  blocking = blocking)
766
767     def replace_paths(self, command, node = None, app_home = None, run_home = None):
768         """
769         Replace all special path tags with shell-escaped actual paths.
770         """
771         if not node:
772             node = self.node
773
774         if not app_home:
775             app_home = self.app_home
776
777         if not run_home:
778             run_home = self.run_home
779
780         return ( command
781                  .replace("${USR}", node.usr_dir)
782                  .replace("${LIB}", node.lib_dir)
783                  .replace("${BIN}", node.bin_dir)
784                  .replace("${SRC}", node.src_dir)
785                  .replace("${SHARE}", node.share_dir)
786                  .replace("${EXP}", node.exp_dir)
787                  .replace("${EXP_HOME}", node.exp_home)
788                  .replace("${APP_HOME}", app_home)
789                  .replace("${RUN_HOME}", run_home)
790                  .replace("${NODE_HOME}", node.node_home)
791                  .replace("${HOME}", node.home_dir)
792                  # a shortcut to refer to the file uploaded as 'code = '
793                  .replace("${CODE}", "{}/code".format(app_home))
794              )
795
796     def valid_connection(self, guid):
797         # TODO: Validate!
798         return True