Adding trace support for ns3 RMs
[nepi.git] / src / nepi / resources / linux / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.trace import Trace, TraceAttr
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sshfuncs import ProcStatus
26 from nepi.util.timefuncs import tnow, tdiffsec
27
28 import os
29 import subprocess
30
31 # TODO: Resolve wildcards in commands!!
32 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
33
34 @clsinit_copy
35 class LinuxApplication(ResourceManager):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     .. note::
45
46         A LinuxApplication RM represents a process that can be executed in
47         a remote Linux host using SSH.
48
49         The LinuxApplication RM takes care of uploadin sources and any files
50         needed to run the experiment, to the remote host. 
51         It also allows to provide source compilation (build) and installation 
52         instructions, and takes care of automating the sources build and 
53         installation tasks for the user.
54
55         It is important to note that files uploaded to the remote host have
56         two possible scopes: single-experiment or multi-experiment.
57         Single experiment files are those that will not be re-used by other 
58         experiments. Multi-experiment files are those that will.
59         Sources and shared files are always made available to all experiments.
60
61         Directory structure:
62
63         The directory structure used by LinuxApplication RM at the Linux
64         host is the following:
65
66         ${HOME}/nepi-usr --> Base directory for multi-experiment files
67                       |
68         ${LIB}        |- /lib --> Base directory for libraries
69         ${BIN}        |- /bin --> Base directory for binary files
70         ${SRC}        |- /src --> Base directory for sources
71         ${SHARE}      |- /share --> Base directory for other files
72
73         ${HOME}/nepi-exp --> Base directory for single-experiment files
74                       |
75         ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
76                           |
77         ${APP_HOME}       |- /<app-guid> --> Base directory for application 
78                                |     specific files (e.g. command.sh, input)
79                                | 
80         ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
81
82     """
83
84     _rtype = "LinuxApplication"
85     _help = "Runs an application on a Linux host with a BASH command "
86     _backend_type = "linux"
87
88     @classmethod
89     def _register_attributes(cls):
90         command = Attribute("command", "Command to execute at application start. "
91                 "Note that commands will be executed in the ${RUN_HOME} directory, "
92                 "make sure to take this into account when using relative paths. ", 
93                 flags = Flags.Design)
94         forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
95                 flags = Flags.Design)
96         env = Attribute("env", "Environment variables string for command execution",
97                 flags = Flags.Design)
98         sudo = Attribute("sudo", "Run with root privileges", 
99                 flags = Flags.Design)
100         depends = Attribute("depends", 
101                 "Space-separated list of packages required to run the application",
102                 flags = Flags.Design)
103         sources = Attribute("sources", 
104                 "Space-separated list of regular files to be uploaded to ${SRC} "
105                 "directory prior to building. Archives won't be expanded automatically. "
106                 "Sources are globally available for all experiments unless "
107                 "cleanHome is set to True (This will delete all sources). ",
108                 flags = Flags.Design)
109         files = Attribute("files", 
110                 "Space-separated list of regular miscellaneous files to be uploaded "
111                 "to ${SHARE} directory. "
112                 "Files are globally available for all experiments unless "
113                 "cleanHome is set to True (This will delete all files). ",
114                 flags = Flags.Design)
115         libs = Attribute("libs", 
116                 "Space-separated list of libraries (e.g. .so files) to be uploaded "
117                 "to ${LIB} directory. "
118                 "Libraries are globally available for all experiments unless "
119                 "cleanHome is set to True (This will delete all files). ",
120                 flags = Flags.Design)
121         bins = Attribute("bins", 
122                 "Space-separated list of binary files to be uploaded "
123                 "to ${BIN} directory. "
124                 "Binaries are globally available for all experiments unless "
125                 "cleanHome is set to True (This will delete all files). ",
126                 flags = Flags.Design)
127         code = Attribute("code", 
128                 "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
129                 flags = Flags.Design)
130         build = Attribute("build", 
131                 "Build commands to execute after deploying the sources. "
132                 "Sources are uploaded to the ${SRC} directory and code "
133                 "is uploaded to the ${APP_HOME} directory. \n"
134                 "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
135                 "./configure && make && make clean.\n"
136                 "Make sure to make the build commands return with a nonzero exit "
137                 "code on error.",
138                 flags = Flags.Design)
139         install = Attribute("install", 
140                 "Commands to transfer built files to their final destinations. "
141                 "Install commands are executed after build commands. ",
142                 flags = Flags.Design)
143         stdin = Attribute("stdin", "Standard input for the 'command'", 
144                 flags = Flags.Design)
145         tear_down = Attribute("tearDown", "Command to be executed just before " 
146                 "releasing the resource", 
147                 flags = Flags.Design)
148
149         cls._register_attribute(command)
150         cls._register_attribute(forward_x11)
151         cls._register_attribute(env)
152         cls._register_attribute(sudo)
153         cls._register_attribute(depends)
154         cls._register_attribute(sources)
155         cls._register_attribute(code)
156         cls._register_attribute(files)
157         cls._register_attribute(bins)
158         cls._register_attribute(libs)
159         cls._register_attribute(build)
160         cls._register_attribute(install)
161         cls._register_attribute(stdin)
162         cls._register_attribute(tear_down)
163
164     @classmethod
165     def _register_traces(cls):
166         stdout = Trace("stdout", "Standard output stream", enabled = True)
167         stderr = Trace("stderr", "Standard error stream", enabled = True)
168
169         cls._register_trace(stdout)
170         cls._register_trace(stderr)
171
172     def __init__(self, ec, guid):
173         super(LinuxApplication, self).__init__(ec, guid)
174         self._pid = None
175         self._ppid = None
176         self._home = "app-%s" % self.guid
177         # whether the command should run in foreground attached
178         # to a terminal
179         self._in_foreground = False
180
181         # whether to use sudo to kill the application process
182         self._sudo_kill = False
183
184         # keep a reference to the running process handler when 
185         # the command is not executed as remote daemon in background
186         self._proc = None
187
188         # timestamp of last state check of the application
189         self._last_state_check = tnow()
190
191     def log_message(self, msg):
192         return " guid %d - host %s - %s " % (self.guid, 
193                 self.node.get("hostname"), msg)
194
195     @property
196     def node(self):
197         node = self.get_connected(LinuxNode.get_rtype())
198         if node: return node[0]
199         return None
200
201     @property
202     def app_home(self):
203         return os.path.join(self.node.exp_home, self._home)
204
205     @property
206     def run_home(self):
207         return os.path.join(self.app_home, self.ec.run_id)
208
209     @property
210     def pid(self):
211         return self._pid
212
213     @property
214     def ppid(self):
215         return self._ppid
216
217     @property
218     def in_foreground(self):
219         """ Returns True if the command needs to be executed in foreground.
220         This means that command will be executed using 'execute' instead of
221         'run' ('run' executes a command in background and detached from the 
222         terminal)
223         
224         When using X11 forwarding option, the command can not run in background
225         and detached from a terminal, since we need to keep the terminal attached 
226         to interact with it.
227         """
228         return self.get("forwardX11") or self._in_foreground
229
230     def trace_filepath(self, filename):
231         return os.path.join(self.run_home, filename)
232
233     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
234         self.info("Retrieving '%s' trace %s " % (name, attr))
235
236         path = self.trace_filepath(name)
237         
238         command = "(test -f %s && echo 'success') || echo 'error'" % path
239         (out, err), proc = self.node.execute(command)
240
241         if (err and proc.poll()) or out.find("error") != -1:
242             msg = " Couldn't find trace %s " % name
243             self.error(msg, out, err)
244             return None
245     
246         if attr == TraceAttr.PATH:
247             return path
248
249         if attr == TraceAttr.ALL:
250             (out, err), proc = self.node.check_output(self.run_home, name)
251             
252             if proc.poll():
253                 msg = " Couldn't read trace %s " % name
254                 self.error(msg, out, err)
255                 return None
256
257             return out
258
259         if attr == TraceAttr.STREAM:
260             cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
261         elif attr == TraceAttr.SIZE:
262             cmd = "stat -c%%s %s " % path
263
264         (out, err), proc = self.node.execute(cmd)
265
266         if proc.poll():
267             msg = " Couldn't find trace %s " % name
268             self.error(msg, out, err)
269             return None
270         
271         if attr == TraceAttr.SIZE:
272             out = int(out.strip())
273
274         return out
275
276     def do_provision(self):
277         # create run dir for application
278         self.node.mkdir(self.run_home)
279    
280         # List of all the provision methods to invoke
281         steps = [
282             # upload sources
283             self.upload_sources,
284             # upload files
285             self.upload_files,
286             # upload binaries
287             self.upload_binaries,
288             # upload libraries
289             self.upload_libraries,
290             # upload code
291             self.upload_code,
292             # upload stdin
293             self.upload_stdin,
294             # install dependencies
295             self.install_dependencies,
296             # build
297             self.build,
298             # Install
299             self.install]
300
301         command = []
302
303         # Since provisioning takes a long time, before
304         # each step we check that the EC is still 
305         for step in steps:
306             if self.ec.abort:
307                 self.debug("Interrupting provisioning. EC says 'ABORT")
308                 return
309             
310             ret = step()
311             if ret:
312                 command.append(ret)
313
314         # upload deploy script
315         deploy_command = ";".join(command)
316         self.execute_deploy_command(deploy_command)
317
318         # upload start script
319         self.upload_start_command()
320        
321         self.info("Provisioning finished")
322
323         super(LinuxApplication, self).do_provision()
324
325     def upload_start_command(self, overwrite = False):
326         # Upload command to remote bash script
327         # - only if command can be executed in background and detached
328         command = self.get("command")
329
330         if command and not self.in_foreground:
331             self.info("Uploading command '%s'" % command)
332
333             # replace application specific paths in the command
334             command = self.replace_paths(command)
335             
336             # replace application specific paths in the environment
337             env = self.get("env")
338             env = env and self.replace_paths(env)
339
340             shfile = os.path.join(self.app_home, "start.sh")
341
342             self.node.upload_command(command, 
343                     shfile = shfile,
344                     env = env,
345                     overwrite = overwrite)
346
347     def execute_deploy_command(self, command):
348         if command:
349             # Upload the command to a bash script and run it
350             # in background ( but wait until the command has
351             # finished to continue )
352             shfile = os.path.join(self.app_home, "deploy.sh")
353             self.node.run_and_wait(command, self.run_home,
354                     shfile = shfile, 
355                     overwrite = False,
356                     pidfile = "deploy_pidfile", 
357                     ecodefile = "deploy_exitcode", 
358                     stdout = "deploy_stdout", 
359                     stderr = "deploy_stderr")
360
361     def upload_sources(self):
362         sources = self.get("sources")
363    
364         command = ""
365
366         if sources:
367             self.info("Uploading sources ")
368
369             sources = sources.split(' ')
370
371             # Separate sources that should be downloaded from 
372             # the web, from sources that should be uploaded from
373             # the local machine
374             command = []
375             for source in list(sources):
376                 if source.startswith("http") or source.startswith("https"):
377                     # remove the hhtp source from the sources list
378                     sources.remove(source)
379
380                     command.append( " ( " 
381                             # Check if the source already exists
382                             " ls ${SRC}/%(basename)s "
383                             " || ( "
384                             # If source doesn't exist, download it and check
385                             # that it it downloaded ok
386                             "   wget -c --directory-prefix=${SRC} %(source)s && "
387                             "   ls ${SRC}/%(basename)s "
388                             " ) ) " % {
389                                 "basename": os.path.basename(source),
390                                 "source": source
391                                 })
392
393             command = " && ".join(command)
394
395             # replace application specific paths in the command
396             command = self.replace_paths(command)
397        
398             if sources:
399                 sources = ' '.join(sources)
400                 self.node.upload(sources, self.node.src_dir, overwrite = False)
401
402         return command
403
404     def upload_files(self):
405         files = self.get("files")
406
407         if files:
408             self.info("Uploading files %s " % files)
409             self.node.upload(files, self.node.share_dir, overwrite = False)
410
411     def upload_libraries(self):
412         libs = self.get("libs")
413
414         if libs:
415             self.info("Uploading libraries %s " % libaries)
416             self.node.upload(libs, self.node.lib_dir, overwrite = False)
417
418     def upload_binaries(self):
419         bins = self.get("bins")
420
421         if bins:
422             self.info("Uploading binaries %s " % binaries)
423             self.node.upload(bins, self.node.bin_dir, overwrite = False)
424
425     def upload_code(self):
426         code = self.get("code")
427
428         if code:
429             self.info("Uploading code")
430
431             dst = os.path.join(self.app_home, "code")
432             self.node.upload(code, dst, overwrite = False, text = True)
433
434     def upload_stdin(self):
435         stdin = self.get("stdin")
436         if stdin:
437             # create dir for sources
438             self.info("Uploading stdin")
439             
440             # upload stdin file to ${SHARE_DIR} directory
441             basename = os.path.basename(stdin)
442             dst = os.path.join(self.node.share_dir, basename)
443             self.node.upload(stdin, dst, overwrite = False, text = True)
444
445             # create "stdin" symlink on ${APP_HOME} directory
446             command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
447                 "app_home": self.app_home, 
448                 "stdin": dst })
449
450             return command
451
452     def install_dependencies(self):
453         depends = self.get("depends")
454         if depends:
455             self.info("Installing dependencies %s" % depends)
456             return self.node.install_packages_command(depends)
457
458     def build(self):
459         build = self.get("build")
460
461         if build:
462             self.info("Building sources ")
463             
464             # replace application specific paths in the command
465             return self.replace_paths(build)
466
467     def install(self):
468         install = self.get("install")
469
470         if install:
471             self.info("Installing sources ")
472
473             # replace application specific paths in the command
474             return self.replace_paths(install)
475
476     def do_deploy(self):
477         # Wait until node is associated and deployed
478         node = self.node
479         if not node or node.state < ResourceState.READY:
480             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
481             self.ec.schedule(reschedule_delay, self.deploy)
482         else:
483             command = self.get("command") or ""
484             self.info("Deploying command '%s' " % command)
485             self.do_discover()
486             self.do_provision()
487
488             super(LinuxApplication, self).do_deploy()
489    
490     def do_start(self):
491         command = self.get("command")
492
493         self.info("Starting command '%s'" % command)
494
495         if not command:
496             # If no command was given (i.e. Application was used for dependency
497             # installation), then the application is directly marked as STOPPED
498             super(LinuxApplication, self).set_stopped()
499         else:
500             if self.in_foreground:
501                 self._run_in_foreground()
502             else:
503                 self._run_in_background()
504
505             super(LinuxApplication, self).do_start()
506
507     def _run_in_foreground(self):
508         command = self.get("command")
509         sudo = self.get("sudo") or False
510         x11 = self.get("forwardX11")
511         env = self.get("env")
512
513         # Command will be launched in foreground and attached to the
514         # terminal using the node 'execute' in non blocking mode.
515
516         # We save the reference to the process in self._proc 
517         # to be able to kill the process from the stop method.
518         # We also set blocking = False, since we don't want the
519         # thread to block until the execution finishes.
520         (out, err), self._proc = self.execute_command(command, 
521                 env = env,
522                 sudo = sudo,
523                 forward_x11 = x11,
524                 blocking = False)
525
526         if self._proc.poll():
527             self.error(msg, out, err)
528             raise RuntimeError, msg
529
530     def _run_in_background(self):
531         command = self.get("command")
532         env = self.get("env")
533         sudo = self.get("sudo") or False
534
535         stdout = "stdout"
536         stderr = "stderr"
537         stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
538                 else None
539
540         # Command will be run as a daemon in baground and detached from any
541         # terminal.
542         # The command to run was previously uploaded to a bash script
543         # during deployment, now we launch the remote script using 'run'
544         # method from the node.
545         cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
546         (out, err), proc = self.node.run(cmd, self.run_home, 
547             stdin = stdin, 
548             stdout = stdout,
549             stderr = stderr,
550             sudo = sudo)
551
552         # check if execution errors occurred
553         msg = " Failed to start command '%s' " % command
554         
555         if proc.poll():
556             self.error(msg, out, err)
557             raise RuntimeError, msg
558     
559         # Wait for pid file to be generated
560         pid, ppid = self.node.wait_pid(self.run_home)
561         if pid: self._pid = int(pid)
562         if ppid: self._ppid = int(ppid)
563
564         # If the process is not running, check for error information
565         # on the remote machine
566         if not self.pid or not self.ppid:
567             (out, err), proc = self.node.check_errors(self.run_home,
568                     stderr = stderr) 
569
570             # Out is what was written in the stderr file
571             if err:
572                 msg = " Failed to start command '%s' " % command
573                 self.error(msg, out, err)
574                 raise RuntimeError, msg
575     
576     def do_stop(self):
577         """ Stops application execution
578         """
579         command = self.get('command') or ''
580
581         if self.state == ResourceState.STARTED:
582         
583             self.info("Stopping command '%s' " % command)
584         
585             # If the command is running in foreground (it was launched using
586             # the node 'execute' method), then we use the handler to the Popen
587             # process to kill it. Else we send a kill signal using the pid and ppid
588             # retrieved after running the command with the node 'run' method
589             if self._proc:
590                 self._proc.kill()
591             else:
592                 # Only try to kill the process if the pid and ppid
593                 # were retrieved
594                 if self.pid and self.ppid:
595                     (out, err), proc = self.node.kill(self.pid, self.ppid,
596                             sudo = self._sudo_kill)
597
598                     # TODO: check if execution errors occurred
599                     if (proc and proc.poll()) or err:
600                         msg = " Failed to STOP command '%s' " % self.get("command")
601                         self.error(msg, out, err)
602         
603             super(LinuxApplication, self).do_stop()
604
605     def do_release(self):
606         self.info("Releasing resource")
607
608         tear_down = self.get("tearDown")
609         if tear_down:
610             self.node.execute(tear_down)
611
612         self.do_stop()
613
614         super(LinuxApplication, self).do_release()
615         
616     @property
617     def state(self):
618         """ Returns the state of the application
619         """
620         if self._state == ResourceState.STARTED:
621             if self.in_foreground:
622                 # Check if the process we used to execute the command
623                 # is still running ...
624                 retcode = self._proc.poll()
625
626                 # retcode == None -> running
627                 # retcode > 0 -> error
628                 # retcode == 0 -> finished
629                 if retcode:
630                     out = ""
631                     msg = " Failed to execute command '%s'" % self.get("command")
632                     err = self._proc.stderr.read()
633                     self.error(msg, out, err)
634                     self.do_fail()
635
636                 elif retcode == 0:
637                     self.set_stopped()
638             else:
639                 # We need to query the status of the command we launched in 
640                 # background. In order to avoid overwhelming the remote host and
641                 # the local processor with too many ssh queries, the state is only
642                 # requested every 'state_check_delay' seconds.
643                 state_check_delay = 0.5
644                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
645                     if self.pid and self.ppid:
646                         # Make sure the process is still running in background
647                         status = self.node.status(self.pid, self.ppid)
648
649                         if status == ProcStatus.FINISHED:
650                             # If the program finished, check if execution
651                             # errors occurred
652                             (out, err), proc = self.node.check_errors(
653                                     self.run_home)
654
655                             if err:
656                                 msg = "Failed to execute command '%s'" % \
657                                         self.get("command")
658                                 self.error(msg, out, err)
659                                 self.do_fail()
660                             else:
661                                 self.set_stopped()
662
663                     self._last_state_check = tnow()
664
665         return self._state
666
667     def execute_command(self, command, 
668             env = None,
669             sudo = False,
670             forward_x11 = False,
671             blocking = False):
672
673         environ = ""
674         if env:
675             environ = self.node.format_environment(env, inline = True)
676         command = environ + command
677         command = self.replace_paths(command)
678
679         return self.node.execute(command,
680                 sudo = sudo,
681                 forward_x11 = forward_x11,
682                 blocking = blocking)
683
684     def replace_paths(self, command):
685         """
686         Replace all special path tags with shell-escaped actual paths.
687         """
688         return ( command
689             .replace("${USR}", self.node.usr_dir)
690             .replace("${LIB}", self.node.lib_dir)
691             .replace("${BIN}", self.node.bin_dir)
692             .replace("${SRC}", self.node.src_dir)
693             .replace("${SHARE}", self.node.share_dir)
694             .replace("${EXP}", self.node.exp_dir)
695             .replace("${EXP_HOME}", self.node.exp_home)
696             .replace("${APP_HOME}", self.app_home)
697             .replace("${RUN_HOME}", self.run_home)
698             .replace("${NODE_HOME}", self.node.node_home)
699             .replace("${HOME}", self.node.home_dir)
700             )
701
702     def valid_connection(self, guid):
703         # TODO: Validate!
704         return True
705