Adding working UdpTunnel for Planetlab and Linux
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
23     reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32
33 @clsinit
34 class LinuxApplication(ResourceManager):
35     """
36     .. class:: Class Args :
37       
38         :param ec: The Experiment controller
39         :type ec: ExperimentController
40         :param guid: guid of the RM
41         :type guid: int
42
43     .. note::
44
45     A LinuxApplication RM represents a process that can be executed in
46     a remote Linux host using SSH.
47
48     The LinuxApplication RM takes care of uploadin sources and any files
49     needed to run the experiment, to the remote host. 
50     It also allows to provide source compilation (build) and installation 
51     instructions, and takes care of automating the sources build and 
52     installation tasks for the user.
53
54     It is important to note that files uploaded to the remote host have
55     two possible scopes: single-experiment or multi-experiment.
56     Single experiment files are those that will not be re-used by other 
57     experiments. Multi-experiment files are those that will.
58     Sources and shared files are always made available to all experiments.
59
60     Directory structure:
61
62     The directory structure used by LinuxApplication RM at the Linux
63     host is the following:
64
65         ${HOME}/nepi-usr --> Base directory for multi-experiment files
66                       |
67         ${LIB}        |- /lib --> Base directory for libraries
68         ${BIN}        |- /bin --> Base directory for binary files
69         ${SRC}        |- /src --> Base directory for sources
70         ${SHARE}      |- /share --> Base directory for other files
71
72         ${HOME}/nepi-exp --> Base directory for single-experiment files
73                       |
74         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
75                           |
76         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
77                                |     specific files (e.g. command.sh, input)
78                                | 
79         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
80
81     """
82
83     _rtype = "LinuxApplication"
84
85     @classmethod
86     def _register_attributes(cls):
87         command = Attribute("command", "Command to execute at application start. "
88                 "Note that commands will be executed in the ${RUN_HOME} directory, "
89                 "make sure to take this into account when using relative paths. ", 
90                 flags = Flags.ExecReadOnly)
91         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
92                 flags = Flags.ExecReadOnly)
93         env = Attribute("env", "Environment variables string for command execution",
94                 flags = Flags.ExecReadOnly)
95         sudo = Attribute("sudo", "Run with root privileges", 
96                 flags = Flags.ExecReadOnly)
97         depends = Attribute("depends", 
98                 "Space-separated list of packages required to run the application",
99                 flags = Flags.ExecReadOnly)
100         sources = Attribute("sources", 
101                 "Space-separated list of regular files to be uploaded to ${SRC} "
102                 "directory prior to building. Archives won't be expanded automatically. "
103                 "Sources are globally available for all experiments unless "
104                 "cleanHome is set to True (This will delete all sources). ",
105                 flags = Flags.ExecReadOnly)
106         files = Attribute("files", 
107                 "Space-separated list of regular miscellaneous files to be uploaded "
108                 "to ${SHARE} directory. "
109                 "Files are globally available for all experiments unless "
110                 "cleanHome is set to True (This will delete all files). ",
111                 flags = Flags.ExecReadOnly)
112         libs = Attribute("libs", 
113                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
114                 "to ${LIB} directory. "
115                 "Libraries are globally available for all experiments unless "
116                 "cleanHome is set to True (This will delete all files). ",
117                 flags = Flags.ExecReadOnly)
118         bins = Attribute("bins", 
119                 "Space-separated list of binary files to be uploaded "
120                 "to ${BIN} directory. "
121                 "Binaries are globally available for all experiments unless "
122                 "cleanHome is set to True (This will delete all files). ",
123                 flags = Flags.ExecReadOnly)
124         code = Attribute("code", 
125                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
126                 flags = Flags.ExecReadOnly)
127         build = Attribute("build", 
128                 "Build commands to execute after deploying the sources. "
129                 "Sources are uploaded to the ${SRC} directory and code "
130                 "is uploaded to the ${APP_HOME} directory. \n"
131                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
132                 "./configure && make && make clean.\n"
133                 "Make sure to make the build commands return with a nonzero exit "
134                 "code on error.",
135                 flags = Flags.ReadOnly)
136         install = Attribute("install", 
137                 "Commands to transfer built files to their final destinations. "
138                 "Install commands are executed after build commands. ",
139                 flags = Flags.ReadOnly)
140         stdin = Attribute("stdin", "Standard input for the 'command'", 
141                 flags = Flags.ExecReadOnly)
142         tear_down = Attribute("tearDown", "Command to be executed just before " 
143                 "releasing the resource", 
144                 flags = Flags.ReadOnly)
145
146         cls._register_attribute(command)
147         cls._register_attribute(forward_x11)
148         cls._register_attribute(env)
149         cls._register_attribute(sudo)
150         cls._register_attribute(depends)
151         cls._register_attribute(sources)
152         cls._register_attribute(code)
153         cls._register_attribute(files)
154         cls._register_attribute(bins)
155         cls._register_attribute(libs)
156         cls._register_attribute(build)
157         cls._register_attribute(install)
158         cls._register_attribute(stdin)
159         cls._register_attribute(tear_down)
160
161     @classmethod
162     def _register_traces(cls):
163         stdout = Trace("stdout", "Standard output stream")
164         stderr = Trace("stderr", "Standard error stream")
165
166         cls._register_trace(stdout)
167         cls._register_trace(stderr)
168
169     def __init__(self, ec, guid):
170         super(LinuxApplication, self).__init__(ec, guid)
171         self._pid = None
172         self._ppid = None
173         self._home = "app-%s" % self.guid
174         # whether the command should run in foreground attached
175         # to a terminal
176         self._in_foreground = False
177
178         # whether to use sudo to kill the application process
179         self._sudo_kill = False
180
181         # keep a reference to the running process handler when 
182         # the command is not executed as remote daemon in background
183         self._proc = None
184
185         # timestamp of last state check of the application
186         self._last_state_check = tnow()
187
188     def log_message(self, msg):
189         return " guid %d - host %s - %s " % (self.guid, 
190                 self.node.get("hostname"), msg)
191
192     @property
193     def node(self):
194         node = self.get_connected(LinuxNode.rtype())
195         if node: return node[0]
196         return None
197
198     @property
199     def app_home(self):
200         return os.path.join(self.node.exp_home, self._home)
201
202     @property
203     def run_home(self):
204         return os.path.join(self.app_home, self.ec.run_id)
205
206     @property
207     def pid(self):
208         return self._pid
209
210     @property
211     def ppid(self):
212         return self._ppid
213
214     @property
215     def in_foreground(self):
216         """ Returns True if the command needs to be executed in foreground.
217         This means that command will be executed using 'execute' instead of
218         'run' ('run' executes a command in background and detached from the 
219         terminal)
220         
221         When using X11 forwarding option, the command can not run in background
222         and detached from a terminal, since we need to keep the terminal attached 
223         to interact with it.
224         """
225         return self.get("forwardX11") or self._in_foreground
226
227     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
228         self.info("Retrieving '%s' trace %s " % (name, attr))
229
230         path = os.path.join(self.run_home, name)
231         
232         command = "(test -f %s && echo 'success') || echo 'error'" % path
233         (out, err), proc = self.node.execute(command)
234
235         if (err and proc.poll()) or out.find("error") != -1:
236             msg = " Couldn't find trace %s " % name
237             self.error(msg, out, err)
238             return None
239     
240         if attr == TraceAttr.PATH:
241             return path
242
243         if attr == TraceAttr.ALL:
244             (out, err), proc = self.node.check_output(self.run_home, name)
245             
246             if proc.poll():
247                 msg = " Couldn't read trace %s " % name
248                 self.error(msg, out, err)
249                 return None
250
251             return out
252
253         if attr == TraceAttr.STREAM:
254             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
255         elif attr == TraceAttr.SIZE:
256             cmd = "stat -c%%s %s " % path
257
258         (out, err), proc = self.node.execute(cmd)
259
260         if proc.poll():
261             msg = " Couldn't find trace %s " % name
262             self.error(msg, out, err)
263             return None
264         
265         if attr == TraceAttr.SIZE:
266             out = int(out.strip())
267
268         return out
269             
270     def provision(self):
271         # create run dir for application
272         self.node.mkdir(self.run_home)
273    
274         # List of all the provision methods to invoke
275         steps = [
276             # upload sources
277             self.upload_sources,
278             # upload files
279             self.upload_files,
280             # upload binaries
281             self.upload_binaries,
282             # upload libraries
283             self.upload_libraries,
284             # upload code
285             self.upload_code,
286             # upload stdin
287             self.upload_stdin,
288             # install dependencies
289             self.install_dependencies,
290             # build
291             self.build,
292             # Install
293             self.install]
294
295         command = []
296
297         # Since provisioning takes a long time, before
298         # each step we check that the EC is still 
299         for step in steps:
300             if self.ec.finished:
301                 raise RuntimeError, "EC finished"
302             
303             ret = step()
304             if ret:
305                 command.append(ret)
306
307         # upload deploy script
308         deploy_command = ";".join(command)
309         self.execute_deploy_command(deploy_command)
310
311         # upload start script
312         self.upload_start_command()
313        
314         self.info("Provisioning finished")
315
316         super(LinuxApplication, self).provision()
317
318     def upload_start_command(self):
319         # Upload command to remote bash script
320         # - only if command can be executed in background and detached
321         command = self.get("command")
322
323         if command and not self.in_foreground:
324             self.info("Uploading command '%s'" % command)
325
326             # replace application specific paths in the command
327             command = self.replace_paths(command)
328             
329             # replace application specific paths in the environment
330             env = self.get("env")
331             env = env and self.replace_paths(env)
332
333             shfile = os.path.join(self.app_home, "start.sh")
334
335             self.node.upload_command(command, 
336                     shfile = shfile,
337                     env = env,
338                     overwrite = False)
339
340     def execute_deploy_command(self, command):
341         if command:
342             # Upload the command to a bash script and run it
343             # in background ( but wait until the command has
344             # finished to continue )
345             shfile = os.path.join(self.app_home, "deploy.sh")
346             self.node.run_and_wait(command, self.run_home,
347                     shfile = shfile, 
348                     overwrite = False,
349                     pidfile = "deploy_pidfile", 
350                     ecodefile = "deploy_exitcode", 
351                     stdout = "deploy_stdout", 
352                     stderr = "deploy_stderr")
353
354     def upload_sources(self):
355         sources = self.get("sources")
356    
357         command = ""
358
359         if sources:
360             self.info("Uploading sources ")
361
362             sources = sources.split(' ')
363
364             # Separate sources that should be downloaded from 
365             # the web, from sources that should be uploaded from
366             # the local machine
367             command = []
368             for source in list(sources):
369                 if source.startswith("http") or source.startswith("https"):
370                     # remove the hhtp source from the sources list
371                     sources.remove(source)
372
373                     command.append( " ( " 
374                             # Check if the source already exists
375                             " ls ${SRC}/%(basename)s "
376                             " || ( "
377                             # If source doesn't exist, download it and check
378                             # that it it downloaded ok
379                             "   wget -c --directory-prefix=${SRC} %(source)s && "
380                             "   ls ${SRC}/%(basename)s "
381                             " ) ) " % {
382                                 "basename": os.path.basename(source),
383                                 "source": source
384                                 })
385
386             command = " && ".join(command)
387
388             # replace application specific paths in the command
389             command = self.replace_paths(command)
390        
391             if sources:
392                 sources = ' '.join(sources)
393                 self.node.upload(sources, self.node.src_dir, overwrite = False)
394
395         return command
396
397     def upload_files(self):
398         files = self.get("files")
399
400         if files:
401             self.info("Uploading files %s " % files)
402             self.node.upload(files, self.node.share_dir, overwrite = False)
403
404     def upload_libraries(self):
405         libs = self.get("libs")
406
407         if libs:
408             self.info("Uploading libraries %s " % libaries)
409             self.node.upload(libs, self.node.lib_dir, overwrite = False)
410
411     def upload_binaries(self):
412         bins = self.get("bins")
413
414         if bins:
415             self.info("Uploading binaries %s " % binaries)
416             self.node.upload(bins, self.node.bin_dir, overwrite = False)
417
418     def upload_code(self):
419         code = self.get("code")
420
421         if code:
422             self.info("Uploading code")
423
424             dst = os.path.join(self.app_home, "code")
425             self.node.upload(code, dst, overwrite = False, text = True)
426
427     def upload_stdin(self):
428         stdin = self.get("stdin")
429         if stdin:
430             # create dir for sources
431             self.info("Uploading stdin")
432             
433             # upload stdin file to ${SHARE_DIR} directory
434             basename = os.path.basename(stdin)
435             dst = os.path.join(self.node.share_dir, basename)
436             self.node.upload(stdin, dst, overwrite = False, text = True)
437
438             # create "stdin" symlink on ${APP_HOME} directory
439             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
440                 "app_home": self.app_home, 
441                 "stdin": dst })
442
443             return command
444
445     def install_dependencies(self):
446         depends = self.get("depends")
447         if depends:
448             self.info("Installing dependencies %s" % depends)
449             self.node.install_packages(depends, self.app_home, self.run_home)
450
451     def build(self):
452         build = self.get("build")
453
454         if build:
455             self.info("Building sources ")
456             
457             # replace application specific paths in the command
458             return self.replace_paths(build)
459
460     def install(self):
461         install = self.get("install")
462
463         if install:
464             self.info("Installing sources ")
465
466             # replace application specific paths in the command
467             return self.replace_paths(install)
468
469     def deploy(self):
470         # Wait until node is associated and deployed
471         node = self.node
472         if not node or node.state < ResourceState.READY:
473             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
474             self.ec.schedule(reschedule_delay, self.deploy)
475         else:
476             try:
477                 command = self.get("command") or ""
478                 self.info("Deploying command '%s' " % command)
479                 self.discover()
480                 self.provision()
481             except:
482                 self.fail()
483                 raise
484
485             super(LinuxApplication, self).deploy()
486
487     def start(self):
488         command = self.get("command")
489
490         self.info("Starting command '%s'" % command)
491
492         if not command:
493             # If no command was given (i.e. Application was used for dependency
494             # installation), then the application is directly marked as FINISHED
495             self._state = ResourceState.FINISHED
496         else:
497
498             if self.in_foreground:
499                 self._run_in_foreground()
500             else:
501                 self._run_in_background()
502
503             super(LinuxApplication, self).start()
504
505     def _run_in_foreground(self):
506         command = self.get("command")
507         sudo = self.get("sudo") or False
508         x11 = self.get("forwardX11")
509
510         # For a command being executed in foreground, if there is stdin,
511         # it is expected to be text string not a file or pipe
512         stdin = self.get("stdin") or None
513
514         # Command will be launched in foreground and attached to the
515         # terminal using the node 'execute' in non blocking mode.
516
517         # We save the reference to the process in self._proc 
518         # to be able to kill the process from the stop method.
519         # We also set blocking = False, since we don't want the
520         # thread to block until the execution finishes.
521         (out, err), self._proc = self.execute_command(self, command, 
522                 env = env,
523                 sudo = sudo,
524                 stdin = stdin,
525                 forward_x11 = x11,
526                 blocking = False)
527
528         if self._proc.poll():
529             self.fail()
530             self.error(msg, out, err)
531             raise RuntimeError, msg
532
533     def _run_in_background(self):
534         command = self.get("command")
535         env = self.get("env")
536         sudo = self.get("sudo") or False
537
538         stdout = "stdout"
539         stderr = "stderr"
540         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
541                 else None
542
543         # Command will be run as a daemon in baground and detached from any
544         # terminal.
545         # The command to run was previously uploaded to a bash script
546         # during deployment, now we launch the remote script using 'run'
547         # method from the node.
548         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
549         (out, err), proc = self.node.run(cmd, self.run_home, 
550             stdin = stdin, 
551             stdout = stdout,
552             stderr = stderr,
553             sudo = sudo)
554
555         # check if execution errors occurred
556         msg = " Failed to start command '%s' " % command
557         
558         if proc.poll():
559             self.fail()
560             self.error(msg, out, err)
561             raise RuntimeError, msg
562     
563         # Wait for pid file to be generated
564         pid, ppid = self.node.wait_pid(self.run_home)
565         if pid: self._pid = int(pid)
566         if ppid: self._ppid = int(ppid)
567
568         # If the process is not running, check for error information
569         # on the remote machine
570         if not self.pid or not self.ppid:
571             (out, err), proc = self.node.check_errors(self.run_home,
572                     stderr = stderr) 
573
574             # Out is what was written in the stderr file
575             if err:
576                 self.fail()
577                 msg = " Failed to start command '%s' " % command
578                 self.error(msg, out, err)
579                 raise RuntimeError, msg
580         
581     def stop(self):
582         """ Stops application execution
583         """
584         command = self.get('command') or ''
585
586         if self.state == ResourceState.STARTED:
587         
588             stopped = True
589
590             self.info("Stopping command '%s'" % command)
591         
592             # If the command is running in foreground (it was launched using
593             # the node 'execute' method), then we use the handler to the Popen
594             # process to kill it. Else we send a kill signal using the pid and ppid
595             # retrieved after running the command with the node 'run' method
596
597             if self._proc:
598                 self._proc.kill()
599             else:
600                 # Only try to kill the process if the pid and ppid
601                 # were retrieved
602                 if self.pid and self.ppid:
603                     (out, err), proc = self.node.kill(self.pid, self.ppid,
604                             sudo = self._sudo_kill)
605
606                     if proc.poll() or err:
607                         # check if execution errors occurred
608                         msg = " Failed to STOP command '%s' " % self.get("command")
609                         self.error(msg, out, err)
610                         self.fail()
611                         stopped = False
612
613             if stopped:
614                 super(LinuxApplication, self).stop()
615
616     def release(self):
617         self.info("Releasing resource")
618
619         tear_down = self.get("tearDown")
620         if tear_down:
621             self.node.execute(tear_down)
622
623         self.stop()
624
625         if self.state == ResourceState.STOPPED:
626             super(LinuxApplication, self).release()
627     
628     @property
629     def state(self):
630         """ Returns the state of the application
631         """
632         if self._state == ResourceState.STARTED:
633             if self.in_foreground:
634                 # Check if the process we used to execute the command
635                 # is still running ...
636                 retcode = self._proc.poll()
637
638                 # retcode == None -> running
639                 # retcode > 0 -> error
640                 # retcode == 0 -> finished
641                 if retcode:
642                     out = ""
643                     msg = " Failed to execute command '%s'" % self.get("command")
644                     err = self._proc.stderr.read()
645                     self.error(msg, out, err)
646                     self.fail()
647                 elif retcode == 0:
648                     self._state = ResourceState.FINISHED
649
650             else:
651                 # We need to query the status of the command we launched in 
652                 # background. In order to avoid overwhelming the remote host and
653                 # the local processor with too many ssh queries, the state is only
654                 # requested every 'state_check_delay' seconds.
655                 state_check_delay = 0.5
656                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
657                     # check if execution errors occurred
658                     (out, err), proc = self.node.check_errors(self.run_home)
659
660                     if err:
661                         msg = " Failed to execute command '%s'" % self.get("command")
662                         self.error(msg, out, err)
663                         self.fail()
664
665                     elif self.pid and self.ppid:
666                         # No execution errors occurred. Make sure the background
667                         # process with the recorded pid is still running.
668                         status = self.node.status(self.pid, self.ppid)
669
670                         if status == ProcStatus.FINISHED:
671                             self._state = ResourceState.FINISHED
672
673                     self._last_state_check = tnow()
674
675         return self._state
676
677     def execute_command(self, command, 
678             env = None,
679             sudo = False,
680             stdin = None,
681             forward_x11 = False,
682             blocking = False):
683
684         environ = ""
685         if env:
686             environ = self.node.format_environment(env, inline = True)
687         command = environ + command
688         command = self.replace_paths(command)
689
690         return self.node.execute(command,
691                 sudo = sudo,
692                 stdin = stdin,
693                 forward_x11 = forward_x11,
694                 blocking = blocking)
695
696     def replace_paths(self, command):
697         """
698         Replace all special path tags with shell-escaped actual paths.
699         """
700         return ( command
701             .replace("${USR}", self.node.usr_dir)
702             .replace("${LIB}", self.node.lib_dir)
703             .replace("${BIN}", self.node.bin_dir)
704             .replace("${SRC}", self.node.src_dir)
705             .replace("${SHARE}", self.node.share_dir)
706             .replace("${EXP}", self.node.exp_dir)
707             .replace("${EXP_HOME}", self.node.exp_home)
708             .replace("${APP_HOME}", self.app_home)
709             .replace("${RUN_HOME}", self.run_home)
710             .replace("${NODE_HOME}", self.node.node_home)
711             .replace("${HOME}", self.node.home_dir)
712             )
713
714     def valid_connection(self, guid):
715         # TODO: Validate!
716         return True
717